1
0
mirror of https://github.com/chylex/Apache-Prometheus-Exporter.git synced 2025-09-16 02:32:13 +02:00

Compare commits

...

6 Commits

14 changed files with 567 additions and 474 deletions

190
Cargo.lock generated
View File

@@ -17,12 +17,19 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "anyhow"
version = "1.0.75"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6"
[[package]] [[package]]
name = "apache_prometheus_exporter" name = "apache_prometheus_exporter"
version = "0.1.0" version = "1.0.0"
dependencies = [ dependencies = [
"anyhow",
"hyper", "hyper",
"linemux", "notify",
"path-slash", "path-slash",
"prometheus-client", "prometheus-client",
"tokio", "tokio",
@@ -55,6 +62,12 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitflags"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635"
[[package]] [[package]]
name = "bytes" name = "bytes"
version = "1.2.1" version = "1.2.1"
@@ -73,26 +86,6 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "crossbeam-channel"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51887d4adc7b564537b15adcfb307936f8075dfcd5f00dde9a9f1d29383682bc"
dependencies = [
"cfg-if",
"once_cell",
]
[[package]] [[package]]
name = "dtoa" name = "dtoa"
version = "1.0.3" version = "1.0.3"
@@ -101,14 +94,14 @@ checksum = "c6053ff46b5639ceb91756a85a4c8914668393a03170efd79c8884a529d80656"
[[package]] [[package]]
name = "filetime" name = "filetime"
version = "0.2.17" version = "0.2.22"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e94a7bbaa59354bc20dd75b67f23e2797b4490e9d6928203fb105c79e448c86c" checksum = "d4029edd3e734da6fe05b6cd7bd2960760a616bd2ddd0d59a0124746d6272af0"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"libc", "libc",
"redox_syscall", "redox_syscall 0.3.5",
"windows-sys 0.36.1", "windows-sys 0.48.0",
] ]
[[package]] [[package]]
@@ -148,7 +141,6 @@ dependencies = [
"futures-task", "futures-task",
"pin-project-lite", "pin-project-lite",
"pin-utils", "pin-utils",
"slab",
] ]
[[package]] [[package]]
@@ -220,7 +212,7 @@ version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
dependencies = [ dependencies = [
"bitflags", "bitflags 1.3.2",
"inotify-sys", "inotify-sys",
"libc", "libc",
] ]
@@ -242,9 +234,9 @@ checksum = "6c8af84674fe1f223a982c933a0ee1086ac4d4052aa0fb8060c12c6ad838e754"
[[package]] [[package]]
name = "kqueue" name = "kqueue"
version = "1.0.6" version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d6112e8f37b59803ac47a42d14f1f3a59bbf72fc6857ffc5be455e28a691f8e" checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c"
dependencies = [ dependencies = [
"kqueue-sys", "kqueue-sys",
"libc", "libc",
@@ -252,11 +244,11 @@ dependencies = [
[[package]] [[package]]
name = "kqueue-sys" name = "kqueue-sys"
version = "1.0.3" version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587" checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b"
dependencies = [ dependencies = [
"bitflags", "bitflags 1.3.2",
"libc", "libc",
] ]
@@ -266,18 +258,6 @@ version = "0.2.148"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b" checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b"
[[package]]
name = "linemux"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "feb035c7806bd7982a317d8d66815021e91f7ab14a5fbedee22b06f608f11b43"
dependencies = [
"futures-util",
"notify",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "lock_api" name = "lock_api"
version = "0.4.8" version = "0.4.8"
@@ -290,12 +270,9 @@ dependencies = [
[[package]] [[package]]
name = "log" name = "log"
version = "0.4.17" version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
dependencies = [
"cfg-if",
]
[[package]] [[package]]
name = "memchr" name = "memchr"
@@ -326,19 +303,19 @@ dependencies = [
[[package]] [[package]]
name = "notify" name = "notify"
version = "5.2.0" version = "6.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "729f63e1ca555a43fe3efa4f3efdf4801c479da85b432242a7b726f353c88486" checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d"
dependencies = [ dependencies = [
"bitflags", "bitflags 2.4.0",
"crossbeam-channel",
"filetime", "filetime",
"inotify", "inotify",
"kqueue", "kqueue",
"libc", "libc",
"log",
"mio", "mio",
"walkdir", "walkdir",
"windows-sys 0.45.0", "windows-sys 0.48.0",
] ]
[[package]] [[package]]
@@ -374,7 +351,7 @@ checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"libc", "libc",
"redox_syscall", "redox_syscall 0.2.16",
"smallvec", "smallvec",
"windows-sys 0.36.1", "windows-sys 0.36.1",
] ]
@@ -444,7 +421,16 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
dependencies = [ dependencies = [
"bitflags", "bitflags 1.3.2",
]
[[package]]
name = "redox_syscall"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29"
dependencies = [
"bitflags 1.3.2",
] ]
[[package]] [[package]]
@@ -477,15 +463,6 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "slab"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef"
dependencies = [
"autocfg",
]
[[package]] [[package]]
name = "smallvec" name = "smallvec"
version = "1.9.0" version = "1.9.0"
@@ -591,12 +568,11 @@ checksum = "c4f5b37a154999a8f3f98cc23a628d850e154479cd94decf3414696e12e31aaf"
[[package]] [[package]]
name = "walkdir" name = "walkdir"
version = "2.3.2" version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" checksum = "d71d857dc86794ca4c280d616f7da00d2dbfd8cd788846559a6813e6aa4b54ee"
dependencies = [ dependencies = [
"same-file", "same-file",
"winapi",
"winapi-util", "winapi-util",
] ]
@@ -633,9 +609,9 @@ checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]] [[package]]
name = "winapi-util" name = "winapi-util"
version = "0.1.5" version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596"
dependencies = [ dependencies = [
"winapi", "winapi",
] ]
@@ -659,37 +635,13 @@ dependencies = [
"windows_x86_64_msvc 0.36.1", "windows_x86_64_msvc 0.36.1",
] ]
[[package]]
name = "windows-sys"
version = "0.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
dependencies = [
"windows-targets 0.42.2",
]
[[package]] [[package]]
name = "windows-sys" name = "windows-sys"
version = "0.48.0" version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
dependencies = [ dependencies = [
"windows-targets 0.48.5", "windows-targets",
]
[[package]]
name = "windows-targets"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071"
dependencies = [
"windows_aarch64_gnullvm 0.42.2",
"windows_aarch64_msvc 0.42.2",
"windows_i686_gnu 0.42.2",
"windows_i686_msvc 0.42.2",
"windows_x86_64_gnu 0.42.2",
"windows_x86_64_gnullvm 0.42.2",
"windows_x86_64_msvc 0.42.2",
] ]
[[package]] [[package]]
@@ -698,21 +650,15 @@ version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c"
dependencies = [ dependencies = [
"windows_aarch64_gnullvm 0.48.5", "windows_aarch64_gnullvm",
"windows_aarch64_msvc 0.48.5", "windows_aarch64_msvc 0.48.5",
"windows_i686_gnu 0.48.5", "windows_i686_gnu 0.48.5",
"windows_i686_msvc 0.48.5", "windows_i686_msvc 0.48.5",
"windows_x86_64_gnu 0.48.5", "windows_x86_64_gnu 0.48.5",
"windows_x86_64_gnullvm 0.48.5", "windows_x86_64_gnullvm",
"windows_x86_64_msvc 0.48.5", "windows_x86_64_msvc 0.48.5",
] ]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8"
[[package]] [[package]]
name = "windows_aarch64_gnullvm" name = "windows_aarch64_gnullvm"
version = "0.48.5" version = "0.48.5"
@@ -725,12 +671,6 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47"
[[package]]
name = "windows_aarch64_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43"
[[package]] [[package]]
name = "windows_aarch64_msvc" name = "windows_aarch64_msvc"
version = "0.48.5" version = "0.48.5"
@@ -743,12 +683,6 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6"
[[package]]
name = "windows_i686_gnu"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f"
[[package]] [[package]]
name = "windows_i686_gnu" name = "windows_i686_gnu"
version = "0.48.5" version = "0.48.5"
@@ -761,12 +695,6 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024"
[[package]]
name = "windows_i686_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060"
[[package]] [[package]]
name = "windows_i686_msvc" name = "windows_i686_msvc"
version = "0.48.5" version = "0.48.5"
@@ -779,24 +707,12 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1"
[[package]]
name = "windows_x86_64_gnu"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36"
[[package]] [[package]]
name = "windows_x86_64_gnu" name = "windows_x86_64_gnu"
version = "0.48.5" version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3"
[[package]] [[package]]
name = "windows_x86_64_gnullvm" name = "windows_x86_64_gnullvm"
version = "0.48.5" version = "0.48.5"
@@ -809,12 +725,6 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"
[[package]]
name = "windows_x86_64_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0"
[[package]] [[package]]
name = "windows_x86_64_msvc" name = "windows_x86_64_msvc"
version = "0.48.5" version = "0.48.5"

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "apache_prometheus_exporter" name = "apache_prometheus_exporter"
version = "0.1.0" version = "1.0.0"
edition = "2021" edition = "2021"
[[bin]] [[bin]]
@@ -13,8 +13,9 @@ lto = true
codegen-units = 1 codegen-units = 1
[dependencies] [dependencies]
anyhow = "1.0.75"
hyper = { version = "0.14.27", default-features = false, features = ["http1", "server", "runtime"] } hyper = { version = "0.14.27", default-features = false, features = ["http1", "server", "runtime"] }
linemux = "0.3.0" notify = { version = "6.1.1", default-features = false, features = ["macos_kqueue"] }
path-slash = "0.2.1" path-slash = "0.2.1"
prometheus-client = "0.21.2" prometheus-client = "0.21.2"
tokio = { version = "1.32.0", features = ["rt", "macros", "signal"] } tokio = { version = "1.32.0", features = ["fs", "io-util", "macros", "rt", "signal"] }

View File

@@ -82,17 +82,21 @@ The wildcard must not include any prefix or suffix, so `/*/` is accepted, but `/
#### Notes #### Notes
> At least one access log file and one error log file must be found when the exporter starts, otherwise the exporter immediately exits with an error. > The exporter only searches for files when it starts. If you need the exporter to watch a new file or forget a deleted file, you must restart it.
> If a log file is deleted, the exporter will automatically resume watching it if it is re-created later. If you want the exporter to forget about deleted log files, restart the exporter.
## 4. Launch the Exporter ## 4. Launch the Exporter
Start the exporter. The standard output will show which log files have been found, the web server host, and the metrics endpoint URL. Start the exporter. The standard output will show which log files have been found, the web server host, and the metrics endpoint URL.
Press `Ctrl-C` to stop the exporter. If no errors are shown, the exporter will begin reading the found log files from the end, and printing each line to the standard output. When a log file is rotated, the exporter will begin reading it from the beginning.
**Important:** Due to library bugs, the exporter will currently not watch rotated log files. If you want to use this project right now, you will need to add the `-c` flag to `rotatelogs`, and restart the exporter after every rotation. Press `Ctrl-C` to stop the exporter. Signals other than `SIGINT` are ignored.
#### Notes
> The exporter is designed to work and tested with the `rotatelogs` tool in a Linux container. Any other tools or operating systems are unsupported.
> If an error occurs while reading a file or re-opening a rotated file, the exporter will stop watching it and print the error to standard output.
## 5. Collect Prometheus Metrics ## 5. Collect Prometheus Metrics

View File

@@ -1,112 +0,0 @@
use std::collections::HashMap;
use std::io;
use std::io::{Error, ErrorKind};
use std::path::PathBuf;
use linemux::{Line, MuxedLines};
use tokio::sync::mpsc::UnboundedSender;
use crate::ApacheMetrics;
use crate::log_file_pattern::LogFilePath;
#[derive(Copy, Clone, PartialEq)]
enum LogFileKind {
Access,
Error,
}
struct LogFileInfo<'a> {
pub kind: LogFileKind,
pub label: &'a String,
}
impl<'a> LogFileInfo<'a> {
fn get_label_set(&self) -> [(&'static str, String); 1] {
[("file", self.label.clone())]
}
}
pub async fn watch_logs_task(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: ApacheMetrics, shutdown_send: UnboundedSender<()>) {
if let Err(error) = watch_logs(access_log_files, error_log_files, metrics).await {
println!("[LogWatcher] Error reading logs: {}", error);
shutdown_send.send(()).unwrap();
}
}
struct LogWatcher<'a> {
reader: MuxedLines,
files: HashMap<PathBuf, LogFileInfo<'a>>,
}
impl<'a> LogWatcher<'a> {
fn new() -> io::Result<LogWatcher<'a>> {
Ok(LogWatcher {
reader: MuxedLines::new()?,
files: HashMap::new(),
})
}
fn count_files_of_kind(&self, kind: LogFileKind) -> usize {
return self.files.values().filter(|info| info.kind == kind).count();
}
async fn add_file(&mut self, log_file: &'a LogFilePath, kind: LogFileKind) -> io::Result<()> {
let lookup_key = self.reader.add_file(&log_file.path).await?;
self.files.insert(lookup_key, LogFileInfo { kind, label: &log_file.label });
Ok(())
}
async fn start_watching(&mut self, metrics: &ApacheMetrics) -> io::Result<()> {
if self.files.is_empty() {
println!("[LogWatcher] No log files provided.");
return Err(Error::from(ErrorKind::Unsupported));
}
println!("[LogWatcher] Watching {} access log file(s) and {} error log file(s).", self.count_files_of_kind(LogFileKind::Access), self.count_files_of_kind(LogFileKind::Error));
for metadata in self.files.values() {
let label_set = metadata.get_label_set();
let _ = metrics.requests_total.get_or_create(&label_set);
let _ = metrics.errors_total.get_or_create(&label_set);
}
loop {
if let Some(event) = self.reader.next_line().await? {
self.handle_line(event, metrics);
}
}
}
fn handle_line(&mut self, event: Line, metrics: &ApacheMetrics) {
match self.files.get(event.source()) {
Some(metadata) => {
let label = metadata.label;
let (kind, family) = match metadata.kind {
LogFileKind::Access => ("access log", &metrics.requests_total),
LogFileKind::Error => ("error log", &metrics.errors_total),
};
println!("[LogWatcher] Received {} line from \"{}\": {}", kind, label, event.line());
family.get_or_create(&metadata.get_label_set()).inc();
}
None => {
println!("[LogWatcher] Received line from unknown file: {}", event.source().display());
}
}
}
}
async fn watch_logs(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: ApacheMetrics) -> io::Result<()> {
let mut watcher = LogWatcher::new()?;
for log_file in &access_log_files {
watcher.add_file(log_file, LogFileKind::Access).await?;
}
for log_file in &error_log_files {
watcher.add_file(log_file, LogFileKind::Error).await?;
}
watcher.start_watching(&metrics).await?;
Ok(())
}

View File

@@ -0,0 +1,62 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use notify::{ErrorKind, Event, recommended_watcher, RecommendedWatcher, RecursiveMode, Result, Watcher};
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex;
pub struct FsWatcher {
watcher: Mutex<RecommendedWatcher>,
}
impl FsWatcher {
pub fn new(callbacks: FsEventCallbacks) -> Result<Self> {
let watcher = recommended_watcher(move |event| callbacks.handle_event(event))?;
let watcher = Mutex::new(watcher);
Ok(Self { watcher })
}
pub async fn watch(&self, path: &Path) -> Result<()> {
let mut watcher = self.watcher.lock().await;
if let Err(e) = watcher.unwatch(path) {
if !matches!(e.kind, ErrorKind::WatchNotFound) {
return Err(e);
}
}
watcher.watch(path, RecursiveMode::NonRecursive)
}
}
pub struct FsEventCallbacks {
senders: HashMap<PathBuf, Sender<Event>>,
}
impl FsEventCallbacks {
pub fn new() -> Self {
Self { senders: HashMap::new() }
}
pub fn register(&mut self, path: &Path, sender: Sender<Event>) {
self.senders.insert(path.to_path_buf(), sender);
}
fn handle_event(&self, event: Result<Event>) {
match event {
Ok(event) => {
for path in &event.paths {
if let Some(sender) = self.senders.get(path) {
if let Err(e) = sender.try_send(event.clone()) {
println!("[FsWatcher] Error sending filesystem event for path \"{}\": {}", path.to_string_lossy(), e);
}
}
}
}
Err(e) => {
println!("[FsWatcher] Error receiving filesystem event: {}", e);
}
}
}
}

View File

@@ -1,9 +1,9 @@
use std::{env, io};
use std::env::VarError;
use std::fs::DirEntry; use std::fs::DirEntry;
use std::io;
use std::io::ErrorKind; use std::io::ErrorKind;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use anyhow::{anyhow, bail, Result};
use path_slash::PathExt; use path_slash::PathExt;
/// Reads and parses an environment variable that determines the path and file name pattern of log files. /// Reads and parses an environment variable that determines the path and file name pattern of log files.
@@ -13,34 +13,22 @@ use path_slash::PathExt;
/// 1. A simple path to a file. /// 1. A simple path to a file.
/// 2. A path with a wildcard anywhere in the file name. /// 2. A path with a wildcard anywhere in the file name.
/// 3. A path with a standalone wildcard component (i.e. no prefix or suffix in the folder name). /// 3. A path with a standalone wildcard component (i.e. no prefix or suffix in the folder name).
pub fn parse_log_file_pattern_from_env(variable_name: &str) -> Result<LogFilePattern, String> { pub fn parse_log_file_pattern_from_str(pattern: &str) -> Result<LogFilePattern> {
match env::var(variable_name) { let pattern = Path::new(pattern).to_slash().ok_or_else(|| anyhow!("Path is invalid"))?;
Ok(str) => { if pattern.trim().is_empty() {
let pattern_str = Path::new(&str).to_slash().ok_or(format!("Environment variable {} contains an invalid path.", variable_name))?; bail!("Path is empty");
parse_log_file_pattern_from_str(&pattern_str)
}
Err(err) => match err {
VarError::NotPresent => Err(format!("Environment variable {} must be set.", variable_name)),
VarError::NotUnicode(_) => Err(format!("Environment variable {} contains invalid characters.", variable_name))
}
}
} }
fn parse_log_file_pattern_from_str(pattern_str: &str) -> Result<LogFilePattern, String> { if let Some((left, right)) = pattern.split_once('*') {
if pattern_str.trim().is_empty() {
return Err(String::from("Path is empty."));
}
if let Some((left, right)) = pattern_str.split_once('*') {
parse_log_file_pattern_split_on_wildcard(left, right) parse_log_file_pattern_split_on_wildcard(left, right)
} else { } else {
Ok(LogFilePattern::WithoutWildcard(pattern_str.to_string())) Ok(LogFilePattern::WithoutWildcard(pattern.to_string()))
} }
} }
fn parse_log_file_pattern_split_on_wildcard(left: &str, right: &str) -> Result<LogFilePattern, String> { fn parse_log_file_pattern_split_on_wildcard(left: &str, right: &str) -> Result<LogFilePattern> {
if left.contains('*') || right.contains('*') { if left.contains('*') || right.contains('*') {
return Err(String::from("Path has too many wildcards.")); bail!("Path has too many wildcards");
} }
if left.ends_with('/') && right.starts_with('/') { if left.ends_with('/') && right.starts_with('/') {
@@ -51,7 +39,7 @@ fn parse_log_file_pattern_split_on_wildcard(left: &str, right: &str) -> Result<L
} }
if right.contains('/') { if right.contains('/') {
return Err(String::from("Path has a folder wildcard with a prefix or suffix.")); bail!("Path has a folder wildcard with a prefix or suffix");
} }
if let Some((folder_path, file_name_prefix)) = left.rsplit_once('/') { if let Some((folder_path, file_name_prefix)) = left.rsplit_once('/') {
@@ -122,10 +110,7 @@ impl LogFilePattern {
} }
fn search_without_wildcard(path_str: &String) -> Result<Vec<LogFilePath>, io::Error> { fn search_without_wildcard(path_str: &String) -> Result<Vec<LogFilePath>, io::Error> {
let path = Path::new(path_str); if Path::new(path_str).is_file() {
let is_valid = path.is_file() || matches!(path.parent(), Some(parent) if parent.is_dir());
if is_valid {
Ok(vec![LogFilePath::with_empty_label(path_str)]) Ok(vec![LogFilePath::with_empty_label(path_str)])
} else { } else {
Err(io::Error::from(ErrorKind::NotFound)) Err(io::Error::from(ErrorKind::NotFound))
@@ -178,27 +163,27 @@ impl LogFilePath {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::log_file_pattern::{LogFilePattern, parse_log_file_pattern_from_str}; use super::{LogFilePattern, parse_log_file_pattern_from_str};
#[test] #[test]
fn empty_path() { fn empty_path() {
assert!(matches!(parse_log_file_pattern_from_str(""), Err(err) if err == "Path is empty.")); assert!(matches!(parse_log_file_pattern_from_str(""), Err(err) if err.to_string() == "Path is empty"));
assert!(matches!(parse_log_file_pattern_from_str(" "), Err(err) if err == "Path is empty.")); assert!(matches!(parse_log_file_pattern_from_str(" "), Err(err) if err.to_string() == "Path is empty"));
} }
#[test] #[test]
fn too_many_wildcards() { fn too_many_wildcards() {
assert!(matches!(parse_log_file_pattern_from_str("/path/*/to/files/*.log"), Err(err) if err == "Path has too many wildcards.")); assert!(matches!(parse_log_file_pattern_from_str("/path/*/to/files/*.log"), Err(err) if err.to_string() == "Path has too many wildcards"));
} }
#[test] #[test]
fn folder_wildcard_with_prefix_not_supported() { fn folder_wildcard_with_prefix_not_supported() {
assert!(matches!(parse_log_file_pattern_from_str("/path/*abc/to/files/access.log"), Err(err) if err == "Path has a folder wildcard with a prefix or suffix.")); assert!(matches!(parse_log_file_pattern_from_str("/path/*abc/to/files/access.log"), Err(err) if err.to_string() == "Path has a folder wildcard with a prefix or suffix"));
} }
#[test] #[test]
fn folder_wildcard_with_suffix_not_supported() { fn folder_wildcard_with_suffix_not_supported() {
assert!(matches!(parse_log_file_pattern_from_str("/path/abc*/to/files/access.log"), Err(err) if err == "Path has a folder wildcard with a prefix or suffix.")); assert!(matches!(parse_log_file_pattern_from_str("/path/abc*/to/files/access.log"), Err(err) if err.to_string() == "Path has a folder wildcard with a prefix or suffix"));
} }
#[test] #[test]

View File

@@ -0,0 +1,256 @@
use std::cmp::max;
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::{anyhow, bail, Context, Result};
use notify::{Event, EventKind};
use notify::event::{CreateKind, ModifyKind};
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, BufReader, Lines};
use tokio::sync::mpsc;
use tokio::sync::mpsc::Receiver;
use crate::logs::filesystem_watcher::{FsEventCallbacks, FsWatcher};
use crate::logs::log_file_pattern::LogFilePath;
use crate::metrics::Metrics;
#[derive(Copy, Clone, PartialEq)]
pub enum LogFileKind {
Access,
Error,
}
struct LogFileMetadata {
pub kind: LogFileKind,
pub label: String,
}
impl LogFileMetadata {
fn get_label_set(&self) -> [(&'static str, String); 1] {
[("file", self.label.clone())]
}
}
pub struct LogWatcherConfiguration {
files: Vec<(PathBuf, LogFileMetadata)>,
}
impl LogWatcherConfiguration {
pub fn new() -> LogWatcherConfiguration {
LogWatcherConfiguration { files: Vec::new() }
}
fn count_files_of_kind(&self, kind: LogFileKind) -> usize {
return self.files.iter().filter(|(_, metadata)| metadata.kind == kind).count();
}
pub fn add_file(&mut self, log_file: LogFilePath, kind: LogFileKind) {
let path = log_file.path;
let label = log_file.label;
let metadata = LogFileMetadata { kind, label };
self.files.push((path, metadata));
}
pub async fn start(self, metrics: &Metrics) -> Result<()> {
if self.files.is_empty() {
bail!("No log files provided");
}
println!("[LogWatcher] Watching {} access log file(s) and {} error log file(s).", self.count_files_of_kind(LogFileKind::Access), self.count_files_of_kind(LogFileKind::Error));
struct PreparedFile {
path: PathBuf,
metadata: LogFileMetadata,
fs_event_receiver: Receiver<Event>,
}
let mut prepared_files = Vec::new();
let mut fs_callbacks = FsEventCallbacks::new();
for (path, metadata) in self.files {
let (fs_event_sender, fs_event_receiver) = mpsc::channel(20);
fs_callbacks.register(&path, fs_event_sender);
prepared_files.push(PreparedFile { path, metadata, fs_event_receiver });
}
let fs_watcher = FsWatcher::new(fs_callbacks).context("Could not create filesystem watcher")?;
for file in &prepared_files {
let file_path = &file.path;
if !file_path.is_absolute() {
bail!("Path is not absolute: {}", file_path.to_string_lossy());
}
let parent_path = file_path.parent().ok_or_else(|| anyhow!("Path has no parent: {}", file_path.to_string_lossy()))?;
fs_watcher.watch(parent_path).await.with_context(|| format!("Could not create filesystem watcher for directory: {}", parent_path.to_string_lossy()))?;
}
let fs_watcher = Arc::new(fs_watcher);
for file in prepared_files {
let label_set = file.metadata.get_label_set();
let _ = metrics.requests_total.get_or_create(&label_set);
let _ = metrics.errors_total.get_or_create(&label_set);
let log_watcher = LogWatcher::create(file.path.clone(), file.metadata, metrics.clone(), Arc::clone(&fs_watcher), file.fs_event_receiver);
let log_watcher = log_watcher.await.with_context(|| format!("Could not watch log file: {}", file.path.to_string_lossy()))?;
tokio::spawn(log_watcher.watch());
}
Ok(())
}
}
struct LogWatcher {
state: LogWatchingState,
processor: LogLineProcessor,
fs_event_receiver: Receiver<Event>,
}
impl LogWatcher {
async fn create(path: PathBuf, metadata: LogFileMetadata, metrics: Metrics, fs_watcher: Arc<FsWatcher>, fs_event_receiver: Receiver<Event>) -> Result<Self> {
let state = LogWatchingState::initialize(path.clone(), fs_watcher).await?;
let processor = LogLineProcessor { path, metadata, metrics };
Ok(LogWatcher { state, processor, fs_event_receiver })
}
async fn watch(mut self) {
while let Ok(Some(_)) = self.state.lines.next_line().await {
// Skip lines that already existed.
}
let path = &self.processor.path;
'read_loop:
loop {
if !self.processor.process_lines(&mut self.state.lines).await {
break 'read_loop;
}
'event_loop:
loop {
let mut next_event = CoalescedFsEvent::None;
match self.fs_event_receiver.recv().await {
None => break 'read_loop,
Some(event) => {
next_event = next_event.merge(event);
while let Ok(event) = self.fs_event_receiver.try_recv() {
next_event = next_event.merge(event);
}
}
}
match next_event {
CoalescedFsEvent::None => continue 'event_loop,
CoalescedFsEvent::NewData => continue 'read_loop,
CoalescedFsEvent::NewFile => {
println!("[LogWatcher] File recreated: {}", path.to_string_lossy());
if !self.processor.process_lines(&mut self.state.lines).await {
break 'read_loop;
}
self.state = match self.state.reinitialize().await {
Ok(state) => state,
Err(e) => {
println!("Could not re-watch log file \"{}\": {}", path.to_string_lossy(), e);
break 'read_loop;
}
};
while let Ok(Some(_)) = self.state.lines.next_line().await {
// There are occasional spurious file creation events, so reading
// from the beginning would read lines that were already counted.
}
continue 'read_loop;
}
}
}
}
println!("[LogWatcher] Stopping log watcher for: {}", path.to_string_lossy());
}
}
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
enum CoalescedFsEvent {
None = 0,
NewData = 1,
NewFile = 2,
}
impl CoalescedFsEvent {
fn merge(self, event: Event) -> CoalescedFsEvent {
match event.kind {
EventKind::Modify(ModifyKind::Data(_)) => {
max(self, CoalescedFsEvent::NewData)
}
EventKind::Create(CreateKind::File) => {
max(self, CoalescedFsEvent::NewFile)
}
_ => self
}
}
}
struct LogWatchingState {
path: PathBuf,
lines: Lines<BufReader<File>>,
fs_watcher: Arc<FsWatcher>,
}
impl LogWatchingState {
const DEFAULT_BUFFER_CAPACITY: usize = 1024 * 4;
async fn initialize(path: PathBuf, fs_watcher: Arc<FsWatcher>) -> Result<LogWatchingState> {
fs_watcher.watch(&path).await.context("Could not create filesystem watcher")?;
let file = File::open(&path).await.context("Could not open file")?;
let lines = BufReader::with_capacity(Self::DEFAULT_BUFFER_CAPACITY, file).lines();
Ok(LogWatchingState { path, lines, fs_watcher })
}
async fn reinitialize(self) -> Result<LogWatchingState> {
LogWatchingState::initialize(self.path, self.fs_watcher).await
}
}
struct LogLineProcessor {
path: PathBuf,
metadata: LogFileMetadata,
metrics: Metrics,
}
impl LogLineProcessor {
async fn process_lines(&self, reader: &mut Lines<BufReader<File>>) -> bool {
loop {
match reader.next_line().await {
Ok(maybe_line) => match maybe_line {
Some(line) => self.handle_line(line),
None => return true,
},
Err(e) => {
println!("[LogWatcher] Error reading from file \"{}\": {}", self.path.to_string_lossy(), e);
return false;
}
}
}
}
fn handle_line(&self, line: String) {
let (kind, family) = match self.metadata.kind {
LogFileKind::Access => ("access log", &self.metrics.requests_total),
LogFileKind::Error => ("error log", &self.metrics.errors_total),
};
println!("[LogWatcher] Received {} line from \"{}\": {}", kind, self.metadata.label, line);
family.get_or_create(&self.metadata.get_label_set()).inc();
}
}

48
src/logs/mod.rs Normal file
View File

@@ -0,0 +1,48 @@
use std::env;
use std::env::VarError;
use anyhow::{anyhow, bail, Context, Result};
use log_file_watcher::{LogFileKind, LogWatcherConfiguration};
use crate::logs::log_file_pattern::{LogFilePath, parse_log_file_pattern_from_str};
use crate::metrics::Metrics;
mod access_log_parser;
mod filesystem_watcher;
mod log_file_pattern;
mod log_file_watcher;
pub fn find_log_files(environment_variable_name: &str, log_kind: &str) -> Result<Vec<LogFilePath>> {
let log_file_pattern_str = env::var(environment_variable_name).map_err(|err| match err {
VarError::NotPresent => anyhow!("Environment variable {} must be set", environment_variable_name),
VarError::NotUnicode(_) => anyhow!("Environment variable {} contains invalid characters", environment_variable_name)
})?;
let log_file_pattern = parse_log_file_pattern_from_str(&log_file_pattern_str).with_context(|| format!("Could not parse pattern: {}", log_file_pattern_str))?;
let log_files = log_file_pattern.search().with_context(|| format!("Could not search files: {}", log_file_pattern_str))?;
if log_files.is_empty() {
bail!("No files match pattern: {}", log_file_pattern_str);
}
for log_file in &log_files {
println!("Found {} file: {} (label \"{}\")", log_kind, log_file.path.display(), log_file.label);
}
Ok(log_files)
}
pub async fn start_log_watcher(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: Metrics) -> Result<()> {
let mut watcher = LogWatcherConfiguration::new();
for log_file in access_log_files.into_iter() {
watcher.add_file(log_file, LogFileKind::Access);
}
for log_file in error_log_files.into_iter() {
watcher.add_file(log_file, LogFileKind::Error);
}
watcher.start(&metrics).await
}

View File

@@ -1,100 +1,38 @@
use std::env; use std::env;
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::process::ExitCode;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Mutex; use std::sync::Mutex;
use anyhow::{anyhow, Context};
use tokio::signal; use tokio::signal;
use tokio::sync::mpsc;
use crate::apache_metrics::ApacheMetrics; use crate::metrics::Metrics;
use crate::log_file_pattern::{LogFilePath, parse_log_file_pattern_from_env}; use crate::web::WebServer;
use crate::log_watcher::watch_logs_task;
use crate::web_server::WebServer;
mod apache_metrics; mod logs;
mod log_file_pattern; mod metrics;
mod log_parser; mod web;
mod log_watcher;
mod web_server;
const ACCESS_LOG_FILE_PATTERN: &str = "ACCESS_LOG_FILE_PATTERN"; const ACCESS_LOG_FILE_PATTERN: &str = "ACCESS_LOG_FILE_PATTERN";
const ERROR_LOG_FILE_PATTERN: &str = "ERROR_LOG_FILE_PATTERN"; const ERROR_LOG_FILE_PATTERN: &str = "ERROR_LOG_FILE_PATTERN";
fn find_log_files(environment_variable_name: &str, log_kind: &str) -> Option<Vec<LogFilePath>> {
let log_file_pattern = match parse_log_file_pattern_from_env(environment_variable_name) {
Ok(pattern) => pattern,
Err(error) => {
println!("Error: {}", error);
return None;
}
};
let log_files = match log_file_pattern.search() {
Ok(files) => files,
Err(error) => {
println!("Error searching {} files: {}", log_kind, error);
return None;
}
};
if log_files.is_empty() {
println!("Found no matching {} files.", log_kind);
return None;
}
for log_file in &log_files {
println!("Found {} file: {} (label \"{}\")", log_kind, log_file.path.display(), log_file.label);
}
Some(log_files)
}
#[tokio::main(flavor = "current_thread")] #[tokio::main(flavor = "current_thread")]
async fn main() -> ExitCode { async fn main() -> anyhow::Result<()> {
let host = env::var("HTTP_HOST").unwrap_or(String::from("127.0.0.1")); let host = env::var("HTTP_HOST").unwrap_or(String::from("127.0.0.1"));
let bind_ip = match IpAddr::from_str(&host) { let bind_ip = IpAddr::from_str(&host).map_err(|_| anyhow!("Invalid HTTP host: {}", host))?;
Ok(addr) => addr,
Err(_) => {
println!("Invalid HTTP host: {}", host);
return ExitCode::FAILURE;
}
};
println!("Initializing exporter..."); println!("Initializing exporter...");
let access_log_files = match find_log_files(ACCESS_LOG_FILE_PATTERN, "access log") { let access_log_files = logs::find_log_files(ACCESS_LOG_FILE_PATTERN, "access log").context("Could not find access log files")?;
Some(files) => files, let error_log_files = logs::find_log_files(ERROR_LOG_FILE_PATTERN, "error log").context("Could not find error log files")?;
None => return ExitCode::FAILURE,
};
let error_log_files = match find_log_files(ERROR_LOG_FILE_PATTERN, "error log") { let server = WebServer::try_bind(SocketAddr::new(bind_ip, 9240)).context("Could not configure web server")?;
Some(files) => files, let (metrics_registry, metrics) = Metrics::new();
None => return ExitCode::FAILURE,
};
let server = match WebServer::try_bind(SocketAddr::new(bind_ip, 9240)) { logs::start_log_watcher(access_log_files, error_log_files, metrics).await.context("Could not start watching logs")?;
Some(server) => server,
None => return ExitCode::FAILURE
};
let (metrics_registry, metrics) = ApacheMetrics::new();
let (shutdown_send, mut shutdown_recv) = mpsc::unbounded_channel();
tokio::spawn(watch_logs_task(access_log_files, error_log_files, metrics.clone(), shutdown_send.clone()));
tokio::spawn(server.serve(Mutex::new(metrics_registry))); tokio::spawn(server.serve(Mutex::new(metrics_registry)));
drop(shutdown_send); signal::ctrl_c().await.with_context(|| "Could not register CTRL-C handler")?;
println!("Received CTRL-C, shutting down...");
tokio::select! { Ok(())
_ = signal::ctrl_c() => {
println!("Received CTRL-C, shutting down...")
}
_ = shutdown_recv.recv() => {
println!("Shutting down...");
}
}
ExitCode::SUCCESS
} }

View File

@@ -4,20 +4,17 @@ use prometheus_client::registry::Registry;
type SingleLabel = [(&'static str, String); 1]; type SingleLabel = [(&'static str, String); 1];
#[derive(Clone)] #[derive(Clone, Default)]
pub struct ApacheMetrics { pub struct Metrics {
pub requests_total: Family<SingleLabel, Counter>, pub requests_total: Family<SingleLabel, Counter>,
pub errors_total: Family<SingleLabel, Counter> pub errors_total: Family<SingleLabel, Counter>
} }
impl ApacheMetrics { impl Metrics {
pub fn new() -> (Registry, ApacheMetrics) { pub fn new() -> (Registry, Metrics) {
let mut registry = <Registry>::default(); let mut registry = <Registry>::default();
let metrics = ApacheMetrics { let metrics = Metrics::default();
requests_total: Family::<SingleLabel, Counter>::default(),
errors_total: Family::<SingleLabel, Counter>::default()
};
registry.register("apache_requests", "Number of received requests", metrics.requests_total.clone()); registry.register("apache_requests", "Number of received requests", metrics.requests_total.clone());
registry.register("apache_errors", "Number of logged errors", metrics.errors_total.clone()); registry.register("apache_errors", "Number of logged errors", metrics.errors_total.clone());

View File

@@ -0,0 +1,42 @@
use std::fmt;
use std::sync::{Arc, Mutex};
use hyper::{Body, http, Response, StatusCode};
use hyper::header::CONTENT_TYPE;
use prometheus_client::encoding::text::encode;
use prometheus_client::registry::Registry;
//noinspection SpellCheckingInspection
const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text; version=1.0.0; charset=utf-8";
pub async fn handle(metrics_registry: Arc<Mutex<Registry>>) -> http::Result<Response<Body>> {
match try_encode(metrics_registry) {
MetricsEncodeResult::Ok(buf) => {
Response::builder().status(StatusCode::OK).header(CONTENT_TYPE, METRICS_CONTENT_TYPE).body(Body::from(buf))
}
MetricsEncodeResult::FailedAcquiringRegistryLock => {
println!("[WebServer] Failed acquiring lock on registry.");
Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())
}
MetricsEncodeResult::FailedEncodingMetrics(e) => {
println!("[WebServer] Error encoding metrics: {}", e);
Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())
}
}
}
enum MetricsEncodeResult {
Ok(String),
FailedAcquiringRegistryLock,
FailedEncodingMetrics(fmt::Error),
}
fn try_encode(metrics_registry: Arc<Mutex<Registry>>) -> MetricsEncodeResult {
let mut buf = String::new();
return if let Ok(metrics_registry) = metrics_registry.lock() {
encode(&mut buf, &metrics_registry).map_or_else(MetricsEncodeResult::FailedEncodingMetrics, |_| MetricsEncodeResult::Ok(buf))
} else {
MetricsEncodeResult::FailedAcquiringRegistryLock
};
}

57
src/web/mod.rs Normal file
View File

@@ -0,0 +1,57 @@
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use anyhow::Context;
use hyper::{Body, Error, Method, Request, Response, Server, StatusCode};
use hyper::http::Result;
use hyper::server::Builder;
use hyper::server::conn::AddrIncoming;
use hyper::service::{make_service_fn, service_fn};
use prometheus_client::registry::Registry;
mod metrics_endpoint;
const MAX_BUFFER_SIZE: usize = 1024 * 32;
pub struct WebServer {
builder: Builder<AddrIncoming>,
}
impl WebServer {
//noinspection HttpUrlsUsage
pub fn try_bind(addr: SocketAddr) -> anyhow::Result<WebServer> {
println!("[WebServer] Starting web server on {0} with metrics endpoint: http://{0}/metrics", addr);
let builder = Server::try_bind(&addr).with_context(|| format!("Could not bind to {}", addr))?;
let builder = builder.tcp_keepalive(Some(Duration::from_secs(60)));
let builder = builder.http1_only(true);
let builder = builder.http1_keepalive(true);
let builder = builder.http1_max_buf_size(MAX_BUFFER_SIZE);
let builder = builder.http1_header_read_timeout(Duration::from_secs(10));
Ok(WebServer { builder })
}
pub async fn serve(self, metrics_registry: Mutex<Registry>) {
let metrics_registry = Arc::new(metrics_registry);
let service = make_service_fn(move |_| {
let metrics_registry = Arc::clone(&metrics_registry);
async move {
Ok::<_, Error>(service_fn(move |req| handle_request(req, Arc::clone(&metrics_registry))))
}
});
if let Err(e) = self.builder.serve(service).await {
println!("[WebServer] Error starting web server: {}", e);
}
}
}
async fn handle_request(req: Request<Body>, metrics_registry: Arc<Mutex<Registry>>) -> Result<Response<Body>> {
if req.method() == Method::GET && req.uri().path() == "/metrics" {
metrics_endpoint::handle(Arc::clone(&metrics_registry)).await
} else {
Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty())
}
}

View File

@@ -1,95 +0,0 @@
use std::fmt;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use hyper::{Body, Error, header, Method, Request, Response, Server, StatusCode};
use hyper::http::Result;
use hyper::server::Builder;
use hyper::server::conn::AddrIncoming;
use hyper::service::{make_service_fn, service_fn};
use prometheus_client::encoding::text::encode;
use prometheus_client::registry::Registry;
const MAX_BUFFER_SIZE: usize = 1024 * 32;
pub struct WebServer {
builder: Builder<AddrIncoming>,
}
impl WebServer {
//noinspection HttpUrlsUsage
pub fn try_bind(addr: SocketAddr) -> Option<WebServer> {
println!("[WebServer] Starting web server on {0} with metrics endpoint: http://{0}/metrics", addr);
let builder = match Server::try_bind(&addr) {
Ok(builder) => builder,
Err(e) => {
println!("[WebServer] Could not bind to {}: {}", addr, e);
return None;
}
};
let builder = builder.tcp_keepalive(Some(Duration::from_secs(60)));
let builder = builder.http1_only(true);
let builder = builder.http1_keepalive(true);
let builder = builder.http1_max_buf_size(MAX_BUFFER_SIZE);
let builder = builder.http1_header_read_timeout(Duration::from_secs(10));
Some(WebServer { builder })
}
pub async fn serve(self, metrics_registry: Mutex<Registry>) {
let metrics_registry = Arc::new(metrics_registry);
let service = make_service_fn(move |_| {
let metrics_registry = Arc::clone(&metrics_registry);
async move {
Ok::<_, Error>(service_fn(move |req| handle_request(req, Arc::clone(&metrics_registry))))
}
});
if let Err(e) = self.builder.serve(service).await {
println!("[WebServer] Error starting web server: {}", e);
}
}
}
async fn handle_request(req: Request<Body>, metrics_registry: Arc<Mutex<Registry>>) -> Result<Response<Body>> {
if req.method() == Method::GET && req.uri().path() == "/metrics" {
metrics_handler(Arc::clone(&metrics_registry)).await
} else {
Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty())
}
}
//noinspection SpellCheckingInspection
async fn metrics_handler(metrics_registry: Arc<Mutex<Registry>>) -> Result<Response<Body>> {
match encode_metrics(metrics_registry) {
MetricsEncodeResult::Ok(buf) => {
Response::builder().status(StatusCode::OK).header(header::CONTENT_TYPE, "application/openmetrics-text; version=1.0.0; charset=utf-8").body(Body::from(buf))
}
MetricsEncodeResult::FailedAcquiringRegistryLock => {
println!("[WebServer] Failed acquiring lock on registry.");
Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())
}
MetricsEncodeResult::FailedEncodingMetrics(e) => {
println!("[WebServer] Error encoding metrics: {}", e);
Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())
}
}
}
enum MetricsEncodeResult {
Ok(String),
FailedAcquiringRegistryLock,
FailedEncodingMetrics(fmt::Error),
}
fn encode_metrics(metrics_registry: Arc<Mutex<Registry>>) -> MetricsEncodeResult {
let mut buf = String::new();
return if let Ok(metrics_registry) = metrics_registry.lock() {
encode(&mut buf, &metrics_registry).map_or_else(MetricsEncodeResult::FailedEncodingMetrics, |_| MetricsEncodeResult::Ok(buf))
} else {
MetricsEncodeResult::FailedAcquiringRegistryLock
};
}