1
0
mirror of https://github.com/chylex/Apache-Prometheus-Exporter.git synced 2024-10-18 15:42:50 +02:00

Compare commits

..

No commits in common. "f0e1447ae51ab12c6148dc736cc476c002e3f32a" and "7def8921b05b8be1c9c874dd54f1e591a15d9222" have entirely different histories.

6 changed files with 215 additions and 363 deletions

181
Cargo.lock generated
View File

@ -22,7 +22,7 @@ name = "apache_prometheus_exporter"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"hyper", "hyper",
"notify", "linemux",
"path-slash", "path-slash",
"prometheus-client", "prometheus-client",
"tokio", "tokio",
@ -55,12 +55,6 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitflags"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635"
[[package]] [[package]]
name = "bytes" name = "bytes"
version = "1.2.1" version = "1.2.1"
@ -79,6 +73,26 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "crossbeam-channel"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51887d4adc7b564537b15adcfb307936f8075dfcd5f00dde9a9f1d29383682bc"
dependencies = [
"cfg-if",
"once_cell",
]
[[package]] [[package]]
name = "dtoa" name = "dtoa"
version = "1.0.3" version = "1.0.3"
@ -87,14 +101,14 @@ checksum = "c6053ff46b5639ceb91756a85a4c8914668393a03170efd79c8884a529d80656"
[[package]] [[package]]
name = "filetime" name = "filetime"
version = "0.2.22" version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4029edd3e734da6fe05b6cd7bd2960760a616bd2ddd0d59a0124746d6272af0" checksum = "e94a7bbaa59354bc20dd75b67f23e2797b4490e9d6928203fb105c79e448c86c"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"libc", "libc",
"redox_syscall 0.3.5", "redox_syscall",
"windows-sys 0.48.0", "windows-sys 0.36.1",
] ]
[[package]] [[package]]
@ -134,6 +148,7 @@ dependencies = [
"futures-task", "futures-task",
"pin-project-lite", "pin-project-lite",
"pin-utils", "pin-utils",
"slab",
] ]
[[package]] [[package]]
@ -205,7 +220,7 @@ version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
dependencies = [ dependencies = [
"bitflags 1.3.2", "bitflags",
"inotify-sys", "inotify-sys",
"libc", "libc",
] ]
@ -227,9 +242,9 @@ checksum = "6c8af84674fe1f223a982c933a0ee1086ac4d4052aa0fb8060c12c6ad838e754"
[[package]] [[package]]
name = "kqueue" name = "kqueue"
version = "1.0.8" version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c" checksum = "4d6112e8f37b59803ac47a42d14f1f3a59bbf72fc6857ffc5be455e28a691f8e"
dependencies = [ dependencies = [
"kqueue-sys", "kqueue-sys",
"libc", "libc",
@ -237,11 +252,11 @@ dependencies = [
[[package]] [[package]]
name = "kqueue-sys" name = "kqueue-sys"
version = "1.0.4" version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587"
dependencies = [ dependencies = [
"bitflags 1.3.2", "bitflags",
"libc", "libc",
] ]
@ -251,6 +266,18 @@ version = "0.2.148"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b" checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b"
[[package]]
name = "linemux"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "feb035c7806bd7982a317d8d66815021e91f7ab14a5fbedee22b06f608f11b43"
dependencies = [
"futures-util",
"notify",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "lock_api" name = "lock_api"
version = "0.4.8" version = "0.4.8"
@ -263,9 +290,12 @@ dependencies = [
[[package]] [[package]]
name = "log" name = "log"
version = "0.4.20" version = "0.4.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e"
dependencies = [
"cfg-if",
]
[[package]] [[package]]
name = "memchr" name = "memchr"
@ -296,19 +326,19 @@ dependencies = [
[[package]] [[package]]
name = "notify" name = "notify"
version = "6.1.1" version = "5.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d" checksum = "729f63e1ca555a43fe3efa4f3efdf4801c479da85b432242a7b726f353c88486"
dependencies = [ dependencies = [
"bitflags 2.4.0", "bitflags",
"crossbeam-channel",
"filetime", "filetime",
"inotify", "inotify",
"kqueue", "kqueue",
"libc", "libc",
"log",
"mio", "mio",
"walkdir", "walkdir",
"windows-sys 0.48.0", "windows-sys 0.45.0",
] ]
[[package]] [[package]]
@ -344,7 +374,7 @@ checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"libc", "libc",
"redox_syscall 0.2.16", "redox_syscall",
"smallvec", "smallvec",
"windows-sys 0.36.1", "windows-sys 0.36.1",
] ]
@ -414,16 +444,7 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
dependencies = [ dependencies = [
"bitflags 1.3.2", "bitflags",
]
[[package]]
name = "redox_syscall"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29"
dependencies = [
"bitflags 1.3.2",
] ]
[[package]] [[package]]
@ -456,6 +477,15 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "slab"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef"
dependencies = [
"autocfg",
]
[[package]] [[package]]
name = "smallvec" name = "smallvec"
version = "1.9.0" version = "1.9.0"
@ -561,11 +591,12 @@ checksum = "c4f5b37a154999a8f3f98cc23a628d850e154479cd94decf3414696e12e31aaf"
[[package]] [[package]]
name = "walkdir" name = "walkdir"
version = "2.4.0" version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d71d857dc86794ca4c280d616f7da00d2dbfd8cd788846559a6813e6aa4b54ee" checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56"
dependencies = [ dependencies = [
"same-file", "same-file",
"winapi",
"winapi-util", "winapi-util",
] ]
@ -602,9 +633,9 @@ checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]] [[package]]
name = "winapi-util" name = "winapi-util"
version = "0.1.6" version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596" checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [ dependencies = [
"winapi", "winapi",
] ]
@ -628,13 +659,37 @@ dependencies = [
"windows_x86_64_msvc 0.36.1", "windows_x86_64_msvc 0.36.1",
] ]
[[package]]
name = "windows-sys"
version = "0.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
dependencies = [
"windows-targets 0.42.2",
]
[[package]] [[package]]
name = "windows-sys" name = "windows-sys"
version = "0.48.0" version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
dependencies = [ dependencies = [
"windows-targets", "windows-targets 0.48.5",
]
[[package]]
name = "windows-targets"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071"
dependencies = [
"windows_aarch64_gnullvm 0.42.2",
"windows_aarch64_msvc 0.42.2",
"windows_i686_gnu 0.42.2",
"windows_i686_msvc 0.42.2",
"windows_x86_64_gnu 0.42.2",
"windows_x86_64_gnullvm 0.42.2",
"windows_x86_64_msvc 0.42.2",
] ]
[[package]] [[package]]
@ -643,15 +698,21 @@ version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c"
dependencies = [ dependencies = [
"windows_aarch64_gnullvm", "windows_aarch64_gnullvm 0.48.5",
"windows_aarch64_msvc 0.48.5", "windows_aarch64_msvc 0.48.5",
"windows_i686_gnu 0.48.5", "windows_i686_gnu 0.48.5",
"windows_i686_msvc 0.48.5", "windows_i686_msvc 0.48.5",
"windows_x86_64_gnu 0.48.5", "windows_x86_64_gnu 0.48.5",
"windows_x86_64_gnullvm", "windows_x86_64_gnullvm 0.48.5",
"windows_x86_64_msvc 0.48.5", "windows_x86_64_msvc 0.48.5",
] ]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8"
[[package]] [[package]]
name = "windows_aarch64_gnullvm" name = "windows_aarch64_gnullvm"
version = "0.48.5" version = "0.48.5"
@ -664,6 +725,12 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47"
[[package]]
name = "windows_aarch64_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43"
[[package]] [[package]]
name = "windows_aarch64_msvc" name = "windows_aarch64_msvc"
version = "0.48.5" version = "0.48.5"
@ -676,6 +743,12 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6"
[[package]]
name = "windows_i686_gnu"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f"
[[package]] [[package]]
name = "windows_i686_gnu" name = "windows_i686_gnu"
version = "0.48.5" version = "0.48.5"
@ -688,6 +761,12 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024"
[[package]]
name = "windows_i686_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060"
[[package]] [[package]]
name = "windows_i686_msvc" name = "windows_i686_msvc"
version = "0.48.5" version = "0.48.5"
@ -700,12 +779,24 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1"
[[package]]
name = "windows_x86_64_gnu"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36"
[[package]] [[package]]
name = "windows_x86_64_gnu" name = "windows_x86_64_gnu"
version = "0.48.5" version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3"
[[package]] [[package]]
name = "windows_x86_64_gnullvm" name = "windows_x86_64_gnullvm"
version = "0.48.5" version = "0.48.5"
@ -718,6 +809,12 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"
[[package]]
name = "windows_x86_64_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0"
[[package]] [[package]]
name = "windows_x86_64_msvc" name = "windows_x86_64_msvc"
version = "0.48.5" version = "0.48.5"

View File

@ -14,7 +14,7 @@ codegen-units = 1
[dependencies] [dependencies]
hyper = { version = "0.14.27", default-features = false, features = ["http1", "server", "runtime"] } hyper = { version = "0.14.27", default-features = false, features = ["http1", "server", "runtime"] }
notify = { version = "6.1.1", default-features = false, features = ["macos_kqueue"] } linemux = "0.3.0"
path-slash = "0.2.1" path-slash = "0.2.1"
prometheus-client = "0.21.2" prometheus-client = "0.21.2"
tokio = { version = "1.32.0", features = ["fs", "io-util", "macros", "rt", "signal"] } tokio = { version = "1.32.0", features = ["rt", "macros", "signal"] }

View File

@ -82,21 +82,17 @@ The wildcard must not include any prefix or suffix, so `/*/` is accepted, but `/
#### Notes #### Notes
> The exporter only searches for files when it starts. If you need the exporter to watch a new file or forget a deleted file, you must restart it. > At least one access log file and one error log file must be found when the exporter starts, otherwise the exporter immediately exits with an error.
> If a log file is deleted, the exporter will automatically resume watching it if it is re-created later. If you want the exporter to forget about deleted log files, restart the exporter.
## 4. Launch the Exporter ## 4. Launch the Exporter
Start the exporter. The standard output will show which log files have been found, the web server host, and the metrics endpoint URL. Start the exporter. The standard output will show which log files have been found, the web server host, and the metrics endpoint URL.
If no errors are shown, the exporter will begin reading the found log files from the end, and printing each line to the standard output. When a log file is rotated, the exporter will begin reading it from the beginning. Press `Ctrl-C` to stop the exporter.
Press `Ctrl-C` to stop the exporter. Signals other than `SIGINT` are ignored. **Important:** Due to library bugs, the exporter will currently not watch rotated log files. If you want to use this project right now, you will need to add the `-c` flag to `rotatelogs`, and restart the exporter after every rotation.
#### Notes
> The exporter is designed to work and tested with the `rotatelogs` tool in a Linux container. Any other tools or operating systems are unsupported.
> If an error occurs while reading a file or re-opening a rotated file, the exporter will stop watching it and print the error to standard output.
## 5. Collect Prometheus Metrics ## 5. Collect Prometheus Metrics

View File

@ -1,62 +0,0 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use notify::{ErrorKind, Event, recommended_watcher, RecommendedWatcher, RecursiveMode, Result, Watcher};
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex;
pub struct FsWatcher {
watcher: Mutex<RecommendedWatcher>,
}
impl FsWatcher {
pub fn new(callbacks: FsEventCallbacks) -> Result<Self> {
let watcher = recommended_watcher(move |event| callbacks.handle_event(event))?;
let watcher = Mutex::new(watcher);
Ok(Self { watcher })
}
pub async fn watch(&self, path: &Path) -> Result<()> {
let mut watcher = self.watcher.lock().await;
if let Err(e) = watcher.unwatch(path) {
if !matches!(e.kind, ErrorKind::WatchNotFound) {
return Err(e);
}
}
watcher.watch(path, RecursiveMode::NonRecursive)
}
}
pub struct FsEventCallbacks {
senders: HashMap<PathBuf, Sender<Event>>,
}
impl FsEventCallbacks {
pub fn new() -> Self {
Self { senders: HashMap::new() }
}
pub fn register(&mut self, path: &Path, sender: Sender<Event>) {
self.senders.insert(path.to_path_buf(), sender);
}
fn handle_event(&self, event: Result<Event>) {
match event {
Ok(event) => {
for path in &event.paths {
if let Some(sender) = self.senders.get(path) {
if let Err(e) = sender.try_send(event.clone()) {
println!("[FsWatcher] Error sending filesystem event for path \"{}\": {}", path.to_string_lossy(), e);
}
}
}
}
Err(e) => {
println!("[FsWatcher] Error receiving filesystem event: {}", e);
}
}
}
}

View File

@ -1,16 +1,12 @@
use std::cmp::max; use std::collections::HashMap;
use std::io;
use std::io::{Error, ErrorKind};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc;
use notify::{Event, EventKind}; use linemux::{Line, MuxedLines};
use notify::event::{CreateKind, ModifyKind}; use tokio::sync::mpsc::UnboundedSender;
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, BufReader, Lines};
use tokio::sync::mpsc;
use tokio::sync::mpsc::Receiver;
use crate::ApacheMetrics; use crate::ApacheMetrics;
use crate::fs_watcher::{FsEventCallbacks, FsWatcher};
use crate::log_file_pattern::LogFilePath; use crate::log_file_pattern::LogFilePath;
#[derive(Copy, Clone, PartialEq)] #[derive(Copy, Clone, PartialEq)]
@ -19,274 +15,98 @@ enum LogFileKind {
Error, Error,
} }
struct LogFileMetadata { struct LogFileInfo<'a> {
pub kind: LogFileKind, pub kind: LogFileKind,
pub label: String, pub label: &'a String,
} }
impl LogFileMetadata { impl<'a> LogFileInfo<'a> {
fn get_label_set(&self) -> [(&'static str, String); 1] { fn get_label_set(&self) -> [(&'static str, String); 1] {
[("file", self.label.clone())] [("file", self.label.clone())]
} }
} }
pub async fn start_log_watcher(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: ApacheMetrics) -> bool { pub async fn watch_logs_task(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: ApacheMetrics, shutdown_send: UnboundedSender<()>) {
let mut watcher = LogWatcherConfiguration::new(); if let Err(error) = watch_logs(access_log_files, error_log_files, metrics).await {
println!("[LogWatcher] Error reading logs: {}", error);
for log_file in access_log_files.into_iter() { shutdown_send.send(()).unwrap();
watcher.add_file(log_file, LogFileKind::Access); }
} }
for log_file in error_log_files.into_iter() { struct LogWatcher<'a> {
watcher.add_file(log_file, LogFileKind::Error); reader: MuxedLines,
files: HashMap<PathBuf, LogFileInfo<'a>>,
} }
watcher.start(&metrics).await impl<'a> LogWatcher<'a> {
} fn new() -> io::Result<LogWatcher<'a>> {
Ok(LogWatcher {
struct LogWatcherConfiguration { reader: MuxedLines::new()?,
files: Vec<(PathBuf, LogFileMetadata)>, files: HashMap::new(),
} })
impl LogWatcherConfiguration {
fn new() -> LogWatcherConfiguration {
LogWatcherConfiguration { files: Vec::new() }
} }
fn count_files_of_kind(&self, kind: LogFileKind) -> usize { fn count_files_of_kind(&self, kind: LogFileKind) -> usize {
return self.files.iter().filter(|(_, metadata)| metadata.kind == kind).count(); return self.files.values().filter(|info| info.kind == kind).count();
} }
fn add_file(&mut self, log_file: LogFilePath, kind: LogFileKind) { async fn add_file(&mut self, log_file: &'a LogFilePath, kind: LogFileKind) -> io::Result<()> {
let path = log_file.path; let lookup_key = self.reader.add_file(&log_file.path).await?;
let label = log_file.label; self.files.insert(lookup_key, LogFileInfo { kind, label: &log_file.label });
let metadata = LogFileMetadata { kind, label }; Ok(())
self.files.push((path, metadata));
} }
async fn start(self, metrics: &ApacheMetrics) -> bool { async fn start_watching(&mut self, metrics: &ApacheMetrics) -> io::Result<()> {
if self.files.is_empty() { if self.files.is_empty() {
println!("[LogWatcher] No log files provided."); println!("[LogWatcher] No log files provided.");
return false; return Err(Error::from(ErrorKind::Unsupported));
} }
println!("[LogWatcher] Watching {} access log file(s) and {} error log file(s).", self.count_files_of_kind(LogFileKind::Access), self.count_files_of_kind(LogFileKind::Error)); println!("[LogWatcher] Watching {} access log file(s) and {} error log file(s).", self.count_files_of_kind(LogFileKind::Access), self.count_files_of_kind(LogFileKind::Error));
struct PreparedFile { for metadata in self.files.values() {
path: PathBuf, let label_set = metadata.get_label_set();
metadata: LogFileMetadata,
fs_event_receiver: Receiver<Event>,
}
let mut prepared_files = Vec::new();
let mut fs_callbacks = FsEventCallbacks::new();
for (path, metadata) in self.files {
let (fs_event_sender, fs_event_receiver) = mpsc::channel(20);
fs_callbacks.register(&path, fs_event_sender);
prepared_files.push(PreparedFile { path, metadata, fs_event_receiver });
}
let fs_watcher = match FsWatcher::new(fs_callbacks) {
Ok(fs_watcher) => fs_watcher,
Err(e) => {
println!("[LogWatcher] Error creating filesystem watcher: {}", e);
return false;
}
};
for file in &prepared_files {
let file_path = &file.path;
if !file_path.is_absolute() {
println!("[LogWatcher] Error creating filesystem watcher, path is not absolute: {}", file_path.to_string_lossy());
return false;
}
let parent_path = if let Some(parent) = file_path.parent() {
parent
} else {
println!("[LogWatcher] Error creating filesystem watcher for parent directory of file \"{}\", parent directory does not exist", file_path.to_string_lossy());
return false;
};
if let Err(e) = fs_watcher.watch(parent_path).await {
println!("[LogWatcher] Error creating filesystem watcher for directory \"{}\": {}", parent_path.to_string_lossy(), e);
return false;
}
}
let fs_watcher = Arc::new(fs_watcher);
for file in prepared_files {
let label_set = file.metadata.get_label_set();
let _ = metrics.requests_total.get_or_create(&label_set); let _ = metrics.requests_total.get_or_create(&label_set);
let _ = metrics.errors_total.get_or_create(&label_set); let _ = metrics.errors_total.get_or_create(&label_set);
let log_watcher = match LogWatcher::create(file.path, file.metadata, metrics.clone(), Arc::clone(&fs_watcher), file.fs_event_receiver).await {
Some(log_watcher) => log_watcher,
None => return false,
};
tokio::spawn(log_watcher.watch());
} }
true
}
}
struct LogWatcher {
state: LogWatchingState,
processor: LogLineProcessor,
fs_event_receiver: Receiver<Event>,
}
impl LogWatcher {
async fn create(path: PathBuf, metadata: LogFileMetadata, metrics: ApacheMetrics, fs_watcher: Arc<FsWatcher>, fs_event_receiver: Receiver<Event>) -> Option<Self> {
let state = match LogWatchingState::initialize(path.clone(), fs_watcher).await {
Some(state) => state,
None => return None,
};
let processor = LogLineProcessor { path, metadata, metrics };
Some(LogWatcher { state, processor, fs_event_receiver })
}
async fn watch(mut self) {
while let Ok(Some(_)) = self.state.lines.next_line().await {
// Skip lines that already existed.
}
let path = &self.processor.path;
'read_loop:
loop { loop {
if !self.processor.process_lines(&mut self.state.lines).await { if let Some(event) = self.reader.next_line().await? {
break 'read_loop; self.handle_line(event, metrics);
}
'event_loop:
loop {
let mut next_event = CoalescedFsEvent::None;
match self.fs_event_receiver.recv().await {
None => break 'read_loop,
Some(event) => {
next_event = next_event.merge(event);
while let Ok(event) = self.fs_event_receiver.try_recv() {
next_event = next_event.merge(event);
} }
} }
} }
match next_event { fn handle_line(&mut self, event: Line, metrics: &ApacheMetrics) {
CoalescedFsEvent::None => continue 'event_loop, match self.files.get(event.source()) {
CoalescedFsEvent::NewData => continue 'read_loop, Some(metadata) => {
CoalescedFsEvent::NewFile => { let label = metadata.label;
println!("[LogWatcher] File recreated: {}", path.to_string_lossy()); let (kind, family) = match metadata.kind {
LogFileKind::Access => ("access log", &metrics.requests_total),
if !self.processor.process_lines(&mut self.state.lines).await { LogFileKind::Error => ("error log", &metrics.errors_total),
break 'read_loop;
}
self.state = match self.state.reinitialize().await {
Some(state) => state,
None => break 'read_loop,
}; };
continue 'read_loop; println!("[LogWatcher] Received {} line from \"{}\": {}", kind, label, event.line());
family.get_or_create(&metadata.get_label_set()).inc();
}
None => {
println!("[LogWatcher] Received line from unknown file: {}", event.source().display());
} }
} }
} }
} }
println!("[LogWatcher] Stopping log watcher for: {}", path.to_string_lossy()); async fn watch_logs(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: ApacheMetrics) -> io::Result<()> {
} let mut watcher = LogWatcher::new()?;
for log_file in &access_log_files {
watcher.add_file(log_file, LogFileKind::Access).await?;
} }
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)] for log_file in &error_log_files {
enum CoalescedFsEvent { watcher.add_file(log_file, LogFileKind::Error).await?;
None = 0,
NewData = 1,
NewFile = 2,
} }
impl CoalescedFsEvent { watcher.start_watching(&metrics).await?;
fn merge(self, event: Event) -> CoalescedFsEvent { Ok(())
match event.kind {
EventKind::Modify(ModifyKind::Data(_)) => {
max(self, CoalescedFsEvent::NewData)
}
EventKind::Create(CreateKind::File) => {
max(self, CoalescedFsEvent::NewFile)
}
_ => self
}
}
}
struct LogWatchingState {
path: PathBuf,
lines: Lines<BufReader<File>>,
fs_watcher: Arc<FsWatcher>,
}
impl LogWatchingState {
const DEFAULT_BUFFER_CAPACITY: usize = 1024 * 4;
async fn initialize(path: PathBuf, fs_watcher: Arc<FsWatcher>) -> Option<LogWatchingState> {
if let Err(e) = fs_watcher.watch(&path).await {
println!("[LogWatcher] Error creating filesystem watcher for file \"{}\": {}", path.to_string_lossy(), e);
return None;
}
let lines = match File::open(&path).await {
Ok(file) => BufReader::with_capacity(Self::DEFAULT_BUFFER_CAPACITY, file).lines(),
Err(e) => {
println!("[LogWatcher] Error opening file \"{}\": {}", path.to_string_lossy(), e);
return None;
}
};
Some(LogWatchingState { path, lines, fs_watcher })
}
async fn reinitialize(self) -> Option<LogWatchingState> {
LogWatchingState::initialize(self.path, self.fs_watcher).await
}
}
struct LogLineProcessor {
path: PathBuf,
metadata: LogFileMetadata,
metrics: ApacheMetrics,
}
impl LogLineProcessor {
async fn process_lines(&self, reader: &mut Lines<BufReader<File>>) -> bool {
loop {
match reader.next_line().await {
Ok(maybe_line) => match maybe_line {
Some(line) => self.handle_line(line),
None => return true,
},
Err(e) => {
println!("[LogWatcher] Error reading from file \"{}\": {}", self.path.to_string_lossy(), e);
return false;
}
}
}
}
fn handle_line(&self, line: String) {
let (kind, family) = match self.metadata.kind {
LogFileKind::Access => ("access log", &self.metrics.requests_total),
LogFileKind::Error => ("error log", &self.metrics.errors_total),
};
println!("[LogWatcher] Received {} line from \"{}\": {}", kind, self.metadata.label, line);
family.get_or_create(&self.metadata.get_label_set()).inc();
}
} }

View File

@ -5,14 +5,14 @@ use std::str::FromStr;
use std::sync::Mutex; use std::sync::Mutex;
use tokio::signal; use tokio::signal;
use tokio::sync::mpsc;
use crate::apache_metrics::ApacheMetrics; use crate::apache_metrics::ApacheMetrics;
use crate::log_file_pattern::{LogFilePath, parse_log_file_pattern_from_env}; use crate::log_file_pattern::{LogFilePath, parse_log_file_pattern_from_env};
use crate::log_watcher::start_log_watcher; use crate::log_watcher::watch_logs_task;
use crate::web_server::WebServer; use crate::web_server::WebServer;
mod apache_metrics; mod apache_metrics;
mod fs_watcher;
mod log_file_pattern; mod log_file_pattern;
mod log_parser; mod log_parser;
mod log_watcher; mod log_watcher;
@ -79,21 +79,22 @@ async fn main() -> ExitCode {
}; };
let (metrics_registry, metrics) = ApacheMetrics::new(); let (metrics_registry, metrics) = ApacheMetrics::new();
let (shutdown_send, mut shutdown_recv) = mpsc::unbounded_channel();
if !start_log_watcher(access_log_files, error_log_files, metrics).await { tokio::spawn(watch_logs_task(access_log_files, error_log_files, metrics.clone(), shutdown_send.clone()));
return ExitCode::FAILURE;
}
tokio::spawn(server.serve(Mutex::new(metrics_registry))); tokio::spawn(server.serve(Mutex::new(metrics_registry)));
match signal::ctrl_c().await { drop(shutdown_send);
Ok(_) => {
println!("Received CTRL-C, shutting down..."); tokio::select! {
_ = signal::ctrl_c() => {
println!("Received CTRL-C, shutting down...")
}
_ = shutdown_recv.recv() => {
println!("Shutting down...");
}
}
ExitCode::SUCCESS ExitCode::SUCCESS
} }
Err(e) => {
println!("Error registering CTRL-C handler: {}", e);
ExitCode::FAILURE
}
}
}