mirror of
https://github.com/chylex/Apache-Prometheus-Exporter.git
synced 2025-09-15 17:32:12 +02:00
Compare commits
6 Commits
7def8921b0
...
main
Author | SHA1 | Date | |
---|---|---|---|
c2f47cf736
|
|||
f942b9830d
|
|||
974f7f4035
|
|||
bbc416b8d3
|
|||
f0e1447ae5
|
|||
ec099185c3
|
190
Cargo.lock
generated
190
Cargo.lock
generated
@@ -17,12 +17,19 @@ version = "1.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
|
||||
|
||||
[[package]]
|
||||
name = "anyhow"
|
||||
version = "1.0.75"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6"
|
||||
|
||||
[[package]]
|
||||
name = "apache_prometheus_exporter"
|
||||
version = "0.1.0"
|
||||
version = "1.0.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"hyper",
|
||||
"linemux",
|
||||
"notify",
|
||||
"path-slash",
|
||||
"prometheus-client",
|
||||
"tokio",
|
||||
@@ -55,6 +62,12 @@ version = "1.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
|
||||
|
||||
[[package]]
|
||||
name = "bitflags"
|
||||
version = "2.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635"
|
||||
|
||||
[[package]]
|
||||
name = "bytes"
|
||||
version = "1.2.1"
|
||||
@@ -73,26 +86,6 @@ version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-channel"
|
||||
version = "0.5.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-utils"
|
||||
version = "0.8.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "51887d4adc7b564537b15adcfb307936f8075dfcd5f00dde9a9f1d29383682bc"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"once_cell",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dtoa"
|
||||
version = "1.0.3"
|
||||
@@ -101,14 +94,14 @@ checksum = "c6053ff46b5639ceb91756a85a4c8914668393a03170efd79c8884a529d80656"
|
||||
|
||||
[[package]]
|
||||
name = "filetime"
|
||||
version = "0.2.17"
|
||||
version = "0.2.22"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e94a7bbaa59354bc20dd75b67f23e2797b4490e9d6928203fb105c79e448c86c"
|
||||
checksum = "d4029edd3e734da6fe05b6cd7bd2960760a616bd2ddd0d59a0124746d6272af0"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"redox_syscall",
|
||||
"windows-sys 0.36.1",
|
||||
"redox_syscall 0.3.5",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -148,7 +141,6 @@ dependencies = [
|
||||
"futures-task",
|
||||
"pin-project-lite",
|
||||
"pin-utils",
|
||||
"slab",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -220,7 +212,7 @@ version = "0.9.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"bitflags 1.3.2",
|
||||
"inotify-sys",
|
||||
"libc",
|
||||
]
|
||||
@@ -242,9 +234,9 @@ checksum = "6c8af84674fe1f223a982c933a0ee1086ac4d4052aa0fb8060c12c6ad838e754"
|
||||
|
||||
[[package]]
|
||||
name = "kqueue"
|
||||
version = "1.0.6"
|
||||
version = "1.0.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4d6112e8f37b59803ac47a42d14f1f3a59bbf72fc6857ffc5be455e28a691f8e"
|
||||
checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c"
|
||||
dependencies = [
|
||||
"kqueue-sys",
|
||||
"libc",
|
||||
@@ -252,11 +244,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "kqueue-sys"
|
||||
version = "1.0.3"
|
||||
version = "1.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587"
|
||||
checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"bitflags 1.3.2",
|
||||
"libc",
|
||||
]
|
||||
|
||||
@@ -266,18 +258,6 @@ version = "0.2.148"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b"
|
||||
|
||||
[[package]]
|
||||
name = "linemux"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "feb035c7806bd7982a317d8d66815021e91f7ab14a5fbedee22b06f608f11b43"
|
||||
dependencies = [
|
||||
"futures-util",
|
||||
"notify",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lock_api"
|
||||
version = "0.4.8"
|
||||
@@ -290,12 +270,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "log"
|
||||
version = "0.4.17"
|
||||
version = "0.4.20"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
]
|
||||
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
|
||||
|
||||
[[package]]
|
||||
name = "memchr"
|
||||
@@ -326,19 +303,19 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "notify"
|
||||
version = "5.2.0"
|
||||
version = "6.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "729f63e1ca555a43fe3efa4f3efdf4801c479da85b432242a7b726f353c88486"
|
||||
checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"crossbeam-channel",
|
||||
"bitflags 2.4.0",
|
||||
"filetime",
|
||||
"inotify",
|
||||
"kqueue",
|
||||
"libc",
|
||||
"log",
|
||||
"mio",
|
||||
"walkdir",
|
||||
"windows-sys 0.45.0",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -374,7 +351,7 @@ checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"redox_syscall",
|
||||
"redox_syscall 0.2.16",
|
||||
"smallvec",
|
||||
"windows-sys 0.36.1",
|
||||
]
|
||||
@@ -444,7 +421,16 @@ version = "0.2.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"bitflags 1.3.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redox_syscall"
|
||||
version = "0.3.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -477,15 +463,6 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "slab"
|
||||
version = "0.4.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "smallvec"
|
||||
version = "1.9.0"
|
||||
@@ -591,12 +568,11 @@ checksum = "c4f5b37a154999a8f3f98cc23a628d850e154479cd94decf3414696e12e31aaf"
|
||||
|
||||
[[package]]
|
||||
name = "walkdir"
|
||||
version = "2.3.2"
|
||||
version = "2.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56"
|
||||
checksum = "d71d857dc86794ca4c280d616f7da00d2dbfd8cd788846559a6813e6aa4b54ee"
|
||||
dependencies = [
|
||||
"same-file",
|
||||
"winapi",
|
||||
"winapi-util",
|
||||
]
|
||||
|
||||
@@ -633,9 +609,9 @@ checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
|
||||
|
||||
[[package]]
|
||||
name = "winapi-util"
|
||||
version = "0.1.5"
|
||||
version = "0.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
|
||||
checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596"
|
||||
dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
@@ -659,37 +635,13 @@ dependencies = [
|
||||
"windows_x86_64_msvc 0.36.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-sys"
|
||||
version = "0.45.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
|
||||
dependencies = [
|
||||
"windows-targets 0.42.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-sys"
|
||||
version = "0.48.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
|
||||
dependencies = [
|
||||
"windows-targets 0.48.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-targets"
|
||||
version = "0.42.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071"
|
||||
dependencies = [
|
||||
"windows_aarch64_gnullvm 0.42.2",
|
||||
"windows_aarch64_msvc 0.42.2",
|
||||
"windows_i686_gnu 0.42.2",
|
||||
"windows_i686_msvc 0.42.2",
|
||||
"windows_x86_64_gnu 0.42.2",
|
||||
"windows_x86_64_gnullvm 0.42.2",
|
||||
"windows_x86_64_msvc 0.42.2",
|
||||
"windows-targets",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -698,21 +650,15 @@ version = "0.48.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c"
|
||||
dependencies = [
|
||||
"windows_aarch64_gnullvm 0.48.5",
|
||||
"windows_aarch64_gnullvm",
|
||||
"windows_aarch64_msvc 0.48.5",
|
||||
"windows_i686_gnu 0.48.5",
|
||||
"windows_i686_msvc 0.48.5",
|
||||
"windows_x86_64_gnu 0.48.5",
|
||||
"windows_x86_64_gnullvm 0.48.5",
|
||||
"windows_x86_64_gnullvm",
|
||||
"windows_x86_64_msvc 0.48.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows_aarch64_gnullvm"
|
||||
version = "0.42.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8"
|
||||
|
||||
[[package]]
|
||||
name = "windows_aarch64_gnullvm"
|
||||
version = "0.48.5"
|
||||
@@ -725,12 +671,6 @@ version = "0.36.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47"
|
||||
|
||||
[[package]]
|
||||
name = "windows_aarch64_msvc"
|
||||
version = "0.42.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43"
|
||||
|
||||
[[package]]
|
||||
name = "windows_aarch64_msvc"
|
||||
version = "0.48.5"
|
||||
@@ -743,12 +683,6 @@ version = "0.36.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_gnu"
|
||||
version = "0.42.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_gnu"
|
||||
version = "0.48.5"
|
||||
@@ -761,12 +695,6 @@ version = "0.36.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_msvc"
|
||||
version = "0.42.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060"
|
||||
|
||||
[[package]]
|
||||
name = "windows_i686_msvc"
|
||||
version = "0.48.5"
|
||||
@@ -779,24 +707,12 @@ version = "0.36.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_gnu"
|
||||
version = "0.42.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_gnu"
|
||||
version = "0.48.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_gnullvm"
|
||||
version = "0.42.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_gnullvm"
|
||||
version = "0.48.5"
|
||||
@@ -809,12 +725,6 @@ version = "0.36.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_msvc"
|
||||
version = "0.42.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0"
|
||||
|
||||
[[package]]
|
||||
name = "windows_x86_64_msvc"
|
||||
version = "0.48.5"
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "apache_prometheus_exporter"
|
||||
version = "0.1.0"
|
||||
version = "1.0.0"
|
||||
edition = "2021"
|
||||
|
||||
[[bin]]
|
||||
@@ -13,8 +13,9 @@ lto = true
|
||||
codegen-units = 1
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.75"
|
||||
hyper = { version = "0.14.27", default-features = false, features = ["http1", "server", "runtime"] }
|
||||
linemux = "0.3.0"
|
||||
notify = { version = "6.1.1", default-features = false, features = ["macos_kqueue"] }
|
||||
path-slash = "0.2.1"
|
||||
prometheus-client = "0.21.2"
|
||||
tokio = { version = "1.32.0", features = ["rt", "macros", "signal"] }
|
||||
tokio = { version = "1.32.0", features = ["fs", "io-util", "macros", "rt", "signal"] }
|
||||
|
14
README.md
14
README.md
@@ -82,17 +82,21 @@ The wildcard must not include any prefix or suffix, so `/*/` is accepted, but `/
|
||||
|
||||
#### Notes
|
||||
|
||||
> At least one access log file and one error log file must be found when the exporter starts, otherwise the exporter immediately exits with an error.
|
||||
|
||||
> If a log file is deleted, the exporter will automatically resume watching it if it is re-created later. If you want the exporter to forget about deleted log files, restart the exporter.
|
||||
> The exporter only searches for files when it starts. If you need the exporter to watch a new file or forget a deleted file, you must restart it.
|
||||
|
||||
## 4. Launch the Exporter
|
||||
|
||||
Start the exporter. The standard output will show which log files have been found, the web server host, and the metrics endpoint URL.
|
||||
|
||||
Press `Ctrl-C` to stop the exporter.
|
||||
If no errors are shown, the exporter will begin reading the found log files from the end, and printing each line to the standard output. When a log file is rotated, the exporter will begin reading it from the beginning.
|
||||
|
||||
**Important:** Due to library bugs, the exporter will currently not watch rotated log files. If you want to use this project right now, you will need to add the `-c` flag to `rotatelogs`, and restart the exporter after every rotation.
|
||||
Press `Ctrl-C` to stop the exporter. Signals other than `SIGINT` are ignored.
|
||||
|
||||
#### Notes
|
||||
|
||||
> The exporter is designed to work and tested with the `rotatelogs` tool in a Linux container. Any other tools or operating systems are unsupported.
|
||||
|
||||
> If an error occurs while reading a file or re-opening a rotated file, the exporter will stop watching it and print the error to standard output.
|
||||
|
||||
## 5. Collect Prometheus Metrics
|
||||
|
||||
|
@@ -1,112 +0,0 @@
|
||||
use std::collections::HashMap;
|
||||
use std::io;
|
||||
use std::io::{Error, ErrorKind};
|
||||
use std::path::PathBuf;
|
||||
|
||||
use linemux::{Line, MuxedLines};
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
|
||||
use crate::ApacheMetrics;
|
||||
use crate::log_file_pattern::LogFilePath;
|
||||
|
||||
#[derive(Copy, Clone, PartialEq)]
|
||||
enum LogFileKind {
|
||||
Access,
|
||||
Error,
|
||||
}
|
||||
|
||||
struct LogFileInfo<'a> {
|
||||
pub kind: LogFileKind,
|
||||
pub label: &'a String,
|
||||
}
|
||||
|
||||
impl<'a> LogFileInfo<'a> {
|
||||
fn get_label_set(&self) -> [(&'static str, String); 1] {
|
||||
[("file", self.label.clone())]
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn watch_logs_task(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: ApacheMetrics, shutdown_send: UnboundedSender<()>) {
|
||||
if let Err(error) = watch_logs(access_log_files, error_log_files, metrics).await {
|
||||
println!("[LogWatcher] Error reading logs: {}", error);
|
||||
shutdown_send.send(()).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
struct LogWatcher<'a> {
|
||||
reader: MuxedLines,
|
||||
files: HashMap<PathBuf, LogFileInfo<'a>>,
|
||||
}
|
||||
|
||||
impl<'a> LogWatcher<'a> {
|
||||
fn new() -> io::Result<LogWatcher<'a>> {
|
||||
Ok(LogWatcher {
|
||||
reader: MuxedLines::new()?,
|
||||
files: HashMap::new(),
|
||||
})
|
||||
}
|
||||
|
||||
fn count_files_of_kind(&self, kind: LogFileKind) -> usize {
|
||||
return self.files.values().filter(|info| info.kind == kind).count();
|
||||
}
|
||||
|
||||
async fn add_file(&mut self, log_file: &'a LogFilePath, kind: LogFileKind) -> io::Result<()> {
|
||||
let lookup_key = self.reader.add_file(&log_file.path).await?;
|
||||
self.files.insert(lookup_key, LogFileInfo { kind, label: &log_file.label });
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn start_watching(&mut self, metrics: &ApacheMetrics) -> io::Result<()> {
|
||||
if self.files.is_empty() {
|
||||
println!("[LogWatcher] No log files provided.");
|
||||
return Err(Error::from(ErrorKind::Unsupported));
|
||||
}
|
||||
|
||||
println!("[LogWatcher] Watching {} access log file(s) and {} error log file(s).", self.count_files_of_kind(LogFileKind::Access), self.count_files_of_kind(LogFileKind::Error));
|
||||
|
||||
for metadata in self.files.values() {
|
||||
let label_set = metadata.get_label_set();
|
||||
let _ = metrics.requests_total.get_or_create(&label_set);
|
||||
let _ = metrics.errors_total.get_or_create(&label_set);
|
||||
}
|
||||
|
||||
loop {
|
||||
if let Some(event) = self.reader.next_line().await? {
|
||||
self.handle_line(event, metrics);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_line(&mut self, event: Line, metrics: &ApacheMetrics) {
|
||||
match self.files.get(event.source()) {
|
||||
Some(metadata) => {
|
||||
let label = metadata.label;
|
||||
let (kind, family) = match metadata.kind {
|
||||
LogFileKind::Access => ("access log", &metrics.requests_total),
|
||||
LogFileKind::Error => ("error log", &metrics.errors_total),
|
||||
};
|
||||
|
||||
println!("[LogWatcher] Received {} line from \"{}\": {}", kind, label, event.line());
|
||||
family.get_or_create(&metadata.get_label_set()).inc();
|
||||
}
|
||||
None => {
|
||||
println!("[LogWatcher] Received line from unknown file: {}", event.source().display());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn watch_logs(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: ApacheMetrics) -> io::Result<()> {
|
||||
let mut watcher = LogWatcher::new()?;
|
||||
|
||||
for log_file in &access_log_files {
|
||||
watcher.add_file(log_file, LogFileKind::Access).await?;
|
||||
}
|
||||
|
||||
for log_file in &error_log_files {
|
||||
watcher.add_file(log_file, LogFileKind::Error).await?;
|
||||
}
|
||||
|
||||
watcher.start_watching(&metrics).await?;
|
||||
Ok(())
|
||||
}
|
62
src/logs/filesystem_watcher.rs
Normal file
62
src/logs/filesystem_watcher.rs
Normal file
@@ -0,0 +1,62 @@
|
||||
use std::collections::HashMap;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use notify::{ErrorKind, Event, recommended_watcher, RecommendedWatcher, RecursiveMode, Result, Watcher};
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
pub struct FsWatcher {
|
||||
watcher: Mutex<RecommendedWatcher>,
|
||||
}
|
||||
|
||||
impl FsWatcher {
|
||||
pub fn new(callbacks: FsEventCallbacks) -> Result<Self> {
|
||||
let watcher = recommended_watcher(move |event| callbacks.handle_event(event))?;
|
||||
let watcher = Mutex::new(watcher);
|
||||
|
||||
Ok(Self { watcher })
|
||||
}
|
||||
|
||||
pub async fn watch(&self, path: &Path) -> Result<()> {
|
||||
let mut watcher = self.watcher.lock().await;
|
||||
|
||||
if let Err(e) = watcher.unwatch(path) {
|
||||
if !matches!(e.kind, ErrorKind::WatchNotFound) {
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
watcher.watch(path, RecursiveMode::NonRecursive)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FsEventCallbacks {
|
||||
senders: HashMap<PathBuf, Sender<Event>>,
|
||||
}
|
||||
|
||||
impl FsEventCallbacks {
|
||||
pub fn new() -> Self {
|
||||
Self { senders: HashMap::new() }
|
||||
}
|
||||
|
||||
pub fn register(&mut self, path: &Path, sender: Sender<Event>) {
|
||||
self.senders.insert(path.to_path_buf(), sender);
|
||||
}
|
||||
|
||||
fn handle_event(&self, event: Result<Event>) {
|
||||
match event {
|
||||
Ok(event) => {
|
||||
for path in &event.paths {
|
||||
if let Some(sender) = self.senders.get(path) {
|
||||
if let Err(e) = sender.try_send(event.clone()) {
|
||||
println!("[FsWatcher] Error sending filesystem event for path \"{}\": {}", path.to_string_lossy(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
println!("[FsWatcher] Error receiving filesystem event: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,9 +1,9 @@
|
||||
use std::{env, io};
|
||||
use std::env::VarError;
|
||||
use std::fs::DirEntry;
|
||||
use std::io;
|
||||
use std::io::ErrorKind;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use path_slash::PathExt;
|
||||
|
||||
/// Reads and parses an environment variable that determines the path and file name pattern of log files.
|
||||
@@ -13,34 +13,22 @@ use path_slash::PathExt;
|
||||
/// 1. A simple path to a file.
|
||||
/// 2. A path with a wildcard anywhere in the file name.
|
||||
/// 3. A path with a standalone wildcard component (i.e. no prefix or suffix in the folder name).
|
||||
pub fn parse_log_file_pattern_from_env(variable_name: &str) -> Result<LogFilePattern, String> {
|
||||
match env::var(variable_name) {
|
||||
Ok(str) => {
|
||||
let pattern_str = Path::new(&str).to_slash().ok_or(format!("Environment variable {} contains an invalid path.", variable_name))?;
|
||||
parse_log_file_pattern_from_str(&pattern_str)
|
||||
}
|
||||
Err(err) => match err {
|
||||
VarError::NotPresent => Err(format!("Environment variable {} must be set.", variable_name)),
|
||||
VarError::NotUnicode(_) => Err(format!("Environment variable {} contains invalid characters.", variable_name))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_log_file_pattern_from_str(pattern_str: &str) -> Result<LogFilePattern, String> {
|
||||
if pattern_str.trim().is_empty() {
|
||||
return Err(String::from("Path is empty."));
|
||||
pub fn parse_log_file_pattern_from_str(pattern: &str) -> Result<LogFilePattern> {
|
||||
let pattern = Path::new(pattern).to_slash().ok_or_else(|| anyhow!("Path is invalid"))?;
|
||||
if pattern.trim().is_empty() {
|
||||
bail!("Path is empty");
|
||||
}
|
||||
|
||||
if let Some((left, right)) = pattern_str.split_once('*') {
|
||||
if let Some((left, right)) = pattern.split_once('*') {
|
||||
parse_log_file_pattern_split_on_wildcard(left, right)
|
||||
} else {
|
||||
Ok(LogFilePattern::WithoutWildcard(pattern_str.to_string()))
|
||||
Ok(LogFilePattern::WithoutWildcard(pattern.to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_log_file_pattern_split_on_wildcard(left: &str, right: &str) -> Result<LogFilePattern, String> {
|
||||
fn parse_log_file_pattern_split_on_wildcard(left: &str, right: &str) -> Result<LogFilePattern> {
|
||||
if left.contains('*') || right.contains('*') {
|
||||
return Err(String::from("Path has too many wildcards."));
|
||||
bail!("Path has too many wildcards");
|
||||
}
|
||||
|
||||
if left.ends_with('/') && right.starts_with('/') {
|
||||
@@ -51,7 +39,7 @@ fn parse_log_file_pattern_split_on_wildcard(left: &str, right: &str) -> Result<L
|
||||
}
|
||||
|
||||
if right.contains('/') {
|
||||
return Err(String::from("Path has a folder wildcard with a prefix or suffix."));
|
||||
bail!("Path has a folder wildcard with a prefix or suffix");
|
||||
}
|
||||
|
||||
if let Some((folder_path, file_name_prefix)) = left.rsplit_once('/') {
|
||||
@@ -122,10 +110,7 @@ impl LogFilePattern {
|
||||
}
|
||||
|
||||
fn search_without_wildcard(path_str: &String) -> Result<Vec<LogFilePath>, io::Error> {
|
||||
let path = Path::new(path_str);
|
||||
let is_valid = path.is_file() || matches!(path.parent(), Some(parent) if parent.is_dir());
|
||||
|
||||
if is_valid {
|
||||
if Path::new(path_str).is_file() {
|
||||
Ok(vec![LogFilePath::with_empty_label(path_str)])
|
||||
} else {
|
||||
Err(io::Error::from(ErrorKind::NotFound))
|
||||
@@ -178,27 +163,27 @@ impl LogFilePath {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::log_file_pattern::{LogFilePattern, parse_log_file_pattern_from_str};
|
||||
use super::{LogFilePattern, parse_log_file_pattern_from_str};
|
||||
|
||||
#[test]
|
||||
fn empty_path() {
|
||||
assert!(matches!(parse_log_file_pattern_from_str(""), Err(err) if err == "Path is empty."));
|
||||
assert!(matches!(parse_log_file_pattern_from_str(" "), Err(err) if err == "Path is empty."));
|
||||
assert!(matches!(parse_log_file_pattern_from_str(""), Err(err) if err.to_string() == "Path is empty"));
|
||||
assert!(matches!(parse_log_file_pattern_from_str(" "), Err(err) if err.to_string() == "Path is empty"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn too_many_wildcards() {
|
||||
assert!(matches!(parse_log_file_pattern_from_str("/path/*/to/files/*.log"), Err(err) if err == "Path has too many wildcards."));
|
||||
assert!(matches!(parse_log_file_pattern_from_str("/path/*/to/files/*.log"), Err(err) if err.to_string() == "Path has too many wildcards"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn folder_wildcard_with_prefix_not_supported() {
|
||||
assert!(matches!(parse_log_file_pattern_from_str("/path/*abc/to/files/access.log"), Err(err) if err == "Path has a folder wildcard with a prefix or suffix."));
|
||||
assert!(matches!(parse_log_file_pattern_from_str("/path/*abc/to/files/access.log"), Err(err) if err.to_string() == "Path has a folder wildcard with a prefix or suffix"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn folder_wildcard_with_suffix_not_supported() {
|
||||
assert!(matches!(parse_log_file_pattern_from_str("/path/abc*/to/files/access.log"), Err(err) if err == "Path has a folder wildcard with a prefix or suffix."));
|
||||
assert!(matches!(parse_log_file_pattern_from_str("/path/abc*/to/files/access.log"), Err(err) if err.to_string() == "Path has a folder wildcard with a prefix or suffix"));
|
||||
}
|
||||
|
||||
#[test]
|
256
src/logs/log_file_watcher.rs
Normal file
256
src/logs/log_file_watcher.rs
Normal file
@@ -0,0 +1,256 @@
|
||||
use std::cmp::max;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use notify::{Event, EventKind};
|
||||
use notify::event::{CreateKind, ModifyKind};
|
||||
use tokio::fs::File;
|
||||
use tokio::io::{AsyncBufReadExt, BufReader, Lines};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
|
||||
use crate::logs::filesystem_watcher::{FsEventCallbacks, FsWatcher};
|
||||
use crate::logs::log_file_pattern::LogFilePath;
|
||||
use crate::metrics::Metrics;
|
||||
|
||||
#[derive(Copy, Clone, PartialEq)]
|
||||
pub enum LogFileKind {
|
||||
Access,
|
||||
Error,
|
||||
}
|
||||
|
||||
struct LogFileMetadata {
|
||||
pub kind: LogFileKind,
|
||||
pub label: String,
|
||||
}
|
||||
|
||||
impl LogFileMetadata {
|
||||
fn get_label_set(&self) -> [(&'static str, String); 1] {
|
||||
[("file", self.label.clone())]
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LogWatcherConfiguration {
|
||||
files: Vec<(PathBuf, LogFileMetadata)>,
|
||||
}
|
||||
|
||||
impl LogWatcherConfiguration {
|
||||
pub fn new() -> LogWatcherConfiguration {
|
||||
LogWatcherConfiguration { files: Vec::new() }
|
||||
}
|
||||
|
||||
fn count_files_of_kind(&self, kind: LogFileKind) -> usize {
|
||||
return self.files.iter().filter(|(_, metadata)| metadata.kind == kind).count();
|
||||
}
|
||||
|
||||
pub fn add_file(&mut self, log_file: LogFilePath, kind: LogFileKind) {
|
||||
let path = log_file.path;
|
||||
let label = log_file.label;
|
||||
let metadata = LogFileMetadata { kind, label };
|
||||
self.files.push((path, metadata));
|
||||
}
|
||||
|
||||
pub async fn start(self, metrics: &Metrics) -> Result<()> {
|
||||
if self.files.is_empty() {
|
||||
bail!("No log files provided");
|
||||
}
|
||||
|
||||
println!("[LogWatcher] Watching {} access log file(s) and {} error log file(s).", self.count_files_of_kind(LogFileKind::Access), self.count_files_of_kind(LogFileKind::Error));
|
||||
|
||||
struct PreparedFile {
|
||||
path: PathBuf,
|
||||
metadata: LogFileMetadata,
|
||||
fs_event_receiver: Receiver<Event>,
|
||||
}
|
||||
|
||||
let mut prepared_files = Vec::new();
|
||||
let mut fs_callbacks = FsEventCallbacks::new();
|
||||
|
||||
for (path, metadata) in self.files {
|
||||
let (fs_event_sender, fs_event_receiver) = mpsc::channel(20);
|
||||
fs_callbacks.register(&path, fs_event_sender);
|
||||
prepared_files.push(PreparedFile { path, metadata, fs_event_receiver });
|
||||
}
|
||||
|
||||
let fs_watcher = FsWatcher::new(fs_callbacks).context("Could not create filesystem watcher")?;
|
||||
|
||||
for file in &prepared_files {
|
||||
let file_path = &file.path;
|
||||
if !file_path.is_absolute() {
|
||||
bail!("Path is not absolute: {}", file_path.to_string_lossy());
|
||||
}
|
||||
|
||||
let parent_path = file_path.parent().ok_or_else(|| anyhow!("Path has no parent: {}", file_path.to_string_lossy()))?;
|
||||
fs_watcher.watch(parent_path).await.with_context(|| format!("Could not create filesystem watcher for directory: {}", parent_path.to_string_lossy()))?;
|
||||
}
|
||||
|
||||
let fs_watcher = Arc::new(fs_watcher);
|
||||
|
||||
for file in prepared_files {
|
||||
let label_set = file.metadata.get_label_set();
|
||||
let _ = metrics.requests_total.get_or_create(&label_set);
|
||||
let _ = metrics.errors_total.get_or_create(&label_set);
|
||||
|
||||
let log_watcher = LogWatcher::create(file.path.clone(), file.metadata, metrics.clone(), Arc::clone(&fs_watcher), file.fs_event_receiver);
|
||||
let log_watcher = log_watcher.await.with_context(|| format!("Could not watch log file: {}", file.path.to_string_lossy()))?;
|
||||
|
||||
tokio::spawn(log_watcher.watch());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct LogWatcher {
|
||||
state: LogWatchingState,
|
||||
processor: LogLineProcessor,
|
||||
fs_event_receiver: Receiver<Event>,
|
||||
}
|
||||
|
||||
impl LogWatcher {
|
||||
async fn create(path: PathBuf, metadata: LogFileMetadata, metrics: Metrics, fs_watcher: Arc<FsWatcher>, fs_event_receiver: Receiver<Event>) -> Result<Self> {
|
||||
let state = LogWatchingState::initialize(path.clone(), fs_watcher).await?;
|
||||
let processor = LogLineProcessor { path, metadata, metrics };
|
||||
Ok(LogWatcher { state, processor, fs_event_receiver })
|
||||
}
|
||||
|
||||
async fn watch(mut self) {
|
||||
while let Ok(Some(_)) = self.state.lines.next_line().await {
|
||||
// Skip lines that already existed.
|
||||
}
|
||||
|
||||
let path = &self.processor.path;
|
||||
|
||||
'read_loop:
|
||||
loop {
|
||||
if !self.processor.process_lines(&mut self.state.lines).await {
|
||||
break 'read_loop;
|
||||
}
|
||||
|
||||
'event_loop:
|
||||
loop {
|
||||
let mut next_event = CoalescedFsEvent::None;
|
||||
|
||||
match self.fs_event_receiver.recv().await {
|
||||
None => break 'read_loop,
|
||||
Some(event) => {
|
||||
next_event = next_event.merge(event);
|
||||
|
||||
while let Ok(event) = self.fs_event_receiver.try_recv() {
|
||||
next_event = next_event.merge(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
match next_event {
|
||||
CoalescedFsEvent::None => continue 'event_loop,
|
||||
CoalescedFsEvent::NewData => continue 'read_loop,
|
||||
CoalescedFsEvent::NewFile => {
|
||||
println!("[LogWatcher] File recreated: {}", path.to_string_lossy());
|
||||
|
||||
if !self.processor.process_lines(&mut self.state.lines).await {
|
||||
break 'read_loop;
|
||||
}
|
||||
|
||||
self.state = match self.state.reinitialize().await {
|
||||
Ok(state) => state,
|
||||
Err(e) => {
|
||||
println!("Could not re-watch log file \"{}\": {}", path.to_string_lossy(), e);
|
||||
break 'read_loop;
|
||||
}
|
||||
};
|
||||
|
||||
while let Ok(Some(_)) = self.state.lines.next_line().await {
|
||||
// There are occasional spurious file creation events, so reading
|
||||
// from the beginning would read lines that were already counted.
|
||||
}
|
||||
|
||||
continue 'read_loop;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
println!("[LogWatcher] Stopping log watcher for: {}", path.to_string_lossy());
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
|
||||
enum CoalescedFsEvent {
|
||||
None = 0,
|
||||
NewData = 1,
|
||||
NewFile = 2,
|
||||
}
|
||||
|
||||
impl CoalescedFsEvent {
|
||||
fn merge(self, event: Event) -> CoalescedFsEvent {
|
||||
match event.kind {
|
||||
EventKind::Modify(ModifyKind::Data(_)) => {
|
||||
max(self, CoalescedFsEvent::NewData)
|
||||
}
|
||||
|
||||
EventKind::Create(CreateKind::File) => {
|
||||
max(self, CoalescedFsEvent::NewFile)
|
||||
}
|
||||
|
||||
_ => self
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct LogWatchingState {
|
||||
path: PathBuf,
|
||||
lines: Lines<BufReader<File>>,
|
||||
fs_watcher: Arc<FsWatcher>,
|
||||
}
|
||||
|
||||
impl LogWatchingState {
|
||||
const DEFAULT_BUFFER_CAPACITY: usize = 1024 * 4;
|
||||
|
||||
async fn initialize(path: PathBuf, fs_watcher: Arc<FsWatcher>) -> Result<LogWatchingState> {
|
||||
fs_watcher.watch(&path).await.context("Could not create filesystem watcher")?;
|
||||
|
||||
let file = File::open(&path).await.context("Could not open file")?;
|
||||
let lines = BufReader::with_capacity(Self::DEFAULT_BUFFER_CAPACITY, file).lines();
|
||||
|
||||
Ok(LogWatchingState { path, lines, fs_watcher })
|
||||
}
|
||||
|
||||
async fn reinitialize(self) -> Result<LogWatchingState> {
|
||||
LogWatchingState::initialize(self.path, self.fs_watcher).await
|
||||
}
|
||||
}
|
||||
|
||||
struct LogLineProcessor {
|
||||
path: PathBuf,
|
||||
metadata: LogFileMetadata,
|
||||
metrics: Metrics,
|
||||
}
|
||||
|
||||
impl LogLineProcessor {
|
||||
async fn process_lines(&self, reader: &mut Lines<BufReader<File>>) -> bool {
|
||||
loop {
|
||||
match reader.next_line().await {
|
||||
Ok(maybe_line) => match maybe_line {
|
||||
Some(line) => self.handle_line(line),
|
||||
None => return true,
|
||||
},
|
||||
Err(e) => {
|
||||
println!("[LogWatcher] Error reading from file \"{}\": {}", self.path.to_string_lossy(), e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_line(&self, line: String) {
|
||||
let (kind, family) = match self.metadata.kind {
|
||||
LogFileKind::Access => ("access log", &self.metrics.requests_total),
|
||||
LogFileKind::Error => ("error log", &self.metrics.errors_total),
|
||||
};
|
||||
|
||||
println!("[LogWatcher] Received {} line from \"{}\": {}", kind, self.metadata.label, line);
|
||||
family.get_or_create(&self.metadata.get_label_set()).inc();
|
||||
}
|
||||
}
|
48
src/logs/mod.rs
Normal file
48
src/logs/mod.rs
Normal file
@@ -0,0 +1,48 @@
|
||||
use std::env;
|
||||
use std::env::VarError;
|
||||
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
|
||||
use log_file_watcher::{LogFileKind, LogWatcherConfiguration};
|
||||
|
||||
use crate::logs::log_file_pattern::{LogFilePath, parse_log_file_pattern_from_str};
|
||||
use crate::metrics::Metrics;
|
||||
|
||||
mod access_log_parser;
|
||||
mod filesystem_watcher;
|
||||
mod log_file_pattern;
|
||||
mod log_file_watcher;
|
||||
|
||||
pub fn find_log_files(environment_variable_name: &str, log_kind: &str) -> Result<Vec<LogFilePath>> {
|
||||
let log_file_pattern_str = env::var(environment_variable_name).map_err(|err| match err {
|
||||
VarError::NotPresent => anyhow!("Environment variable {} must be set", environment_variable_name),
|
||||
VarError::NotUnicode(_) => anyhow!("Environment variable {} contains invalid characters", environment_variable_name)
|
||||
})?;
|
||||
|
||||
let log_file_pattern = parse_log_file_pattern_from_str(&log_file_pattern_str).with_context(|| format!("Could not parse pattern: {}", log_file_pattern_str))?;
|
||||
let log_files = log_file_pattern.search().with_context(|| format!("Could not search files: {}", log_file_pattern_str))?;
|
||||
|
||||
if log_files.is_empty() {
|
||||
bail!("No files match pattern: {}", log_file_pattern_str);
|
||||
}
|
||||
|
||||
for log_file in &log_files {
|
||||
println!("Found {} file: {} (label \"{}\")", log_kind, log_file.path.display(), log_file.label);
|
||||
}
|
||||
|
||||
Ok(log_files)
|
||||
}
|
||||
|
||||
pub async fn start_log_watcher(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: Metrics) -> Result<()> {
|
||||
let mut watcher = LogWatcherConfiguration::new();
|
||||
|
||||
for log_file in access_log_files.into_iter() {
|
||||
watcher.add_file(log_file, LogFileKind::Access);
|
||||
}
|
||||
|
||||
for log_file in error_log_files.into_iter() {
|
||||
watcher.add_file(log_file, LogFileKind::Error);
|
||||
}
|
||||
|
||||
watcher.start(&metrics).await
|
||||
}
|
94
src/main.rs
94
src/main.rs
@@ -1,100 +1,38 @@
|
||||
use std::env;
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use std::process::ExitCode;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Mutex;
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use tokio::signal;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use crate::apache_metrics::ApacheMetrics;
|
||||
use crate::log_file_pattern::{LogFilePath, parse_log_file_pattern_from_env};
|
||||
use crate::log_watcher::watch_logs_task;
|
||||
use crate::web_server::WebServer;
|
||||
use crate::metrics::Metrics;
|
||||
use crate::web::WebServer;
|
||||
|
||||
mod apache_metrics;
|
||||
mod log_file_pattern;
|
||||
mod log_parser;
|
||||
mod log_watcher;
|
||||
mod web_server;
|
||||
mod logs;
|
||||
mod metrics;
|
||||
mod web;
|
||||
|
||||
const ACCESS_LOG_FILE_PATTERN: &str = "ACCESS_LOG_FILE_PATTERN";
|
||||
const ERROR_LOG_FILE_PATTERN: &str = "ERROR_LOG_FILE_PATTERN";
|
||||
|
||||
fn find_log_files(environment_variable_name: &str, log_kind: &str) -> Option<Vec<LogFilePath>> {
|
||||
let log_file_pattern = match parse_log_file_pattern_from_env(environment_variable_name) {
|
||||
Ok(pattern) => pattern,
|
||||
Err(error) => {
|
||||
println!("Error: {}", error);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let log_files = match log_file_pattern.search() {
|
||||
Ok(files) => files,
|
||||
Err(error) => {
|
||||
println!("Error searching {} files: {}", log_kind, error);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
if log_files.is_empty() {
|
||||
println!("Found no matching {} files.", log_kind);
|
||||
return None;
|
||||
}
|
||||
|
||||
for log_file in &log_files {
|
||||
println!("Found {} file: {} (label \"{}\")", log_kind, log_file.path.display(), log_file.label);
|
||||
}
|
||||
|
||||
Some(log_files)
|
||||
}
|
||||
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
async fn main() -> ExitCode {
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let host = env::var("HTTP_HOST").unwrap_or(String::from("127.0.0.1"));
|
||||
let bind_ip = match IpAddr::from_str(&host) {
|
||||
Ok(addr) => addr,
|
||||
Err(_) => {
|
||||
println!("Invalid HTTP host: {}", host);
|
||||
return ExitCode::FAILURE;
|
||||
}
|
||||
};
|
||||
let bind_ip = IpAddr::from_str(&host).map_err(|_| anyhow!("Invalid HTTP host: {}", host))?;
|
||||
|
||||
println!("Initializing exporter...");
|
||||
|
||||
let access_log_files = match find_log_files(ACCESS_LOG_FILE_PATTERN, "access log") {
|
||||
Some(files) => files,
|
||||
None => return ExitCode::FAILURE,
|
||||
};
|
||||
let access_log_files = logs::find_log_files(ACCESS_LOG_FILE_PATTERN, "access log").context("Could not find access log files")?;
|
||||
let error_log_files = logs::find_log_files(ERROR_LOG_FILE_PATTERN, "error log").context("Could not find error log files")?;
|
||||
|
||||
let error_log_files = match find_log_files(ERROR_LOG_FILE_PATTERN, "error log") {
|
||||
Some(files) => files,
|
||||
None => return ExitCode::FAILURE,
|
||||
};
|
||||
let server = WebServer::try_bind(SocketAddr::new(bind_ip, 9240)).context("Could not configure web server")?;
|
||||
let (metrics_registry, metrics) = Metrics::new();
|
||||
|
||||
let server = match WebServer::try_bind(SocketAddr::new(bind_ip, 9240)) {
|
||||
Some(server) => server,
|
||||
None => return ExitCode::FAILURE
|
||||
};
|
||||
|
||||
let (metrics_registry, metrics) = ApacheMetrics::new();
|
||||
let (shutdown_send, mut shutdown_recv) = mpsc::unbounded_channel();
|
||||
|
||||
tokio::spawn(watch_logs_task(access_log_files, error_log_files, metrics.clone(), shutdown_send.clone()));
|
||||
logs::start_log_watcher(access_log_files, error_log_files, metrics).await.context("Could not start watching logs")?;
|
||||
tokio::spawn(server.serve(Mutex::new(metrics_registry)));
|
||||
|
||||
drop(shutdown_send);
|
||||
|
||||
tokio::select! {
|
||||
_ = signal::ctrl_c() => {
|
||||
println!("Received CTRL-C, shutting down...")
|
||||
}
|
||||
|
||||
_ = shutdown_recv.recv() => {
|
||||
println!("Shutting down...");
|
||||
}
|
||||
}
|
||||
|
||||
ExitCode::SUCCESS
|
||||
signal::ctrl_c().await.with_context(|| "Could not register CTRL-C handler")?;
|
||||
println!("Received CTRL-C, shutting down...");
|
||||
Ok(())
|
||||
}
|
||||
|
@@ -4,20 +4,17 @@ use prometheus_client::registry::Registry;
|
||||
|
||||
type SingleLabel = [(&'static str, String); 1];
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ApacheMetrics {
|
||||
#[derive(Clone, Default)]
|
||||
pub struct Metrics {
|
||||
pub requests_total: Family<SingleLabel, Counter>,
|
||||
pub errors_total: Family<SingleLabel, Counter>
|
||||
}
|
||||
|
||||
impl ApacheMetrics {
|
||||
pub fn new() -> (Registry, ApacheMetrics) {
|
||||
impl Metrics {
|
||||
pub fn new() -> (Registry, Metrics) {
|
||||
let mut registry = <Registry>::default();
|
||||
|
||||
let metrics = ApacheMetrics {
|
||||
requests_total: Family::<SingleLabel, Counter>::default(),
|
||||
errors_total: Family::<SingleLabel, Counter>::default()
|
||||
};
|
||||
let metrics = Metrics::default();
|
||||
|
||||
registry.register("apache_requests", "Number of received requests", metrics.requests_total.clone());
|
||||
registry.register("apache_errors", "Number of logged errors", metrics.errors_total.clone());
|
42
src/web/metrics_endpoint.rs
Normal file
42
src/web/metrics_endpoint.rs
Normal file
@@ -0,0 +1,42 @@
|
||||
use std::fmt;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use hyper::{Body, http, Response, StatusCode};
|
||||
use hyper::header::CONTENT_TYPE;
|
||||
use prometheus_client::encoding::text::encode;
|
||||
use prometheus_client::registry::Registry;
|
||||
|
||||
//noinspection SpellCheckingInspection
|
||||
const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text; version=1.0.0; charset=utf-8";
|
||||
|
||||
pub async fn handle(metrics_registry: Arc<Mutex<Registry>>) -> http::Result<Response<Body>> {
|
||||
match try_encode(metrics_registry) {
|
||||
MetricsEncodeResult::Ok(buf) => {
|
||||
Response::builder().status(StatusCode::OK).header(CONTENT_TYPE, METRICS_CONTENT_TYPE).body(Body::from(buf))
|
||||
}
|
||||
MetricsEncodeResult::FailedAcquiringRegistryLock => {
|
||||
println!("[WebServer] Failed acquiring lock on registry.");
|
||||
Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())
|
||||
}
|
||||
MetricsEncodeResult::FailedEncodingMetrics(e) => {
|
||||
println!("[WebServer] Error encoding metrics: {}", e);
|
||||
Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum MetricsEncodeResult {
|
||||
Ok(String),
|
||||
FailedAcquiringRegistryLock,
|
||||
FailedEncodingMetrics(fmt::Error),
|
||||
}
|
||||
|
||||
fn try_encode(metrics_registry: Arc<Mutex<Registry>>) -> MetricsEncodeResult {
|
||||
let mut buf = String::new();
|
||||
|
||||
return if let Ok(metrics_registry) = metrics_registry.lock() {
|
||||
encode(&mut buf, &metrics_registry).map_or_else(MetricsEncodeResult::FailedEncodingMetrics, |_| MetricsEncodeResult::Ok(buf))
|
||||
} else {
|
||||
MetricsEncodeResult::FailedAcquiringRegistryLock
|
||||
};
|
||||
}
|
57
src/web/mod.rs
Normal file
57
src/web/mod.rs
Normal file
@@ -0,0 +1,57 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use hyper::{Body, Error, Method, Request, Response, Server, StatusCode};
|
||||
use hyper::http::Result;
|
||||
use hyper::server::Builder;
|
||||
use hyper::server::conn::AddrIncoming;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use prometheus_client::registry::Registry;
|
||||
|
||||
mod metrics_endpoint;
|
||||
|
||||
const MAX_BUFFER_SIZE: usize = 1024 * 32;
|
||||
|
||||
pub struct WebServer {
|
||||
builder: Builder<AddrIncoming>,
|
||||
}
|
||||
|
||||
impl WebServer {
|
||||
//noinspection HttpUrlsUsage
|
||||
pub fn try_bind(addr: SocketAddr) -> anyhow::Result<WebServer> {
|
||||
println!("[WebServer] Starting web server on {0} with metrics endpoint: http://{0}/metrics", addr);
|
||||
|
||||
let builder = Server::try_bind(&addr).with_context(|| format!("Could not bind to {}", addr))?;
|
||||
let builder = builder.tcp_keepalive(Some(Duration::from_secs(60)));
|
||||
let builder = builder.http1_only(true);
|
||||
let builder = builder.http1_keepalive(true);
|
||||
let builder = builder.http1_max_buf_size(MAX_BUFFER_SIZE);
|
||||
let builder = builder.http1_header_read_timeout(Duration::from_secs(10));
|
||||
|
||||
Ok(WebServer { builder })
|
||||
}
|
||||
|
||||
pub async fn serve(self, metrics_registry: Mutex<Registry>) {
|
||||
let metrics_registry = Arc::new(metrics_registry);
|
||||
let service = make_service_fn(move |_| {
|
||||
let metrics_registry = Arc::clone(&metrics_registry);
|
||||
async move {
|
||||
Ok::<_, Error>(service_fn(move |req| handle_request(req, Arc::clone(&metrics_registry))))
|
||||
}
|
||||
});
|
||||
|
||||
if let Err(e) = self.builder.serve(service).await {
|
||||
println!("[WebServer] Error starting web server: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_request(req: Request<Body>, metrics_registry: Arc<Mutex<Registry>>) -> Result<Response<Body>> {
|
||||
if req.method() == Method::GET && req.uri().path() == "/metrics" {
|
||||
metrics_endpoint::handle(Arc::clone(&metrics_registry)).await
|
||||
} else {
|
||||
Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty())
|
||||
}
|
||||
}
|
@@ -1,95 +0,0 @@
|
||||
use std::fmt;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
use hyper::{Body, Error, header, Method, Request, Response, Server, StatusCode};
|
||||
use hyper::http::Result;
|
||||
use hyper::server::Builder;
|
||||
use hyper::server::conn::AddrIncoming;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use prometheus_client::encoding::text::encode;
|
||||
use prometheus_client::registry::Registry;
|
||||
|
||||
const MAX_BUFFER_SIZE: usize = 1024 * 32;
|
||||
|
||||
pub struct WebServer {
|
||||
builder: Builder<AddrIncoming>,
|
||||
}
|
||||
|
||||
impl WebServer {
|
||||
//noinspection HttpUrlsUsage
|
||||
pub fn try_bind(addr: SocketAddr) -> Option<WebServer> {
|
||||
println!("[WebServer] Starting web server on {0} with metrics endpoint: http://{0}/metrics", addr);
|
||||
let builder = match Server::try_bind(&addr) {
|
||||
Ok(builder) => builder,
|
||||
Err(e) => {
|
||||
println!("[WebServer] Could not bind to {}: {}", addr, e);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let builder = builder.tcp_keepalive(Some(Duration::from_secs(60)));
|
||||
let builder = builder.http1_only(true);
|
||||
let builder = builder.http1_keepalive(true);
|
||||
let builder = builder.http1_max_buf_size(MAX_BUFFER_SIZE);
|
||||
let builder = builder.http1_header_read_timeout(Duration::from_secs(10));
|
||||
|
||||
Some(WebServer { builder })
|
||||
}
|
||||
|
||||
pub async fn serve(self, metrics_registry: Mutex<Registry>) {
|
||||
let metrics_registry = Arc::new(metrics_registry);
|
||||
let service = make_service_fn(move |_| {
|
||||
let metrics_registry = Arc::clone(&metrics_registry);
|
||||
async move {
|
||||
Ok::<_, Error>(service_fn(move |req| handle_request(req, Arc::clone(&metrics_registry))))
|
||||
}
|
||||
});
|
||||
|
||||
if let Err(e) = self.builder.serve(service).await {
|
||||
println!("[WebServer] Error starting web server: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_request(req: Request<Body>, metrics_registry: Arc<Mutex<Registry>>) -> Result<Response<Body>> {
|
||||
if req.method() == Method::GET && req.uri().path() == "/metrics" {
|
||||
metrics_handler(Arc::clone(&metrics_registry)).await
|
||||
} else {
|
||||
Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty())
|
||||
}
|
||||
}
|
||||
|
||||
//noinspection SpellCheckingInspection
|
||||
async fn metrics_handler(metrics_registry: Arc<Mutex<Registry>>) -> Result<Response<Body>> {
|
||||
match encode_metrics(metrics_registry) {
|
||||
MetricsEncodeResult::Ok(buf) => {
|
||||
Response::builder().status(StatusCode::OK).header(header::CONTENT_TYPE, "application/openmetrics-text; version=1.0.0; charset=utf-8").body(Body::from(buf))
|
||||
}
|
||||
MetricsEncodeResult::FailedAcquiringRegistryLock => {
|
||||
println!("[WebServer] Failed acquiring lock on registry.");
|
||||
Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())
|
||||
}
|
||||
MetricsEncodeResult::FailedEncodingMetrics(e) => {
|
||||
println!("[WebServer] Error encoding metrics: {}", e);
|
||||
Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum MetricsEncodeResult {
|
||||
Ok(String),
|
||||
FailedAcquiringRegistryLock,
|
||||
FailedEncodingMetrics(fmt::Error),
|
||||
}
|
||||
|
||||
fn encode_metrics(metrics_registry: Arc<Mutex<Registry>>) -> MetricsEncodeResult {
|
||||
let mut buf = String::new();
|
||||
|
||||
return if let Ok(metrics_registry) = metrics_registry.lock() {
|
||||
encode(&mut buf, &metrics_registry).map_or_else(MetricsEncodeResult::FailedEncodingMetrics, |_| MetricsEncodeResult::Ok(buf))
|
||||
} else {
|
||||
MetricsEncodeResult::FailedAcquiringRegistryLock
|
||||
};
|
||||
}
|
Reference in New Issue
Block a user