mirror of
https://github.com/chylex/Apache-Prometheus-Exporter.git
synced 2025-09-15 17:32:12 +02:00
Compare commits
7 Commits
alp
...
f0e1447ae5
Author | SHA1 | Date | |
---|---|---|---|
f0e1447ae5
|
|||
ec099185c3
|
|||
7def8921b0
|
|||
383a187358
|
|||
173e4249a6
|
|||
ce6f345b6a
|
|||
8e7259b906
|
1042
Cargo.lock
generated
1042
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
13
Cargo.toml
13
Cargo.toml
@@ -7,11 +7,14 @@ edition = "2021"
|
|||||||
name = "apache_prometheus_exporter"
|
name = "apache_prometheus_exporter"
|
||||||
path = "src/main.rs"
|
path = "src/main.rs"
|
||||||
|
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
[profile.release]
|
||||||
|
strip = true
|
||||||
|
lto = true
|
||||||
|
codegen-units = 1
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix-web = "4.1.0"
|
hyper = { version = "0.14.27", default-features = false, features = ["http1", "server", "runtime"] }
|
||||||
linemux = "0.2.4"
|
notify = { version = "6.1.1", default-features = false, features = ["macos_kqueue"] }
|
||||||
path-slash = "0.2.1"
|
path-slash = "0.2.1"
|
||||||
prometheus-client = "0.18.0"
|
prometheus-client = "0.21.2"
|
||||||
tokio = { version = "1", features = ["rt", "macros", "signal"] }
|
tokio = { version = "1.32.0", features = ["fs", "io-util", "macros", "rt", "signal"] }
|
||||||
|
14
README.md
14
README.md
@@ -82,17 +82,21 @@ The wildcard must not include any prefix or suffix, so `/*/` is accepted, but `/
|
|||||||
|
|
||||||
#### Notes
|
#### Notes
|
||||||
|
|
||||||
> At least one access log file and one error log file must be found when the exporter starts, otherwise the exporter immediately exits with an error.
|
> The exporter only searches for files when it starts. If you need the exporter to watch a new file or forget a deleted file, you must restart it.
|
||||||
|
|
||||||
> If a log file is deleted, the exporter will automatically resume watching it if it is re-created later. If you want the exporter to forget about deleted log files, restart the exporter.
|
|
||||||
|
|
||||||
## 4. Launch the Exporter
|
## 4. Launch the Exporter
|
||||||
|
|
||||||
Start the exporter. The standard output will show which log files have been found, the web server host, and the metrics endpoint URL.
|
Start the exporter. The standard output will show which log files have been found, the web server host, and the metrics endpoint URL.
|
||||||
|
|
||||||
Press `Ctrl-C` to stop the exporter.
|
If no errors are shown, the exporter will begin reading the found log files from the end, and printing each line to the standard output. When a log file is rotated, the exporter will begin reading it from the beginning.
|
||||||
|
|
||||||
**Important:** Due to library bugs, the exporter will currently not watch rotated log files. If you want to use this project right now, you will need to add the `-c` flag to `rotatelogs`, and restart the exporter after every rotation.
|
Press `Ctrl-C` to stop the exporter. Signals other than `SIGINT` are ignored.
|
||||||
|
|
||||||
|
#### Notes
|
||||||
|
|
||||||
|
> The exporter is designed to work and tested with the `rotatelogs` tool in a Linux container. Any other tools or operating systems are unsupported.
|
||||||
|
|
||||||
|
> If an error occurs while reading a file or re-opening a rotated file, the exporter will stop watching it and print the error to standard output.
|
||||||
|
|
||||||
## 5. Collect Prometheus Metrics
|
## 5. Collect Prometheus Metrics
|
||||||
|
|
||||||
|
@@ -12,9 +12,9 @@ This configuration will create a Docker volume for the logs, and the following c
|
|||||||
- **User** : `admin`
|
- **User** : `admin`
|
||||||
- **Password** : `admin`
|
- **Password** : `admin`
|
||||||
3. **Prometheus** configured with the exporter's endpoint.
|
3. **Prometheus** configured with the exporter's endpoint.
|
||||||
4. **Exporter** built using the source code from this repository.
|
4. **Exporter** built using the source code from this repository, with its metrics endpoint exposed as: http://localhost:2004/metrics
|
||||||
|
|
||||||
This example is not suitable for production. You can use it as inspiration, but you will have to modify it in order to persist container data and follow the latest security practices:
|
This example is unsuitable for production. You can use it as inspiration, but you will have to modify it in order to persist container data and follow the latest security practices:
|
||||||
|
|
||||||
- Create Docker volumes for persistent storage of container data and configuration files
|
- Create Docker volumes for persistent storage of container data and configuration files
|
||||||
- Create a dedicated user for each container instead of running as `root`
|
- Create a dedicated user for each container instead of running as `root`
|
||||||
|
@@ -38,14 +38,15 @@ services:
|
|||||||
exporter:
|
exporter:
|
||||||
container_name: ape_dev_exporter
|
container_name: ape_dev_exporter
|
||||||
build: "../"
|
build: "../"
|
||||||
expose:
|
ports:
|
||||||
- "9240"
|
- "127.0.0.1:2004:9240"
|
||||||
volumes:
|
volumes:
|
||||||
- logs:/logs
|
- logs:/logs
|
||||||
environment:
|
environment:
|
||||||
HTTP_HOST: "0.0.0.0"
|
HTTP_HOST: "0.0.0.0"
|
||||||
ACCESS_LOG_FILE_PATTERN: "/logs/*.access.log"
|
ACCESS_LOG_FILE_PATTERN: "/logs/*.access.log"
|
||||||
ERROR_LOG_FILE_PATTERN: "/logs/*.error.log"
|
ERROR_LOG_FILE_PATTERN: "/logs/*.error.log"
|
||||||
|
stop_signal: SIGINT
|
||||||
restart: "always"
|
restart: "always"
|
||||||
|
|
||||||
volumes:
|
volumes:
|
||||||
|
@@ -2,7 +2,7 @@ use prometheus_client::metrics::counter::Counter;
|
|||||||
use prometheus_client::metrics::family::Family;
|
use prometheus_client::metrics::family::Family;
|
||||||
use prometheus_client::registry::Registry;
|
use prometheus_client::registry::Registry;
|
||||||
|
|
||||||
type SingleLabel = (&'static str, String);
|
type SingleLabel = [(&'static str, String); 1];
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct ApacheMetrics {
|
pub struct ApacheMetrics {
|
||||||
@@ -19,9 +19,9 @@ impl ApacheMetrics {
|
|||||||
errors_total: Family::<SingleLabel, Counter>::default()
|
errors_total: Family::<SingleLabel, Counter>::default()
|
||||||
};
|
};
|
||||||
|
|
||||||
registry.register("apache_requests", "Number of received requests", Box::new(metrics.requests_total.clone()));
|
registry.register("apache_requests", "Number of received requests", metrics.requests_total.clone());
|
||||||
registry.register("apache_errors", "Number of logged errors", Box::new(metrics.errors_total.clone()));
|
registry.register("apache_errors", "Number of logged errors", metrics.errors_total.clone());
|
||||||
|
|
||||||
return (registry, metrics);
|
(registry, metrics)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
62
src/fs_watcher.rs
Normal file
62
src/fs_watcher.rs
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
use std::collections::HashMap;
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
|
|
||||||
|
use notify::{ErrorKind, Event, recommended_watcher, RecommendedWatcher, RecursiveMode, Result, Watcher};
|
||||||
|
use tokio::sync::mpsc::Sender;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
|
pub struct FsWatcher {
|
||||||
|
watcher: Mutex<RecommendedWatcher>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FsWatcher {
|
||||||
|
pub fn new(callbacks: FsEventCallbacks) -> Result<Self> {
|
||||||
|
let watcher = recommended_watcher(move |event| callbacks.handle_event(event))?;
|
||||||
|
let watcher = Mutex::new(watcher);
|
||||||
|
|
||||||
|
Ok(Self { watcher })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn watch(&self, path: &Path) -> Result<()> {
|
||||||
|
let mut watcher = self.watcher.lock().await;
|
||||||
|
|
||||||
|
if let Err(e) = watcher.unwatch(path) {
|
||||||
|
if !matches!(e.kind, ErrorKind::WatchNotFound) {
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
watcher.watch(path, RecursiveMode::NonRecursive)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct FsEventCallbacks {
|
||||||
|
senders: HashMap<PathBuf, Sender<Event>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FsEventCallbacks {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self { senders: HashMap::new() }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn register(&mut self, path: &Path, sender: Sender<Event>) {
|
||||||
|
self.senders.insert(path.to_path_buf(), sender);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_event(&self, event: Result<Event>) {
|
||||||
|
match event {
|
||||||
|
Ok(event) => {
|
||||||
|
for path in &event.paths {
|
||||||
|
if let Some(sender) = self.senders.get(path) {
|
||||||
|
if let Err(e) = sender.try_send(event.clone()) {
|
||||||
|
println!("[FsWatcher] Error sending filesystem event for path \"{}\": {}", path.to_string_lossy(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
println!("[FsWatcher] Error receiving filesystem event: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@@ -14,7 +14,7 @@ use path_slash::PathExt;
|
|||||||
/// 2. A path with a wildcard anywhere in the file name.
|
/// 2. A path with a wildcard anywhere in the file name.
|
||||||
/// 3. A path with a standalone wildcard component (i.e. no prefix or suffix in the folder name).
|
/// 3. A path with a standalone wildcard component (i.e. no prefix or suffix in the folder name).
|
||||||
pub fn parse_log_file_pattern_from_env(variable_name: &str) -> Result<LogFilePattern, String> {
|
pub fn parse_log_file_pattern_from_env(variable_name: &str) -> Result<LogFilePattern, String> {
|
||||||
return match env::var(variable_name) {
|
match env::var(variable_name) {
|
||||||
Ok(str) => {
|
Ok(str) => {
|
||||||
let pattern_str = Path::new(&str).to_slash().ok_or(format!("Environment variable {} contains an invalid path.", variable_name))?;
|
let pattern_str = Path::new(&str).to_slash().ok_or(format!("Environment variable {} contains an invalid path.", variable_name))?;
|
||||||
parse_log_file_pattern_from_str(&pattern_str)
|
parse_log_file_pattern_from_str(&pattern_str)
|
||||||
@@ -23,7 +23,7 @@ pub fn parse_log_file_pattern_from_env(variable_name: &str) -> Result<LogFilePat
|
|||||||
VarError::NotPresent => Err(format!("Environment variable {} must be set.", variable_name)),
|
VarError::NotPresent => Err(format!("Environment variable {} must be set.", variable_name)),
|
||||||
VarError::NotUnicode(_) => Err(format!("Environment variable {} contains invalid characters.", variable_name))
|
VarError::NotUnicode(_) => Err(format!("Environment variable {} contains invalid characters.", variable_name))
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn parse_log_file_pattern_from_str(pattern_str: &str) -> Result<LogFilePattern, String> {
|
fn parse_log_file_pattern_from_str(pattern_str: &str) -> Result<LogFilePattern, String> {
|
||||||
@@ -31,11 +31,11 @@ fn parse_log_file_pattern_from_str(pattern_str: &str) -> Result<LogFilePattern,
|
|||||||
return Err(String::from("Path is empty."));
|
return Err(String::from("Path is empty."));
|
||||||
}
|
}
|
||||||
|
|
||||||
return if let Some((left, right)) = pattern_str.split_once('*') {
|
if let Some((left, right)) = pattern_str.split_once('*') {
|
||||||
parse_log_file_pattern_split_on_wildcard(left, right)
|
parse_log_file_pattern_split_on_wildcard(left, right)
|
||||||
} else {
|
} else {
|
||||||
Ok(LogFilePattern::WithoutWildcard(pattern_str.to_string()))
|
Ok(LogFilePattern::WithoutWildcard(pattern_str.to_string()))
|
||||||
};
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn parse_log_file_pattern_split_on_wildcard(left: &str, right: &str) -> Result<LogFilePattern, String> {
|
fn parse_log_file_pattern_split_on_wildcard(left: &str, right: &str) -> Result<LogFilePattern, String> {
|
||||||
@@ -54,7 +54,7 @@ fn parse_log_file_pattern_split_on_wildcard(left: &str, right: &str) -> Result<L
|
|||||||
return Err(String::from("Path has a folder wildcard with a prefix or suffix."));
|
return Err(String::from("Path has a folder wildcard with a prefix or suffix."));
|
||||||
}
|
}
|
||||||
|
|
||||||
return if let Some((folder_path, file_name_prefix)) = left.rsplit_once('/') {
|
if let Some((folder_path, file_name_prefix)) = left.rsplit_once('/') {
|
||||||
Ok(LogFilePattern::WithFileNameWildcard(PatternWithFileNameWildcard {
|
Ok(LogFilePattern::WithFileNameWildcard(PatternWithFileNameWildcard {
|
||||||
path: folder_path.to_string(),
|
path: folder_path.to_string(),
|
||||||
file_name_prefix: file_name_prefix.to_string(),
|
file_name_prefix: file_name_prefix.to_string(),
|
||||||
@@ -66,7 +66,7 @@ fn parse_log_file_pattern_split_on_wildcard(left: &str, right: &str) -> Result<L
|
|||||||
file_name_prefix: left.to_string(),
|
file_name_prefix: left.to_string(),
|
||||||
file_name_suffix: right.to_string(),
|
file_name_suffix: right.to_string(),
|
||||||
}))
|
}))
|
||||||
};
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@@ -82,11 +82,10 @@ impl PatternWithFileNameWildcard {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn match_wildcard_on_dir_entry(&self, dir_entry: &DirEntry) -> Option<String> {
|
fn match_wildcard_on_dir_entry(&self, dir_entry: &DirEntry) -> Option<String> {
|
||||||
return if let Some(wildcard_match) = dir_entry.file_name().to_str().and_then(|file_name| self.match_wildcard(file_name)) {
|
dir_entry.file_name()
|
||||||
Some(wildcard_match.to_string())
|
.to_str()
|
||||||
} else {
|
.and_then(|file_name| self.match_wildcard(file_name))
|
||||||
None
|
.map(|wildcard_match| wildcard_match.to_string())
|
||||||
};
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -115,22 +114,22 @@ pub enum LogFilePattern {
|
|||||||
|
|
||||||
impl LogFilePattern {
|
impl LogFilePattern {
|
||||||
pub fn search(&self) -> Result<Vec<LogFilePath>, io::Error> { // TODO error message
|
pub fn search(&self) -> Result<Vec<LogFilePath>, io::Error> { // TODO error message
|
||||||
return match self {
|
match self {
|
||||||
Self::WithoutWildcard(path) => Self::search_without_wildcard(path),
|
Self::WithoutWildcard(path) => Self::search_without_wildcard(path),
|
||||||
Self::WithFileNameWildcard(pattern) => Self::search_with_file_name_wildcard(pattern),
|
Self::WithFileNameWildcard(pattern) => Self::search_with_file_name_wildcard(pattern),
|
||||||
Self::WithFolderNameWildcard(pattern) => Self::search_with_folder_name_wildcard(pattern)
|
Self::WithFolderNameWildcard(pattern) => Self::search_with_folder_name_wildcard(pattern)
|
||||||
};
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn search_without_wildcard(path_str: &String) -> Result<Vec<LogFilePath>, io::Error> {
|
fn search_without_wildcard(path_str: &String) -> Result<Vec<LogFilePath>, io::Error> {
|
||||||
let path = Path::new(path_str);
|
let path = Path::new(path_str);
|
||||||
let is_valid = path.is_file() || matches!(path.parent(), Some(parent) if parent.is_dir());
|
let is_valid = path.is_file() || matches!(path.parent(), Some(parent) if parent.is_dir());
|
||||||
|
|
||||||
return if is_valid {
|
if is_valid {
|
||||||
Ok(vec![LogFilePath::with_empty_label(path_str)])
|
Ok(vec![LogFilePath::with_empty_label(path_str)])
|
||||||
} else {
|
} else {
|
||||||
Err(io::Error::from(ErrorKind::NotFound))
|
Err(io::Error::from(ErrorKind::NotFound))
|
||||||
};
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn search_with_file_name_wildcard(pattern: &PatternWithFileNameWildcard) -> Result<Vec<LogFilePath>, io::Error> {
|
fn search_with_file_name_wildcard(pattern: &PatternWithFileNameWildcard) -> Result<Vec<LogFilePath>, io::Error> {
|
||||||
@@ -143,7 +142,7 @@ impl LogFilePattern {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return Ok(result);
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn search_with_folder_name_wildcard(pattern: &PatternWithFolderNameWildcard) -> Result<Vec<LogFilePath>, io::Error> {
|
fn search_with_folder_name_wildcard(pattern: &PatternWithFolderNameWildcard) -> Result<Vec<LogFilePath>, io::Error> {
|
||||||
@@ -159,7 +158,7 @@ impl LogFilePattern {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return Ok(result);
|
Ok(result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -170,10 +169,10 @@ pub struct LogFilePath {
|
|||||||
|
|
||||||
impl LogFilePath {
|
impl LogFilePath {
|
||||||
fn with_empty_label(s: &String) -> LogFilePath {
|
fn with_empty_label(s: &String) -> LogFilePath {
|
||||||
return LogFilePath {
|
LogFilePath {
|
||||||
path: PathBuf::from(s),
|
path: PathBuf::from(s),
|
||||||
label: String::default(),
|
label: String::default(),
|
||||||
};
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -209,12 +208,12 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn valid_with_file_name_wildcard_prefix() {
|
fn valid_with_file_name_wildcard_prefix() {
|
||||||
assert!(matches!(parse_log_file_pattern_from_str("/path/to/files/access_*"), Ok(LogFilePattern::WithFileNameWildcard(pattern)) if pattern.path == "/path/to/files" && pattern.file_name_prefix == "access_" && pattern.file_name_suffix == ""));
|
assert!(matches!(parse_log_file_pattern_from_str("/path/to/files/access_*"), Ok(LogFilePattern::WithFileNameWildcard(pattern)) if pattern.path == "/path/to/files" && pattern.file_name_prefix == "access_" && pattern.file_name_suffix.is_empty()));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn valid_with_file_name_wildcard_suffix() {
|
fn valid_with_file_name_wildcard_suffix() {
|
||||||
assert!(matches!(parse_log_file_pattern_from_str("/path/to/files/*_access.log"), Ok(LogFilePattern::WithFileNameWildcard(pattern)) if pattern.path == "/path/to/files" && pattern.file_name_prefix == "" && pattern.file_name_suffix == "_access.log"));
|
assert!(matches!(parse_log_file_pattern_from_str("/path/to/files/*_access.log"), Ok(LogFilePattern::WithFileNameWildcard(pattern)) if pattern.path == "/path/to/files" && pattern.file_name_prefix.is_empty() && pattern.file_name_suffix == "_access.log"));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@@ -1,12 +1,16 @@
|
|||||||
use std::collections::HashMap;
|
use std::cmp::max;
|
||||||
use std::io;
|
|
||||||
use std::io::{Error, ErrorKind};
|
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use linemux::{Line, MuxedLines};
|
use notify::{Event, EventKind};
|
||||||
use tokio::sync::mpsc::UnboundedSender;
|
use notify::event::{CreateKind, ModifyKind};
|
||||||
|
use tokio::fs::File;
|
||||||
|
use tokio::io::{AsyncBufReadExt, BufReader, Lines};
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use tokio::sync::mpsc::Receiver;
|
||||||
|
|
||||||
use crate::ApacheMetrics;
|
use crate::ApacheMetrics;
|
||||||
|
use crate::fs_watcher::{FsEventCallbacks, FsWatcher};
|
||||||
use crate::log_file_pattern::LogFilePath;
|
use crate::log_file_pattern::LogFilePath;
|
||||||
|
|
||||||
#[derive(Copy, Clone, PartialEq)]
|
#[derive(Copy, Clone, PartialEq)]
|
||||||
@@ -15,98 +19,274 @@ enum LogFileKind {
|
|||||||
Error,
|
Error,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct LogFileInfo<'a> {
|
struct LogFileMetadata {
|
||||||
pub kind: LogFileKind,
|
pub kind: LogFileKind,
|
||||||
pub label: &'a String,
|
pub label: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> LogFileInfo<'a> {
|
impl LogFileMetadata {
|
||||||
fn get_label_set(&self) -> (&'static str, String) {
|
fn get_label_set(&self) -> [(&'static str, String); 1] {
|
||||||
return ("file", self.label.clone());
|
[("file", self.label.clone())]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn watch_logs_task(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: ApacheMetrics, shutdown_send: UnboundedSender<()>) {
|
pub async fn start_log_watcher(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: ApacheMetrics) -> bool {
|
||||||
if let Err(error) = watch_logs(access_log_files, error_log_files, metrics).await {
|
let mut watcher = LogWatcherConfiguration::new();
|
||||||
println!("[LogWatcher] Error reading logs: {}", error);
|
|
||||||
shutdown_send.send(()).unwrap();
|
for log_file in access_log_files.into_iter() {
|
||||||
|
watcher.add_file(log_file, LogFileKind::Access);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for log_file in error_log_files.into_iter() {
|
||||||
|
watcher.add_file(log_file, LogFileKind::Error);
|
||||||
|
}
|
||||||
|
|
||||||
|
watcher.start(&metrics).await
|
||||||
}
|
}
|
||||||
|
|
||||||
struct LogWatcher<'a> {
|
struct LogWatcherConfiguration {
|
||||||
reader: MuxedLines,
|
files: Vec<(PathBuf, LogFileMetadata)>,
|
||||||
files: HashMap<PathBuf, LogFileInfo<'a>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> LogWatcher<'a> {
|
impl LogWatcherConfiguration {
|
||||||
fn new() -> io::Result<LogWatcher<'a>> {
|
fn new() -> LogWatcherConfiguration {
|
||||||
return Ok(LogWatcher {
|
LogWatcherConfiguration { files: Vec::new() }
|
||||||
reader: MuxedLines::new()?,
|
|
||||||
files: HashMap::new(),
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn count_files_of_kind(&self, kind: LogFileKind) -> usize {
|
fn count_files_of_kind(&self, kind: LogFileKind) -> usize {
|
||||||
return self.files.values().filter(|info| info.kind == kind).count();
|
return self.files.iter().filter(|(_, metadata)| metadata.kind == kind).count();
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn add_file(&mut self, log_file: &'a LogFilePath, kind: LogFileKind) -> io::Result<()> {
|
fn add_file(&mut self, log_file: LogFilePath, kind: LogFileKind) {
|
||||||
let lookup_key = self.reader.add_file(&log_file.path).await?;
|
let path = log_file.path;
|
||||||
self.files.insert(lookup_key, LogFileInfo { kind, label: &log_file.label });
|
let label = log_file.label;
|
||||||
Ok(())
|
let metadata = LogFileMetadata { kind, label };
|
||||||
|
self.files.push((path, metadata));
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn start_watching(&mut self, metrics: &ApacheMetrics) -> io::Result<()> {
|
async fn start(self, metrics: &ApacheMetrics) -> bool {
|
||||||
if self.files.is_empty() {
|
if self.files.is_empty() {
|
||||||
println!("[LogWatcher] No log files provided.");
|
println!("[LogWatcher] No log files provided.");
|
||||||
return Err(Error::from(ErrorKind::Unsupported));
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
println!("[LogWatcher] Watching {} access log file(s) and {} error log file(s).", self.count_files_of_kind(LogFileKind::Access), self.count_files_of_kind(LogFileKind::Error));
|
println!("[LogWatcher] Watching {} access log file(s) and {} error log file(s).", self.count_files_of_kind(LogFileKind::Access), self.count_files_of_kind(LogFileKind::Error));
|
||||||
|
|
||||||
for metadata in self.files.values() {
|
struct PreparedFile {
|
||||||
let label_set = metadata.get_label_set();
|
path: PathBuf,
|
||||||
|
metadata: LogFileMetadata,
|
||||||
|
fs_event_receiver: Receiver<Event>,
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut prepared_files = Vec::new();
|
||||||
|
let mut fs_callbacks = FsEventCallbacks::new();
|
||||||
|
|
||||||
|
for (path, metadata) in self.files {
|
||||||
|
let (fs_event_sender, fs_event_receiver) = mpsc::channel(20);
|
||||||
|
fs_callbacks.register(&path, fs_event_sender);
|
||||||
|
prepared_files.push(PreparedFile { path, metadata, fs_event_receiver });
|
||||||
|
}
|
||||||
|
|
||||||
|
let fs_watcher = match FsWatcher::new(fs_callbacks) {
|
||||||
|
Ok(fs_watcher) => fs_watcher,
|
||||||
|
Err(e) => {
|
||||||
|
println!("[LogWatcher] Error creating filesystem watcher: {}", e);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
for file in &prepared_files {
|
||||||
|
let file_path = &file.path;
|
||||||
|
if !file_path.is_absolute() {
|
||||||
|
println!("[LogWatcher] Error creating filesystem watcher, path is not absolute: {}", file_path.to_string_lossy());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
let parent_path = if let Some(parent) = file_path.parent() {
|
||||||
|
parent
|
||||||
|
} else {
|
||||||
|
println!("[LogWatcher] Error creating filesystem watcher for parent directory of file \"{}\", parent directory does not exist", file_path.to_string_lossy());
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(e) = fs_watcher.watch(parent_path).await {
|
||||||
|
println!("[LogWatcher] Error creating filesystem watcher for directory \"{}\": {}", parent_path.to_string_lossy(), e);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let fs_watcher = Arc::new(fs_watcher);
|
||||||
|
|
||||||
|
for file in prepared_files {
|
||||||
|
let label_set = file.metadata.get_label_set();
|
||||||
let _ = metrics.requests_total.get_or_create(&label_set);
|
let _ = metrics.requests_total.get_or_create(&label_set);
|
||||||
let _ = metrics.errors_total.get_or_create(&label_set);
|
let _ = metrics.errors_total.get_or_create(&label_set);
|
||||||
|
|
||||||
|
let log_watcher = match LogWatcher::create(file.path, file.metadata, metrics.clone(), Arc::clone(&fs_watcher), file.fs_event_receiver).await {
|
||||||
|
Some(log_watcher) => log_watcher,
|
||||||
|
None => return false,
|
||||||
|
};
|
||||||
|
|
||||||
|
tokio::spawn(log_watcher.watch());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct LogWatcher {
|
||||||
|
state: LogWatchingState,
|
||||||
|
processor: LogLineProcessor,
|
||||||
|
fs_event_receiver: Receiver<Event>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LogWatcher {
|
||||||
|
async fn create(path: PathBuf, metadata: LogFileMetadata, metrics: ApacheMetrics, fs_watcher: Arc<FsWatcher>, fs_event_receiver: Receiver<Event>) -> Option<Self> {
|
||||||
|
let state = match LogWatchingState::initialize(path.clone(), fs_watcher).await {
|
||||||
|
Some(state) => state,
|
||||||
|
None => return None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let processor = LogLineProcessor { path, metadata, metrics };
|
||||||
|
Some(LogWatcher { state, processor, fs_event_receiver })
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn watch(mut self) {
|
||||||
|
while let Ok(Some(_)) = self.state.lines.next_line().await {
|
||||||
|
// Skip lines that already existed.
|
||||||
|
}
|
||||||
|
|
||||||
|
let path = &self.processor.path;
|
||||||
|
|
||||||
|
'read_loop:
|
||||||
loop {
|
loop {
|
||||||
if let Some(event) = self.reader.next_line().await? {
|
if !self.processor.process_lines(&mut self.state.lines).await {
|
||||||
self.handle_line(event, metrics);
|
break 'read_loop;
|
||||||
|
}
|
||||||
|
|
||||||
|
'event_loop:
|
||||||
|
loop {
|
||||||
|
let mut next_event = CoalescedFsEvent::None;
|
||||||
|
|
||||||
|
match self.fs_event_receiver.recv().await {
|
||||||
|
None => break 'read_loop,
|
||||||
|
Some(event) => {
|
||||||
|
next_event = next_event.merge(event);
|
||||||
|
|
||||||
|
while let Ok(event) = self.fs_event_receiver.try_recv() {
|
||||||
|
next_event = next_event.merge(event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
match next_event {
|
||||||
|
CoalescedFsEvent::None => continue 'event_loop,
|
||||||
|
CoalescedFsEvent::NewData => continue 'read_loop,
|
||||||
|
CoalescedFsEvent::NewFile => {
|
||||||
|
println!("[LogWatcher] File recreated: {}", path.to_string_lossy());
|
||||||
|
|
||||||
|
if !self.processor.process_lines(&mut self.state.lines).await {
|
||||||
|
break 'read_loop;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.state = match self.state.reinitialize().await {
|
||||||
|
Some(state) => state,
|
||||||
|
None => break 'read_loop,
|
||||||
|
};
|
||||||
|
|
||||||
|
continue 'read_loop;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
println!("[LogWatcher] Stopping log watcher for: {}", path.to_string_lossy());
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn handle_line(&mut self, event: Line, metrics: &ApacheMetrics) {
|
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
|
||||||
match self.files.get(event.source()) {
|
enum CoalescedFsEvent {
|
||||||
Some(metadata) => {
|
None = 0,
|
||||||
let label = metadata.label;
|
NewData = 1,
|
||||||
let (kind, family) = match metadata.kind {
|
NewFile = 2,
|
||||||
LogFileKind::Access => ("access log", &metrics.requests_total),
|
}
|
||||||
LogFileKind::Error => ("error log", &metrics.errors_total),
|
|
||||||
};
|
|
||||||
|
|
||||||
println!("[LogWatcher] Received {} line from \"{}\": {}", kind, label, event.line());
|
impl CoalescedFsEvent {
|
||||||
family.get_or_create(&metadata.get_label_set()).inc();
|
fn merge(self, event: Event) -> CoalescedFsEvent {
|
||||||
|
match event.kind {
|
||||||
|
EventKind::Modify(ModifyKind::Data(_)) => {
|
||||||
|
max(self, CoalescedFsEvent::NewData)
|
||||||
}
|
}
|
||||||
None => {
|
|
||||||
println!("[LogWatcher] Received line from unknown file: {}", event.source().display());
|
EventKind::Create(CreateKind::File) => {
|
||||||
|
max(self, CoalescedFsEvent::NewFile)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_ => self
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn watch_logs(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: ApacheMetrics) -> io::Result<()> {
|
struct LogWatchingState {
|
||||||
let mut watcher = LogWatcher::new()?;
|
path: PathBuf,
|
||||||
|
lines: Lines<BufReader<File>>,
|
||||||
for log_file in &access_log_files {
|
fs_watcher: Arc<FsWatcher>,
|
||||||
watcher.add_file(log_file, LogFileKind::Access).await?;
|
}
|
||||||
}
|
|
||||||
|
impl LogWatchingState {
|
||||||
for log_file in &error_log_files {
|
const DEFAULT_BUFFER_CAPACITY: usize = 1024 * 4;
|
||||||
watcher.add_file(log_file, LogFileKind::Error).await?;
|
|
||||||
}
|
async fn initialize(path: PathBuf, fs_watcher: Arc<FsWatcher>) -> Option<LogWatchingState> {
|
||||||
|
if let Err(e) = fs_watcher.watch(&path).await {
|
||||||
watcher.start_watching(&metrics).await?;
|
println!("[LogWatcher] Error creating filesystem watcher for file \"{}\": {}", path.to_string_lossy(), e);
|
||||||
Ok(())
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
let lines = match File::open(&path).await {
|
||||||
|
Ok(file) => BufReader::with_capacity(Self::DEFAULT_BUFFER_CAPACITY, file).lines(),
|
||||||
|
Err(e) => {
|
||||||
|
println!("[LogWatcher] Error opening file \"{}\": {}", path.to_string_lossy(), e);
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Some(LogWatchingState { path, lines, fs_watcher })
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn reinitialize(self) -> Option<LogWatchingState> {
|
||||||
|
LogWatchingState::initialize(self.path, self.fs_watcher).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct LogLineProcessor {
|
||||||
|
path: PathBuf,
|
||||||
|
metadata: LogFileMetadata,
|
||||||
|
metrics: ApacheMetrics,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LogLineProcessor {
|
||||||
|
async fn process_lines(&self, reader: &mut Lines<BufReader<File>>) -> bool {
|
||||||
|
loop {
|
||||||
|
match reader.next_line().await {
|
||||||
|
Ok(maybe_line) => match maybe_line {
|
||||||
|
Some(line) => self.handle_line(line),
|
||||||
|
None => return true,
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
println!("[LogWatcher] Error reading from file \"{}\": {}", self.path.to_string_lossy(), e);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_line(&self, line: String) {
|
||||||
|
let (kind, family) = match self.metadata.kind {
|
||||||
|
LogFileKind::Access => ("access log", &self.metrics.requests_total),
|
||||||
|
LogFileKind::Error => ("error log", &self.metrics.errors_total),
|
||||||
|
};
|
||||||
|
|
||||||
|
println!("[LogWatcher] Received {} line from \"{}\": {}", kind, self.metadata.label, line);
|
||||||
|
family.get_or_create(&self.metadata.get_label_set()).inc();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
55
src/main.rs
55
src/main.rs
@@ -1,23 +1,25 @@
|
|||||||
use std::env;
|
use std::env;
|
||||||
|
use std::net::{IpAddr, SocketAddr};
|
||||||
use std::process::ExitCode;
|
use std::process::ExitCode;
|
||||||
|
use std::str::FromStr;
|
||||||
use std::sync::Mutex;
|
use std::sync::Mutex;
|
||||||
|
|
||||||
use tokio::signal;
|
use tokio::signal;
|
||||||
use tokio::sync::mpsc;
|
|
||||||
|
|
||||||
use crate::apache_metrics::ApacheMetrics;
|
use crate::apache_metrics::ApacheMetrics;
|
||||||
use crate::log_file_pattern::{LogFilePath, parse_log_file_pattern_from_env};
|
use crate::log_file_pattern::{LogFilePath, parse_log_file_pattern_from_env};
|
||||||
use crate::log_watcher::watch_logs_task;
|
use crate::log_watcher::start_log_watcher;
|
||||||
use crate::web_server::{create_web_server, run_web_server};
|
use crate::web_server::WebServer;
|
||||||
|
|
||||||
mod apache_metrics;
|
mod apache_metrics;
|
||||||
|
mod fs_watcher;
|
||||||
mod log_file_pattern;
|
mod log_file_pattern;
|
||||||
mod log_parser;
|
mod log_parser;
|
||||||
mod log_watcher;
|
mod log_watcher;
|
||||||
mod web_server;
|
mod web_server;
|
||||||
|
|
||||||
const ACCESS_LOG_FILE_PATTERN: &'static str = "ACCESS_LOG_FILE_PATTERN";
|
const ACCESS_LOG_FILE_PATTERN: &str = "ACCESS_LOG_FILE_PATTERN";
|
||||||
const ERROR_LOG_FILE_PATTERN: &'static str = "ERROR_LOG_FILE_PATTERN";
|
const ERROR_LOG_FILE_PATTERN: &str = "ERROR_LOG_FILE_PATTERN";
|
||||||
|
|
||||||
fn find_log_files(environment_variable_name: &str, log_kind: &str) -> Option<Vec<LogFilePath>> {
|
fn find_log_files(environment_variable_name: &str, log_kind: &str) -> Option<Vec<LogFilePath>> {
|
||||||
let log_file_pattern = match parse_log_file_pattern_from_env(environment_variable_name) {
|
let log_file_pattern = match parse_log_file_pattern_from_env(environment_variable_name) {
|
||||||
@@ -45,12 +47,19 @@ fn find_log_files(environment_variable_name: &str, log_kind: &str) -> Option<Vec
|
|||||||
println!("Found {} file: {} (label \"{}\")", log_kind, log_file.path.display(), log_file.label);
|
println!("Found {} file: {} (label \"{}\")", log_kind, log_file.path.display(), log_file.label);
|
||||||
}
|
}
|
||||||
|
|
||||||
return Some(log_files);
|
Some(log_files)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main(flavor = "current_thread")]
|
#[tokio::main(flavor = "current_thread")]
|
||||||
async fn main() -> ExitCode {
|
async fn main() -> ExitCode {
|
||||||
let host = env::var("HTTP_HOST").unwrap_or(String::from("127.0.0.1"));
|
let host = env::var("HTTP_HOST").unwrap_or(String::from("127.0.0.1"));
|
||||||
|
let bind_ip = match IpAddr::from_str(&host) {
|
||||||
|
Ok(addr) => addr,
|
||||||
|
Err(_) => {
|
||||||
|
println!("Invalid HTTP host: {}", host);
|
||||||
|
return ExitCode::FAILURE;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
println!("Initializing exporter...");
|
println!("Initializing exporter...");
|
||||||
|
|
||||||
@@ -64,23 +73,27 @@ async fn main() -> ExitCode {
|
|||||||
None => return ExitCode::FAILURE,
|
None => return ExitCode::FAILURE,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let server = match WebServer::try_bind(SocketAddr::new(bind_ip, 9240)) {
|
||||||
|
Some(server) => server,
|
||||||
|
None => return ExitCode::FAILURE
|
||||||
|
};
|
||||||
|
|
||||||
let (metrics_registry, metrics) = ApacheMetrics::new();
|
let (metrics_registry, metrics) = ApacheMetrics::new();
|
||||||
let (shutdown_send, mut shutdown_recv) = mpsc::unbounded_channel();
|
|
||||||
|
|
||||||
tokio::spawn(watch_logs_task(access_log_files, error_log_files, metrics.clone(), shutdown_send.clone()));
|
if !start_log_watcher(access_log_files, error_log_files, metrics).await {
|
||||||
tokio::spawn(run_web_server(create_web_server(host.as_str(), 9240, Mutex::new(metrics_registry))));
|
return ExitCode::FAILURE;
|
||||||
|
|
||||||
drop(shutdown_send);
|
|
||||||
|
|
||||||
tokio::select! {
|
|
||||||
_ = signal::ctrl_c() => {
|
|
||||||
println!("Received CTRL-C, shutting down...")
|
|
||||||
}
|
|
||||||
|
|
||||||
_ = shutdown_recv.recv() => {
|
|
||||||
println!("Shutting down...");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ExitCode::SUCCESS
|
tokio::spawn(server.serve(Mutex::new(metrics_registry)));
|
||||||
|
|
||||||
|
match signal::ctrl_c().await {
|
||||||
|
Ok(_) => {
|
||||||
|
println!("Received CTRL-C, shutting down...");
|
||||||
|
ExitCode::SUCCESS
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
println!("Error registering CTRL-C handler: {}", e);
|
||||||
|
ExitCode::FAILURE
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -1,54 +1,95 @@
|
|||||||
use std::str;
|
use std::fmt;
|
||||||
use std::sync::Mutex;
|
use std::net::SocketAddr;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use actix_web::{App, HttpResponse, HttpServer, Result, web};
|
use hyper::{Body, Error, header, Method, Request, Response, Server, StatusCode};
|
||||||
use actix_web::dev::Server;
|
use hyper::http::Result;
|
||||||
|
use hyper::server::Builder;
|
||||||
|
use hyper::server::conn::AddrIncoming;
|
||||||
|
use hyper::service::{make_service_fn, service_fn};
|
||||||
use prometheus_client::encoding::text::encode;
|
use prometheus_client::encoding::text::encode;
|
||||||
use prometheus_client::registry::Registry;
|
use prometheus_client::registry::Registry;
|
||||||
|
|
||||||
//noinspection HttpUrlsUsage
|
const MAX_BUFFER_SIZE: usize = 1024 * 32;
|
||||||
pub fn create_web_server(host: &str, port: u16, metrics_registry: Mutex<Registry>) -> Server {
|
|
||||||
let metrics_registry = web::Data::new(metrics_registry);
|
|
||||||
|
|
||||||
let server = HttpServer::new(move || {
|
pub struct WebServer {
|
||||||
App::new()
|
builder: Builder<AddrIncoming>,
|
||||||
.app_data(metrics_registry.clone())
|
|
||||||
.service(web::resource("/metrics").route(web::get().to(metrics_handler)))
|
|
||||||
});
|
|
||||||
|
|
||||||
let server = server.keep_alive(Duration::from_secs(60));
|
|
||||||
let server = server.shutdown_timeout(0);
|
|
||||||
let server = server.disable_signals();
|
|
||||||
let server = server.workers(1);
|
|
||||||
let server = server.bind((host, port));
|
|
||||||
|
|
||||||
println!("[WebServer] Starting web server on {0}:{1} with metrics endpoint: http://{0}:{1}/metrics", host, port);
|
|
||||||
return server.unwrap().run();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run_web_server(server: Server) {
|
impl WebServer {
|
||||||
if let Err(e) = server.await {
|
//noinspection HttpUrlsUsage
|
||||||
println!("[WebServer] Error running web server: {}", e);
|
pub fn try_bind(addr: SocketAddr) -> Option<WebServer> {
|
||||||
|
println!("[WebServer] Starting web server on {0} with metrics endpoint: http://{0}/metrics", addr);
|
||||||
|
let builder = match Server::try_bind(&addr) {
|
||||||
|
Ok(builder) => builder,
|
||||||
|
Err(e) => {
|
||||||
|
println!("[WebServer] Could not bind to {}: {}", addr, e);
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let builder = builder.tcp_keepalive(Some(Duration::from_secs(60)));
|
||||||
|
let builder = builder.http1_only(true);
|
||||||
|
let builder = builder.http1_keepalive(true);
|
||||||
|
let builder = builder.http1_max_buf_size(MAX_BUFFER_SIZE);
|
||||||
|
let builder = builder.http1_header_read_timeout(Duration::from_secs(10));
|
||||||
|
|
||||||
|
Some(WebServer { builder })
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
async fn metrics_handler(metrics_registry: web::Data<Mutex<Registry>>) -> Result<HttpResponse> {
|
pub async fn serve(self, metrics_registry: Mutex<Registry>) {
|
||||||
let mut buf = Vec::new();
|
let metrics_registry = Arc::new(metrics_registry);
|
||||||
|
let service = make_service_fn(move |_| {
|
||||||
|
let metrics_registry = Arc::clone(&metrics_registry);
|
||||||
|
async move {
|
||||||
|
Ok::<_, Error>(service_fn(move |req| handle_request(req, Arc::clone(&metrics_registry))))
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
{
|
if let Err(e) = self.builder.serve(service).await {
|
||||||
if let Ok(metrics_registry) = metrics_registry.lock() {
|
println!("[WebServer] Error starting web server: {}", e);
|
||||||
encode(&mut buf, &metrics_registry)?;
|
|
||||||
} else {
|
|
||||||
println!("[WebServer] Failed acquiring lock on registry.");
|
|
||||||
return Ok(HttpResponse::InternalServerError().body(""));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if let Ok(buf) = String::from_utf8(buf) {
|
async fn handle_request(req: Request<Body>, metrics_registry: Arc<Mutex<Registry>>) -> Result<Response<Body>> {
|
||||||
Ok(HttpResponse::Ok().content_type("application/openmetrics-text; version=1.0.0; charset=utf-8").body(buf))
|
if req.method() == Method::GET && req.uri().path() == "/metrics" {
|
||||||
|
metrics_handler(Arc::clone(&metrics_registry)).await
|
||||||
} else {
|
} else {
|
||||||
println!("[WebServer] Failed converting buffer to UTF-8.");
|
Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty())
|
||||||
Ok(HttpResponse::InternalServerError().body(""))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//noinspection SpellCheckingInspection
|
||||||
|
async fn metrics_handler(metrics_registry: Arc<Mutex<Registry>>) -> Result<Response<Body>> {
|
||||||
|
match encode_metrics(metrics_registry) {
|
||||||
|
MetricsEncodeResult::Ok(buf) => {
|
||||||
|
Response::builder().status(StatusCode::OK).header(header::CONTENT_TYPE, "application/openmetrics-text; version=1.0.0; charset=utf-8").body(Body::from(buf))
|
||||||
|
}
|
||||||
|
MetricsEncodeResult::FailedAcquiringRegistryLock => {
|
||||||
|
println!("[WebServer] Failed acquiring lock on registry.");
|
||||||
|
Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())
|
||||||
|
}
|
||||||
|
MetricsEncodeResult::FailedEncodingMetrics(e) => {
|
||||||
|
println!("[WebServer] Error encoding metrics: {}", e);
|
||||||
|
Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enum MetricsEncodeResult {
|
||||||
|
Ok(String),
|
||||||
|
FailedAcquiringRegistryLock,
|
||||||
|
FailedEncodingMetrics(fmt::Error),
|
||||||
|
}
|
||||||
|
|
||||||
|
fn encode_metrics(metrics_registry: Arc<Mutex<Registry>>) -> MetricsEncodeResult {
|
||||||
|
let mut buf = String::new();
|
||||||
|
|
||||||
|
return if let Ok(metrics_registry) = metrics_registry.lock() {
|
||||||
|
encode(&mut buf, &metrics_registry).map_or_else(MetricsEncodeResult::FailedEncodingMetrics, |_| MetricsEncodeResult::Ok(buf))
|
||||||
|
} else {
|
||||||
|
MetricsEncodeResult::FailedAcquiringRegistryLock
|
||||||
|
};
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user