mirror of
https://github.com/chylex/Apache-Prometheus-Exporter.git
synced 2025-09-15 17:32:12 +02:00
Compare commits
17 Commits
8d3337c74f
...
main
Author | SHA1 | Date | |
---|---|---|---|
c2f47cf736
|
|||
f942b9830d
|
|||
974f7f4035
|
|||
bbc416b8d3
|
|||
f0e1447ae5
|
|||
ec099185c3
|
|||
7def8921b0
|
|||
383a187358
|
|||
173e4249a6
|
|||
ce6f345b6a
|
|||
8e7259b906
|
|||
54120e1b33
|
|||
723fd0b323
|
|||
3b3bf887f0
|
|||
e4fc38538d
|
|||
9d1059153d
|
|||
ae1046b6a5
|
1045
Cargo.lock
generated
1045
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
16
Cargo.toml
16
Cargo.toml
@@ -1,17 +1,21 @@
|
||||
[package]
|
||||
name = "apache_prometheus_exporter"
|
||||
version = "0.1.0"
|
||||
version = "1.0.0"
|
||||
edition = "2021"
|
||||
|
||||
[[bin]]
|
||||
name = "apache_prometheus_exporter"
|
||||
path = "src/main.rs"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
[profile.release]
|
||||
strip = true
|
||||
lto = true
|
||||
codegen-units = 1
|
||||
|
||||
[dependencies]
|
||||
actix-web = "4.1.0"
|
||||
linemux = "0.2.4"
|
||||
anyhow = "1.0.75"
|
||||
hyper = { version = "0.14.27", default-features = false, features = ["http1", "server", "runtime"] }
|
||||
notify = { version = "6.1.1", default-features = false, features = ["macos_kqueue"] }
|
||||
path-slash = "0.2.1"
|
||||
prometheus-client = "0.18.0"
|
||||
tokio = { version = "1", features = ["rt", "macros", "signal"] }
|
||||
prometheus-client = "0.21.2"
|
||||
tokio = { version = "1.32.0", features = ["fs", "io-util", "macros", "rt", "signal"] }
|
||||
|
108
README.md
Normal file
108
README.md
Normal file
@@ -0,0 +1,108 @@
|
||||
# Apache Prometheus Exporter
|
||||
|
||||
Exports Prometheus metrics from Apache access logs.
|
||||
|
||||
See the [docker](./docker) folder for an example setup using Docker Compose.
|
||||
|
||||
## 1. Configure Apache Access Log Format
|
||||
|
||||
The following snippet will create a log format named `prometheus` that includes all information the exporter expects. See [Apache documentation](https://httpd.apache.org/docs/2.4/mod/mod_log_config.html#formats) for explanation of the format.
|
||||
|
||||
```apache
|
||||
LogFormat "%t %h \"%r\" %>s %O %{ms}T \"%{Referer}i\" \"%{User-Agent}i\"" prometheus
|
||||
```
|
||||
|
||||
## 2. Configure Apache Virtual Hosts
|
||||
|
||||
The following snippet is an example of how you could configure Apache to serve 3 domains from different folders using macros.
|
||||
|
||||
Each domain has its own access and error log file. The log files are rotated daily, with a dedicated folder for each day, and a `${APACHE_LOG_DIR}/latest/` folder with hard links to today's log files - this folder will be watched by the exporter.
|
||||
|
||||
```apache
|
||||
<Macro Logs $domain>
|
||||
ErrorLog "|/usr/bin/rotatelogs -l -f -D -L ${APACHE_LOG_DIR}/latest/$domain.error.log ${APACHE_LOG_DIR}/%Y-%m-%d/$domain.error.log 86400"
|
||||
CustomLog "|/usr/bin/rotatelogs -l -f -D -L ${APACHE_LOG_DIR}/latest/$domain.access.log ${APACHE_LOG_DIR}/%Y-%m-%d/$domain.access.log 86400" prometheus
|
||||
</Macro>
|
||||
|
||||
<Macro Domain $domain>
|
||||
<VirtualHost *:80>
|
||||
ServerName $domain
|
||||
DocumentRoot /var/www/html/$domain
|
||||
Use Logs $domain
|
||||
</VirtualHost>
|
||||
</Macro>
|
||||
|
||||
Domain first.example.com
|
||||
Domain second.example.com
|
||||
Domain third.example.com
|
||||
|
||||
UndefMacro Domain
|
||||
UndefMacro Logs
|
||||
```
|
||||
|
||||
In this example, the `first.example.com` domain will be served from `/var/www/html/first.example.com`, and its logs will be written to:
|
||||
- `${APACHE_LOG_DIR}/latest/first.example.com.access.log`
|
||||
- `${APACHE_LOG_DIR}/latest/first.example.com.error.log`
|
||||
|
||||
## 3. Configure the Exporter
|
||||
|
||||
The exporter requires the following environment variables:
|
||||
|
||||
### `HTTP_HOST`
|
||||
|
||||
The host that the HTTP server for metrics will listen on. If omitted, defaults to `127.0.0.1`.
|
||||
|
||||
### `ACCESS_LOG_FILE_PATTERN`, `ERROR_LOG_FILE_PATTERN`
|
||||
|
||||
The path to the access/error log files. You may use a single wildcard to match multiple files in a folder, or to match multiple folders in one level of the path. Whatever is matched by the wildcard will become the Prometheus label `file`. If there is no wildcard, the `file` label will be empty.
|
||||
|
||||
#### Example 1 (File Name Wildcard)
|
||||
|
||||
Log files for all domains are in `/var/log/apache2/latest/` and are named `<domain>.access.log` and `<domain>.error.log`. This is the setup from the Apache configuration example above.
|
||||
|
||||
**Pattern:** `/var/log/apache2/latest/*.access.log`
|
||||
|
||||
- Metrics for `/var/log/apache2/latest/first.example.com.access.log` will be labeled: `first.example.com`
|
||||
- Metrics for `/var/log/apache2/latest/first.example.com.error.log` will be labeled: `first.example.com`
|
||||
- Metrics for `/var/log/apache2/latest/second.example.com.access.log` will be labeled: `second.example.com`
|
||||
|
||||
The wildcard may appear anywhere in the file name.
|
||||
|
||||
#### Example 2 (Folder Wildcard)
|
||||
|
||||
Every domain has its own folder in `/var/log/apache2/latest/` containing log files named `access.log` and `error.log`.
|
||||
|
||||
**Pattern:** `/var/log/apache2/latest/*/access.log`
|
||||
|
||||
- Metrics for `/var/log/apache2/latest/first.example.com/access.log` will be labeled: `first.example.com`
|
||||
- Metrics for `/var/log/apache2/latest/first.example.com/error.log` will be labeled: `first.example.com`
|
||||
- Metrics for `/var/log/apache2/latest/second.example.com/access.log` will be labeled: `second.example.com`
|
||||
|
||||
The wildcard must not include any prefix or suffix, so `/*/` is accepted, but `/prefix_*/` or `/*_suffix/` is not.
|
||||
|
||||
#### Notes
|
||||
|
||||
> The exporter only searches for files when it starts. If you need the exporter to watch a new file or forget a deleted file, you must restart it.
|
||||
|
||||
## 4. Launch the Exporter
|
||||
|
||||
Start the exporter. The standard output will show which log files have been found, the web server host, and the metrics endpoint URL.
|
||||
|
||||
If no errors are shown, the exporter will begin reading the found log files from the end, and printing each line to the standard output. When a log file is rotated, the exporter will begin reading it from the beginning.
|
||||
|
||||
Press `Ctrl-C` to stop the exporter. Signals other than `SIGINT` are ignored.
|
||||
|
||||
#### Notes
|
||||
|
||||
> The exporter is designed for, and tested with, the `rotatelogs` tool in a Linux container. Any other tools or operating systems are unsupported.
|
||||
|
||||
> If an error occurs while reading a file or re-opening a rotated file, the exporter will stop watching it and print the error to standard output.
|
||||
|
||||
## 5. Collect Prometheus Metrics
|
||||
|
||||
Currently, the exporter exposes only these metrics:
|
||||
|
||||
- `apache_requests_total` total number of requests
|
||||
- `apache_errors_total` total number of errors
|
||||
|
||||
More detailed metrics will be added in the future.
|
@@ -12,9 +12,9 @@ This configuration will create a Docker volume for the logs, and the following c
|
||||
- **User** : `admin`
|
||||
- **Password** : `admin`
|
||||
3. **Prometheus** configured with the exporter's endpoint.
|
||||
4. **Exporter** built using the source code from this repository.
|
||||
4. **Exporter** built using the source code from this repository, with its metrics endpoint exposed as: http://localhost:2004/metrics
|
||||
|
||||
This example is not suitable for production. You can use it as inspiration, but you will have to modify it in order to persist container data and follow the latest security practices:
|
||||
This example is unsuitable for production. You can use it as inspiration, but you will have to modify it in order to persist container data and follow the latest security practices:
|
||||
|
||||
- Create Docker volumes for persistent storage of container data and configuration files
|
||||
- Create a dedicated user for each container instead of running as `root`
|
||||
|
@@ -38,13 +38,15 @@ services:
|
||||
exporter:
|
||||
container_name: ape_dev_exporter
|
||||
build: "../"
|
||||
expose:
|
||||
- "9240"
|
||||
ports:
|
||||
- "127.0.0.1:2004:9240"
|
||||
volumes:
|
||||
- logs:/logs
|
||||
environment:
|
||||
HTTP_HOST: "0.0.0.0"
|
||||
LOG_FILE_PATTERN: "/logs/*.access.log"
|
||||
ACCESS_LOG_FILE_PATTERN: "/logs/*.access.log"
|
||||
ERROR_LOG_FILE_PATTERN: "/logs/*.error.log"
|
||||
stop_signal: SIGINT
|
||||
restart: "always"
|
||||
|
||||
volumes:
|
||||
|
@@ -1,23 +0,0 @@
|
||||
use prometheus_client::metrics::counter::Counter;
|
||||
use prometheus_client::metrics::family::Family;
|
||||
use prometheus_client::registry::Registry;
|
||||
|
||||
/// Shared handle to the exporter's Prometheus metric families.
/// Cloning is cheap by design so the handle can be passed to tasks.
#[derive(Clone)]
pub struct ApacheMetrics {
	// Counter of received log lines, keyed by a ("file", <label>) pair.
	pub requests_total: Family<(&'static str, String), Counter>
}
|
||||
|
||||
impl ApacheMetrics {
	/// Creates a fresh Prometheus registry with the request counter family
	/// registered in it, and returns the registry together with the metrics
	/// handle used to increment counters.
	pub fn new() -> (Registry, ApacheMetrics) {
		let mut registry = <Registry>::default();
		
		let requests_total = Family::<(&'static str, String), Counter>::default();
		// NOTE(review): registered as "apache_requests"; the exposed metric is
		// presumably named "apache_requests_total" by the client library — confirm.
		registry.register("apache_requests", "Number of received requests", Box::new(requests_total.clone()));
		
		let metrics = ApacheMetrics {
			requests_total
		};
		
		return (registry, metrics);
	}
}
|
@@ -1,49 +0,0 @@
|
||||
use std::collections::HashMap;
|
||||
use std::io;
|
||||
use std::io::{Error, ErrorKind};
|
||||
use std::path::PathBuf;
|
||||
|
||||
use linemux::MuxedLines;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
|
||||
use crate::ApacheMetrics;
|
||||
use crate::log_file_pattern::LogFilePath;
|
||||
|
||||
/// Task entry point: runs the log reader until it fails, then reports the
/// error and asks the process to shut down via `shutdown_send`.
pub async fn read_logs_task(log_files: Vec<LogFilePath>, metrics: ApacheMetrics, shutdown_send: UnboundedSender<()>) {
	if let Err(error) = read_logs(log_files, metrics).await {
		println!("[LogWatcher] Error reading logs: {}", error);
		// Receiver is expected to be alive for the whole process lifetime.
		shutdown_send.send(()).unwrap();
	}
}
|
||||
|
||||
/// Tails every provided log file with `linemux` and increments the per-file
/// request counter once for each received line.
///
/// Never returns `Ok` — the loop runs until `next_line()` fails, or it bails
/// out early with `Unsupported` when no log files were provided.
async fn read_logs(log_files: Vec<LogFilePath>, metrics: ApacheMetrics) -> io::Result<()> {
	let mut file_reader = MuxedLines::new()?;
	let mut label_lookup: HashMap<PathBuf, &String> = HashMap::new();
	
	// Map each watched path back to its Prometheus "file" label value.
	for log_file in &log_files {
		let lookup_key = file_reader.add_file(&log_file.path).await?;
		label_lookup.insert(lookup_key, &log_file.label);
	}
	
	// NOTE(review): this check runs after the loop above, but with an empty
	// list the loop is a no-op, so behavior is unaffected.
	if log_files.is_empty() {
		println!("[LogWatcher] No log files provided.");
		return Err(Error::from(ErrorKind::Unsupported));
	}
	
	println!("[LogWatcher] Watching {} log file(s).", log_files.len());
	
	loop {
		let event_result = file_reader.next_line().await?;
		if let Some(event) = event_result {
			match label_lookup.get(event.source()) {
				Some(&label) => {
					println!("[LogWatcher] Received line from \"{}\": {}", label, event.line());
					// One request counted per log line, labeled by source file.
					metrics.requests_total.get_or_create(&("file", label.clone())).inc();
				}
				None => {
					// Event for a path we never registered; log and ignore.
					println!("[LogWatcher] Received line from unknown file: {}", event.source().display());
				}
			}
		}
	}
}
|
58
src/logs/access_log_parser.rs
Normal file
58
src/logs/access_log_parser.rs
Normal file
@@ -0,0 +1,58 @@
|
||||
use std::fmt::{Display, Error, Formatter};
|
||||
|
||||
/// The individual fields of one Apache access log line, borrowed from the
/// original line. Field order matches the `prometheus` LogFormat documented
/// in the README: `%t %h \"%r\" %>s %O %{ms}T \"%{Referer}i\" \"%{User-Agent}i\"`.
pub struct AccessLogLineParts<'a> {
	// Timestamp text found between `[` and `]`.
	pub time: &'a str,
	// Remote host (`%h`).
	pub remote_host: &'a str,
	// First request line (`%r`), without the surrounding quotes.
	pub request: &'a str,
	// Final response status (`%>s`).
	pub response_status: &'a str,
	// Bytes sent including headers (`%O`).
	pub response_bytes: &'a str,
	// Request duration in milliseconds (`%{ms}T`).
	pub response_time_ms: &'a str,
	// Referer header value, without the surrounding quotes.
	pub referer: &'a str,
	// User-Agent header value, without the surrounding quotes.
	pub user_agent: &'a str,
}
|
||||
|
||||
impl Display for AccessLogLineParts<'_> {
	/// Reassembles the parts in the same order and quoting style as the
	/// original log line (quotes and time brackets are re-added here because
	/// the parser strips them).
	fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
		write!(f, "[{}] {} \"{}\" {} {} {} \"{}\" \"{}\"", self.time, self.remote_host, self.request, self.response_status, self.response_bytes, self.response_time_ms, self.referer, self.user_agent)
	}
}
|
||||
|
||||
impl<'a> AccessLogLineParts<'a> {
|
||||
pub fn parse(line: &'a str) -> Result<AccessLogLineParts<'a>, ParseError> {
|
||||
let (time, line) = extract_between_chars(line, '[', ']').ok_or(ParseError::TimeBracketsNotFound)?;
|
||||
let (remote_host, line) = next_space_delimited_part(line).ok_or(ParseError::RemoteHostNotFound)?;
|
||||
let (request, line) = extract_between_chars(line.trim_start_matches(' '), '"', '"').ok_or(ParseError::RequestNotFound)?;
|
||||
let (response_status, line) = next_space_delimited_part(line).ok_or(ParseError::ResponseStatusNotFound)?;
|
||||
let (response_bytes, line) = next_space_delimited_part(line).ok_or(ParseError::ResponseBytesNotFound)?;
|
||||
let (response_time_ms, line) = next_space_delimited_part(line).ok_or(ParseError::ResponseTimeNotFound)?;
|
||||
let (referer, line) = extract_between_chars(line.trim_start_matches(' '), '"', '"').ok_or(ParseError::RefererNotFound)?;
|
||||
let (user_agent, _) = extract_between_chars(line.trim_start_matches(' '), '"', '"').ok_or(ParseError::UserAgentNotFound)?;
|
||||
Ok(AccessLogLineParts { time, remote_host, request, response_status, response_bytes, response_time_ms, referer, user_agent })
|
||||
}
|
||||
}
|
||||
|
||||
/// Skips leading spaces, then splits off the next space-delimited token.
/// Returns `None` when no space follows the token (the delimiter itself is
/// consumed and not included in either half).
fn next_space_delimited_part(input: &str) -> Option<(&str, &str)> {
	input.trim_start_matches(' ').split_once(' ')
}
|
||||
|
||||
/// Skips leading spaces; if the remainder starts with `left_side`, returns the
/// text up to the first `right_side` together with everything after it.
/// Returns `None` on an empty string, a missing opening char, or a missing
/// closing char.
fn extract_between_chars(input: &str, left_side: char, right_side: char) -> Option<(&str, &str)> {
	let input = input.trim_start_matches(' ');
	
	match input.chars().next() {
		Some(first) if first == left_side => input.get(1..)?.split_once(right_side),
		_ => None,
	}
}
|
||||
|
||||
/// Identifies which field was missing or malformed while parsing an access
/// log line; variants mirror the parse steps in [`AccessLogLineParts::parse`]
/// in order.
#[derive(Debug, Copy, Clone)]
pub enum ParseError {
	TimeBracketsNotFound,
	RemoteHostNotFound,
	RequestNotFound,
	ResponseStatusNotFound,
	ResponseBytesNotFound,
	ResponseTimeNotFound,
	RefererNotFound,
	UserAgentNotFound,
}
|
62
src/logs/filesystem_watcher.rs
Normal file
62
src/logs/filesystem_watcher.rs
Normal file
@@ -0,0 +1,62 @@
|
||||
use std::collections::HashMap;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use notify::{ErrorKind, Event, recommended_watcher, RecommendedWatcher, RecursiveMode, Result, Watcher};
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
/// Async-friendly wrapper around a `notify` recommended watcher; the mutex
/// serializes watch/unwatch calls coming from different tasks.
pub struct FsWatcher {
	watcher: Mutex<RecommendedWatcher>,
}
|
||||
|
||||
impl FsWatcher {
|
||||
pub fn new(callbacks: FsEventCallbacks) -> Result<Self> {
|
||||
let watcher = recommended_watcher(move |event| callbacks.handle_event(event))?;
|
||||
let watcher = Mutex::new(watcher);
|
||||
|
||||
Ok(Self { watcher })
|
||||
}
|
||||
|
||||
pub async fn watch(&self, path: &Path) -> Result<()> {
|
||||
let mut watcher = self.watcher.lock().await;
|
||||
|
||||
if let Err(e) = watcher.unwatch(path) {
|
||||
if !matches!(e.kind, ErrorKind::WatchNotFound) {
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
watcher.watch(path, RecursiveMode::NonRecursive)
|
||||
}
|
||||
}
|
||||
|
||||
/// Fan-out table used from the watcher callback: maps a watched file path to
/// the channel that the corresponding log watcher task listens on.
pub struct FsEventCallbacks {
	senders: HashMap<PathBuf, Sender<Event>>,
}
|
||||
|
||||
impl FsEventCallbacks {
|
||||
pub fn new() -> Self {
|
||||
Self { senders: HashMap::new() }
|
||||
}
|
||||
|
||||
pub fn register(&mut self, path: &Path, sender: Sender<Event>) {
|
||||
self.senders.insert(path.to_path_buf(), sender);
|
||||
}
|
||||
|
||||
fn handle_event(&self, event: Result<Event>) {
|
||||
match event {
|
||||
Ok(event) => {
|
||||
for path in &event.paths {
|
||||
if let Some(sender) = self.senders.get(path) {
|
||||
if let Err(e) = sender.try_send(event.clone()) {
|
||||
println!("[FsWatcher] Error sending filesystem event for path \"{}\": {}", path.to_string_lossy(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
println!("[FsWatcher] Error receiving filesystem event: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,48 +1,34 @@
|
||||
use std::{env, io};
|
||||
use std::env::VarError;
|
||||
use std::fs::DirEntry;
|
||||
use std::io;
|
||||
use std::io::ErrorKind;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use path_slash::PathExt;
|
||||
|
||||
/// Environment variable that determines the path and file name pattern of log files.
|
||||
/// Reads and parses an environment variable that determines the path and file name pattern of log files.
|
||||
///
|
||||
/// Supports 3 pattern types:
|
||||
///
|
||||
/// 1. A simple path to a file.
|
||||
/// 2. A path with a wildcard anywhere in the file name.
|
||||
/// 3. A path with a standalone wildcard component (i.e. no prefix or suffix in the folder name).
|
||||
pub const LOG_FILE_PATTERN: &'static str = "LOG_FILE_PATTERN";
|
||||
|
||||
pub fn parse_log_file_pattern_from_env() -> Result<LogFilePattern, String> {
|
||||
return match env::var(LOG_FILE_PATTERN) {
|
||||
Ok(str) => {
|
||||
let pattern_str = Path::new(&str).to_slash().ok_or(format!("Environment variable {} contains an invalid path.", LOG_FILE_PATTERN))?;
|
||||
parse_log_file_pattern_from_str(&pattern_str)
|
||||
}
|
||||
Err(err) => match err {
|
||||
VarError::NotPresent => Err(format!("Environment variable {} must be set.", LOG_FILE_PATTERN)),
|
||||
VarError::NotUnicode(_) => Err(format!("Environment variable {} contains invalid characters.", LOG_FILE_PATTERN))
|
||||
}
|
||||
};
|
||||
pub fn parse_log_file_pattern_from_str(pattern: &str) -> Result<LogFilePattern> {
|
||||
let pattern = Path::new(pattern).to_slash().ok_or_else(|| anyhow!("Path is invalid"))?;
|
||||
if pattern.trim().is_empty() {
|
||||
bail!("Path is empty");
|
||||
}
|
||||
|
||||
fn parse_log_file_pattern_from_str(pattern_str: &str) -> Result<LogFilePattern, String> {
|
||||
if pattern_str.trim().is_empty() {
|
||||
return Err(String::from("Path is empty."));
|
||||
}
|
||||
|
||||
return if let Some((left, right)) = pattern_str.split_once('*') {
|
||||
if let Some((left, right)) = pattern.split_once('*') {
|
||||
parse_log_file_pattern_split_on_wildcard(left, right)
|
||||
} else {
|
||||
Ok(LogFilePattern::WithoutWildcard(pattern_str.to_string()))
|
||||
};
|
||||
Ok(LogFilePattern::WithoutWildcard(pattern.to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_log_file_pattern_split_on_wildcard(left: &str, right: &str) -> Result<LogFilePattern, String> {
|
||||
fn parse_log_file_pattern_split_on_wildcard(left: &str, right: &str) -> Result<LogFilePattern> {
|
||||
if left.contains('*') || right.contains('*') {
|
||||
return Err(String::from("Path has too many wildcards."));
|
||||
bail!("Path has too many wildcards");
|
||||
}
|
||||
|
||||
if left.ends_with('/') && right.starts_with('/') {
|
||||
@@ -53,10 +39,10 @@ fn parse_log_file_pattern_split_on_wildcard(left: &str, right: &str) -> Result<L
|
||||
}
|
||||
|
||||
if right.contains('/') {
|
||||
return Err(String::from("Path has a folder wildcard with a prefix or suffix."));
|
||||
bail!("Path has a folder wildcard with a prefix or suffix");
|
||||
}
|
||||
|
||||
return if let Some((folder_path, file_name_prefix)) = left.rsplit_once('/') {
|
||||
if let Some((folder_path, file_name_prefix)) = left.rsplit_once('/') {
|
||||
Ok(LogFilePattern::WithFileNameWildcard(PatternWithFileNameWildcard {
|
||||
path: folder_path.to_string(),
|
||||
file_name_prefix: file_name_prefix.to_string(),
|
||||
@@ -68,7 +54,7 @@ fn parse_log_file_pattern_split_on_wildcard(left: &str, right: &str) -> Result<L
|
||||
file_name_prefix: left.to_string(),
|
||||
file_name_suffix: right.to_string(),
|
||||
}))
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -84,11 +70,10 @@ impl PatternWithFileNameWildcard {
|
||||
}
|
||||
|
||||
fn match_wildcard_on_dir_entry(&self, dir_entry: &DirEntry) -> Option<String> {
|
||||
return if let Some(wildcard_match) = dir_entry.file_name().to_str().and_then(|file_name| self.match_wildcard(file_name)) {
|
||||
Some(wildcard_match.to_string())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
dir_entry.file_name()
|
||||
.to_str()
|
||||
.and_then(|file_name| self.match_wildcard(file_name))
|
||||
.map(|wildcard_match| wildcard_match.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -117,22 +102,19 @@ pub enum LogFilePattern {
|
||||
|
||||
impl LogFilePattern {
|
||||
pub fn search(&self) -> Result<Vec<LogFilePath>, io::Error> { // TODO error message
|
||||
return match self {
|
||||
match self {
|
||||
Self::WithoutWildcard(path) => Self::search_without_wildcard(path),
|
||||
Self::WithFileNameWildcard(pattern) => Self::search_with_file_name_wildcard(pattern),
|
||||
Self::WithFolderNameWildcard(pattern) => Self::search_with_folder_name_wildcard(pattern)
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
fn search_without_wildcard(path_str: &String) -> Result<Vec<LogFilePath>, io::Error> {
|
||||
let path = Path::new(path_str);
|
||||
let is_valid = path.is_file() || matches!(path.parent(), Some(parent) if parent.is_dir());
|
||||
|
||||
return if is_valid {
|
||||
if Path::new(path_str).is_file() {
|
||||
Ok(vec![LogFilePath::with_empty_label(path_str)])
|
||||
} else {
|
||||
Err(io::Error::from(ErrorKind::NotFound))
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
fn search_with_file_name_wildcard(pattern: &PatternWithFileNameWildcard) -> Result<Vec<LogFilePath>, io::Error> {
|
||||
@@ -145,7 +127,7 @@ impl LogFilePattern {
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(result);
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
fn search_with_folder_name_wildcard(pattern: &PatternWithFolderNameWildcard) -> Result<Vec<LogFilePath>, io::Error> {
|
||||
@@ -161,7 +143,7 @@ impl LogFilePattern {
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(result);
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -172,36 +154,36 @@ pub struct LogFilePath {
|
||||
|
||||
impl LogFilePath {
|
||||
fn with_empty_label(s: &String) -> LogFilePath {
|
||||
return LogFilePath {
|
||||
LogFilePath {
|
||||
path: PathBuf::from(s),
|
||||
label: String::default(),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::log_file_pattern::{LogFilePattern, parse_log_file_pattern_from_str};
|
||||
use super::{LogFilePattern, parse_log_file_pattern_from_str};
|
||||
|
||||
#[test]
|
||||
fn empty_path() {
|
||||
assert!(matches!(parse_log_file_pattern_from_str(""), Err(err) if err == "Path is empty."));
|
||||
assert!(matches!(parse_log_file_pattern_from_str(" "), Err(err) if err == "Path is empty."));
|
||||
assert!(matches!(parse_log_file_pattern_from_str(""), Err(err) if err.to_string() == "Path is empty"));
|
||||
assert!(matches!(parse_log_file_pattern_from_str(" "), Err(err) if err.to_string() == "Path is empty"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn too_many_wildcards() {
|
||||
assert!(matches!(parse_log_file_pattern_from_str("/path/*/to/files/*.log"), Err(err) if err == "Path has too many wildcards."));
|
||||
assert!(matches!(parse_log_file_pattern_from_str("/path/*/to/files/*.log"), Err(err) if err.to_string() == "Path has too many wildcards"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn folder_wildcard_with_prefix_not_supported() {
|
||||
assert!(matches!(parse_log_file_pattern_from_str("/path/*abc/to/files/access.log"), Err(err) if err == "Path has a folder wildcard with a prefix or suffix."));
|
||||
assert!(matches!(parse_log_file_pattern_from_str("/path/*abc/to/files/access.log"), Err(err) if err.to_string() == "Path has a folder wildcard with a prefix or suffix"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn folder_wildcard_with_suffix_not_supported() {
|
||||
assert!(matches!(parse_log_file_pattern_from_str("/path/abc*/to/files/access.log"), Err(err) if err == "Path has a folder wildcard with a prefix or suffix."));
|
||||
assert!(matches!(parse_log_file_pattern_from_str("/path/abc*/to/files/access.log"), Err(err) if err.to_string() == "Path has a folder wildcard with a prefix or suffix"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -211,12 +193,12 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn valid_with_file_name_wildcard_prefix() {
|
||||
assert!(matches!(parse_log_file_pattern_from_str("/path/to/files/access_*"), Ok(LogFilePattern::WithFileNameWildcard(pattern)) if pattern.path == "/path/to/files" && pattern.file_name_prefix == "access_" && pattern.file_name_suffix == ""));
|
||||
assert!(matches!(parse_log_file_pattern_from_str("/path/to/files/access_*"), Ok(LogFilePattern::WithFileNameWildcard(pattern)) if pattern.path == "/path/to/files" && pattern.file_name_prefix == "access_" && pattern.file_name_suffix.is_empty()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn valid_with_file_name_wildcard_suffix() {
|
||||
assert!(matches!(parse_log_file_pattern_from_str("/path/to/files/*_access.log"), Ok(LogFilePattern::WithFileNameWildcard(pattern)) if pattern.path == "/path/to/files" && pattern.file_name_prefix == "" && pattern.file_name_suffix == "_access.log"));
|
||||
assert!(matches!(parse_log_file_pattern_from_str("/path/to/files/*_access.log"), Ok(LogFilePattern::WithFileNameWildcard(pattern)) if pattern.path == "/path/to/files" && pattern.file_name_prefix.is_empty() && pattern.file_name_suffix == "_access.log"));
|
||||
}
|
||||
|
||||
#[test]
|
256
src/logs/log_file_watcher.rs
Normal file
256
src/logs/log_file_watcher.rs
Normal file
@@ -0,0 +1,256 @@
|
||||
use std::cmp::max;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use notify::{Event, EventKind};
|
||||
use notify::event::{CreateKind, ModifyKind};
|
||||
use tokio::fs::File;
|
||||
use tokio::io::{AsyncBufReadExt, BufReader, Lines};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
|
||||
use crate::logs::filesystem_watcher::{FsEventCallbacks, FsWatcher};
|
||||
use crate::logs::log_file_pattern::LogFilePath;
|
||||
use crate::metrics::Metrics;
|
||||
|
||||
/// Distinguishes which metric family a watched file feeds:
/// access logs increment `requests_total`, error logs `errors_total`.
#[derive(Copy, Clone, PartialEq)]
pub enum LogFileKind {
	Access,
	Error,
}
|
||||
|
||||
/// Per-file bookkeeping carried alongside the file's path.
struct LogFileMetadata {
	// Which metric family lines from this file feed.
	pub kind: LogFileKind,
	// Value of the Prometheus "file" label (may be empty).
	pub label: String,
}
|
||||
|
||||
impl LogFileMetadata {
	/// Builds the Prometheus label set for this file: a single
	/// `("file", <label>)` pair, cloned so the caller owns it.
	fn get_label_set(&self) -> [(&'static str, String); 1] {
		[("file", self.label.clone())]
	}
}
|
||||
|
||||
/// Builder that collects the log files to watch before `start` spawns one
/// watcher task per file.
pub struct LogWatcherConfiguration {
	files: Vec<(PathBuf, LogFileMetadata)>,
}
||||
|
||||
impl LogWatcherConfiguration {
	/// Creates an empty configuration with no files.
	pub fn new() -> LogWatcherConfiguration {
		LogWatcherConfiguration { files: Vec::new() }
	}
	
	// Counts registered files of one kind; used only for the startup banner.
	fn count_files_of_kind(&self, kind: LogFileKind) -> usize {
		return self.files.iter().filter(|(_, metadata)| metadata.kind == kind).count();
	}
	
	/// Registers one log file (path + label) under the given kind.
	pub fn add_file(&mut self, log_file: LogFilePath, kind: LogFileKind) {
		let path = log_file.path;
		let label = log_file.label;
		let metadata = LogFileMetadata { kind, label };
		self.files.push((path, metadata));
	}
	
	/// Consumes the configuration: wires up one filesystem-event channel per
	/// file, starts directory watches, and spawns one detached watcher task
	/// per file. Fails if no files were added, a path is not absolute, or any
	/// watch cannot be established.
	pub async fn start(self, metrics: &Metrics) -> Result<()> {
		if self.files.is_empty() {
			bail!("No log files provided");
		}
		
		println!("[LogWatcher] Watching {} access log file(s) and {} error log file(s).", self.count_files_of_kind(LogFileKind::Access), self.count_files_of_kind(LogFileKind::Error));
		
		// Local bundle tying a file to its private event receiver.
		struct PreparedFile {
			path: PathBuf,
			metadata: LogFileMetadata,
			fs_event_receiver: Receiver<Event>,
		}
		
		let mut prepared_files = Vec::new();
		let mut fs_callbacks = FsEventCallbacks::new();
		
		// One bounded channel per file; the sender side lives in the watcher callback.
		for (path, metadata) in self.files {
			let (fs_event_sender, fs_event_receiver) = mpsc::channel(20);
			fs_callbacks.register(&path, fs_event_sender);
			prepared_files.push(PreparedFile { path, metadata, fs_event_receiver });
		}
		
		let fs_watcher = FsWatcher::new(fs_callbacks).context("Could not create filesystem watcher")?;
		
		// Watch each file's parent directory so file recreation is observed.
		for file in &prepared_files {
			let file_path = &file.path;
			if !file_path.is_absolute() {
				bail!("Path is not absolute: {}", file_path.to_string_lossy());
			}
			
			let parent_path = file_path.parent().ok_or_else(|| anyhow!("Path has no parent: {}", file_path.to_string_lossy()))?;
			fs_watcher.watch(parent_path).await.with_context(|| format!("Could not create filesystem watcher for directory: {}", parent_path.to_string_lossy()))?;
		}
		
		// Shared by all per-file watcher tasks from here on.
		let fs_watcher = Arc::new(fs_watcher);
		
		for file in prepared_files {
			// Touch both families so each file's series exists at value 0
			// before any line arrives.
			let label_set = file.metadata.get_label_set();
			let _ = metrics.requests_total.get_or_create(&label_set);
			let _ = metrics.errors_total.get_or_create(&label_set);
			
			let log_watcher = LogWatcher::create(file.path.clone(), file.metadata, metrics.clone(), Arc::clone(&fs_watcher), file.fs_event_receiver);
			let log_watcher = log_watcher.await.with_context(|| format!("Could not watch log file: {}", file.path.to_string_lossy()))?;
			
			// Detached task; runs until its channel closes or reading fails.
			tokio::spawn(log_watcher.watch());
		}
		
		Ok(())
	}
}
|
||||
|
||||
/// One spawned task's state for tailing a single log file: the open reader
/// (`state`), the line-to-metric logic (`processor`), and the filesystem
/// event channel for that file's path.
struct LogWatcher {
	state: LogWatchingState,
	processor: LogLineProcessor,
	fs_event_receiver: Receiver<Event>,
}
|
||||
|
||||
impl LogWatcher {
	/// Opens the file and its filesystem watch, pairing them with the line
	/// processor for this file.
	async fn create(path: PathBuf, metadata: LogFileMetadata, metrics: Metrics, fs_watcher: Arc<FsWatcher>, fs_event_receiver: Receiver<Event>) -> Result<Self> {
		let state = LogWatchingState::initialize(path.clone(), fs_watcher).await?;
		let processor = LogLineProcessor { path, metadata, metrics };
		Ok(LogWatcher { state, processor, fs_event_receiver })
	}
	
	/// Main loop of the watcher task: drains pre-existing lines without
	/// counting them, then alternates between processing new lines and
	/// waiting for coalesced filesystem events. Returns (ending the task)
	/// when reading fails or the event channel closes.
	async fn watch(mut self) {
		while let Ok(Some(_)) = self.state.lines.next_line().await {
			// Skip lines that already existed.
		}
		
		let path = &self.processor.path;
		
		'read_loop:
		loop {
			// Count everything currently available; false means a read error.
			if !self.processor.process_lines(&mut self.state.lines).await {
				break 'read_loop;
			}
			
			'event_loop:
			loop {
				let mut next_event = CoalescedFsEvent::None;
				
				// Block for one event, then opportunistically merge any
				// already-queued events into a single coalesced decision.
				match self.fs_event_receiver.recv().await {
					None => break 'read_loop,
					Some(event) => {
						next_event = next_event.merge(event);
						
						while let Ok(event) = self.fs_event_receiver.try_recv() {
							next_event = next_event.merge(event);
						}
					}
				}
				
				match next_event {
					CoalescedFsEvent::None => continue 'event_loop,
					CoalescedFsEvent::NewData => continue 'read_loop,
					CoalescedFsEvent::NewFile => {
						println!("[LogWatcher] File recreated: {}", path.to_string_lossy());
						
						// Flush any remaining lines of the old file first.
						if !self.processor.process_lines(&mut self.state.lines).await {
							break 'read_loop;
						}
						
						// Reopen the (new) file and re-establish its watch.
						self.state = match self.state.reinitialize().await {
							Ok(state) => state,
							Err(e) => {
								println!("Could not re-watch log file \"{}\": {}", path.to_string_lossy(), e);
								break 'read_loop;
							}
						};
						
						while let Ok(Some(_)) = self.state.lines.next_line().await {
							// There are occasional spurious file creation events, so reading
							// from the beginning would read lines that were already counted.
						}
						
						continue 'read_loop;
					}
				}
			}
		}
		
		println!("[LogWatcher] Stopping log watcher for: {}", path.to_string_lossy());
	}
}
|
||||
|
||||
/// Summary of a burst of filesystem events, ordered by severity so that
/// `max` picks the strongest signal: recreation outranks new data.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
enum CoalescedFsEvent {
	None = 0,
	NewData = 1,
	NewFile = 2,
}
|
||||
|
||||
impl CoalescedFsEvent {
	/// Folds one raw `notify` event into the running summary, keeping the
	/// higher-severity outcome. Data modifications map to `NewData`, file
	/// creation to `NewFile`; every other event kind is ignored.
	fn merge(self, event: Event) -> CoalescedFsEvent {
		match event.kind {
			EventKind::Modify(ModifyKind::Data(_)) => {
				max(self, CoalescedFsEvent::NewData)
			}
			
			EventKind::Create(CreateKind::File) => {
				max(self, CoalescedFsEvent::NewFile)
			}
			
			_ => self
		}
	}
}
|
||||
|
||||
/// Holds the open log file reader together with the filesystem watcher
/// registration for its path.
struct LogWatchingState {
	// Path of the watched log file.
	path: PathBuf,
	// Buffered line reader, positioned after the last consumed line.
	lines: Lines<BufReader<File>>,
	// Shared filesystem watcher used to register this path (see `initialize`).
	fs_watcher: Arc<FsWatcher>,
}
|
||||
|
||||
impl LogWatchingState {
|
||||
const DEFAULT_BUFFER_CAPACITY: usize = 1024 * 4;
|
||||
|
||||
async fn initialize(path: PathBuf, fs_watcher: Arc<FsWatcher>) -> Result<LogWatchingState> {
|
||||
fs_watcher.watch(&path).await.context("Could not create filesystem watcher")?;
|
||||
|
||||
let file = File::open(&path).await.context("Could not open file")?;
|
||||
let lines = BufReader::with_capacity(Self::DEFAULT_BUFFER_CAPACITY, file).lines();
|
||||
|
||||
Ok(LogWatchingState { path, lines, fs_watcher })
|
||||
}
|
||||
|
||||
async fn reinitialize(self) -> Result<LogWatchingState> {
|
||||
LogWatchingState::initialize(self.path, self.fs_watcher).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Counts log lines from a single file into the matching metric family.
struct LogLineProcessor {
	// Path of the log file; used in diagnostic messages.
	path: PathBuf,
	// Kind (access/error) and label of the log file.
	metadata: LogFileMetadata,
	// Metric families that lines are counted into.
	metrics: Metrics,
}
|
||||
|
||||
impl LogLineProcessor {
|
||||
async fn process_lines(&self, reader: &mut Lines<BufReader<File>>) -> bool {
|
||||
loop {
|
||||
match reader.next_line().await {
|
||||
Ok(maybe_line) => match maybe_line {
|
||||
Some(line) => self.handle_line(line),
|
||||
None => return true,
|
||||
},
|
||||
Err(e) => {
|
||||
println!("[LogWatcher] Error reading from file \"{}\": {}", self.path.to_string_lossy(), e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_line(&self, line: String) {
|
||||
let (kind, family) = match self.metadata.kind {
|
||||
LogFileKind::Access => ("access log", &self.metrics.requests_total),
|
||||
LogFileKind::Error => ("error log", &self.metrics.errors_total),
|
||||
};
|
||||
|
||||
println!("[LogWatcher] Received {} line from \"{}\": {}", kind, self.metadata.label, line);
|
||||
family.get_or_create(&self.metadata.get_label_set()).inc();
|
||||
}
|
||||
}
|
48
src/logs/mod.rs
Normal file
48
src/logs/mod.rs
Normal file
@@ -0,0 +1,48 @@
|
||||
use std::env;
|
||||
use std::env::VarError;
|
||||
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
|
||||
use log_file_watcher::{LogFileKind, LogWatcherConfiguration};
|
||||
|
||||
use crate::logs::log_file_pattern::{LogFilePath, parse_log_file_pattern_from_str};
|
||||
use crate::metrics::Metrics;
|
||||
|
||||
mod access_log_parser;
|
||||
mod filesystem_watcher;
|
||||
mod log_file_pattern;
|
||||
mod log_file_watcher;
|
||||
|
||||
pub fn find_log_files(environment_variable_name: &str, log_kind: &str) -> Result<Vec<LogFilePath>> {
|
||||
let log_file_pattern_str = env::var(environment_variable_name).map_err(|err| match err {
|
||||
VarError::NotPresent => anyhow!("Environment variable {} must be set", environment_variable_name),
|
||||
VarError::NotUnicode(_) => anyhow!("Environment variable {} contains invalid characters", environment_variable_name)
|
||||
})?;
|
||||
|
||||
let log_file_pattern = parse_log_file_pattern_from_str(&log_file_pattern_str).with_context(|| format!("Could not parse pattern: {}", log_file_pattern_str))?;
|
||||
let log_files = log_file_pattern.search().with_context(|| format!("Could not search files: {}", log_file_pattern_str))?;
|
||||
|
||||
if log_files.is_empty() {
|
||||
bail!("No files match pattern: {}", log_file_pattern_str);
|
||||
}
|
||||
|
||||
for log_file in &log_files {
|
||||
println!("Found {} file: {} (label \"{}\")", log_kind, log_file.path.display(), log_file.label);
|
||||
}
|
||||
|
||||
Ok(log_files)
|
||||
}
|
||||
|
||||
pub async fn start_log_watcher(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: Metrics) -> Result<()> {
|
||||
let mut watcher = LogWatcherConfiguration::new();
|
||||
|
||||
for log_file in access_log_files.into_iter() {
|
||||
watcher.add_file(log_file, LogFileKind::Access);
|
||||
}
|
||||
|
||||
for log_file in error_log_files.into_iter() {
|
||||
watcher.add_file(log_file, LogFileKind::Error);
|
||||
}
|
||||
|
||||
watcher.start(&metrics).await
|
||||
}
|
71
src/main.rs
71
src/main.rs
@@ -1,65 +1,38 @@
|
||||
use std::env;
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Mutex;
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use tokio::signal;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use crate::apache_metrics::ApacheMetrics;
|
||||
use crate::log_file_pattern::parse_log_file_pattern_from_env;
|
||||
use crate::log_watcher::read_logs_task;
|
||||
use crate::web_server::{create_web_server, run_web_server};
|
||||
use crate::metrics::Metrics;
|
||||
use crate::web::WebServer;
|
||||
|
||||
mod log_file_pattern;
|
||||
mod log_watcher;
|
||||
mod apache_metrics;
|
||||
mod web_server;
|
||||
mod logs;
|
||||
mod metrics;
|
||||
mod web;
|
||||
|
||||
const ACCESS_LOG_FILE_PATTERN: &str = "ACCESS_LOG_FILE_PATTERN";
|
||||
const ERROR_LOG_FILE_PATTERN: &str = "ERROR_LOG_FILE_PATTERN";
|
||||
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
async fn main() {
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let host = env::var("HTTP_HOST").unwrap_or(String::from("127.0.0.1"));
|
||||
let bind_ip = IpAddr::from_str(&host).map_err(|_| anyhow!("Invalid HTTP host: {}", host))?;
|
||||
|
||||
println!("Initializing exporter...");
|
||||
|
||||
let log_file_pattern = match parse_log_file_pattern_from_env() {
|
||||
Ok(pattern) => pattern,
|
||||
Err(error) => {
|
||||
println!("Error: {}", error);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let access_log_files = logs::find_log_files(ACCESS_LOG_FILE_PATTERN, "access log").context("Could not find access log files")?;
|
||||
let error_log_files = logs::find_log_files(ERROR_LOG_FILE_PATTERN, "error log").context("Could not find error log files")?;
|
||||
|
||||
let log_files = match log_file_pattern.search() {
|
||||
Ok(files) => files,
|
||||
Err(error) => {
|
||||
println!("Error searching log files: {}", error);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let server = WebServer::try_bind(SocketAddr::new(bind_ip, 9240)).context("Could not configure web server")?;
|
||||
let (metrics_registry, metrics) = Metrics::new();
|
||||
|
||||
if log_files.is_empty() {
|
||||
println!("Found no matching log files.");
|
||||
return;
|
||||
}
|
||||
logs::start_log_watcher(access_log_files, error_log_files, metrics).await.context("Could not start watching logs")?;
|
||||
tokio::spawn(server.serve(Mutex::new(metrics_registry)));
|
||||
|
||||
for log_file in &log_files {
|
||||
println!("Found log file: {} (label \"{}\")", log_file.path.display(), log_file.label);
|
||||
}
|
||||
|
||||
let (metrics_registry, metrics) = ApacheMetrics::new();
|
||||
let (shutdown_send, mut shutdown_recv) = mpsc::unbounded_channel();
|
||||
|
||||
tokio::spawn(read_logs_task(log_files, metrics.clone(), shutdown_send.clone()));
|
||||
tokio::spawn(run_web_server(create_web_server(host.as_str(), 9240, Mutex::new(metrics_registry))));
|
||||
|
||||
drop(shutdown_send);
|
||||
|
||||
tokio::select! {
|
||||
_ = signal::ctrl_c() => {
|
||||
println!("Received CTRL-C, shutting down...")
|
||||
}
|
||||
|
||||
_ = shutdown_recv.recv() => {
|
||||
println!("Shutting down...");
|
||||
}
|
||||
}
|
||||
signal::ctrl_c().await.with_context(|| "Could not register CTRL-C handler")?;
|
||||
println!("Received CTRL-C, shutting down...");
|
||||
Ok(())
|
||||
}
|
||||
|
24
src/metrics.rs
Normal file
24
src/metrics.rs
Normal file
@@ -0,0 +1,24 @@
|
||||
use prometheus_client::metrics::counter::Counter;
|
||||
use prometheus_client::metrics::family::Family;
|
||||
use prometheus_client::registry::Registry;
|
||||
|
||||
// A metric label set consisting of exactly one (key, value) pair.
type SingleLabel = [(&'static str, String); 1];

/// Handles to the counter families exported by this application.
/// Cloning is cheap; all clones update the same underlying counters.
#[derive(Clone, Default)]
pub struct Metrics {
	// Counts access log lines, partitioned by label.
	pub requests_total: Family<SingleLabel, Counter>,
	// Counts error log lines, partitioned by label.
	pub errors_total: Family<SingleLabel, Counter>
}
|
||||
|
||||
impl Metrics {
|
||||
pub fn new() -> (Registry, Metrics) {
|
||||
let mut registry = <Registry>::default();
|
||||
|
||||
let metrics = Metrics::default();
|
||||
|
||||
registry.register("apache_requests", "Number of received requests", metrics.requests_total.clone());
|
||||
registry.register("apache_errors", "Number of logged errors", metrics.errors_total.clone());
|
||||
|
||||
(registry, metrics)
|
||||
}
|
||||
}
|
42
src/web/metrics_endpoint.rs
Normal file
42
src/web/metrics_endpoint.rs
Normal file
@@ -0,0 +1,42 @@
|
||||
use std::fmt;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use hyper::{Body, http, Response, StatusCode};
|
||||
use hyper::header::CONTENT_TYPE;
|
||||
use prometheus_client::encoding::text::encode;
|
||||
use prometheus_client::registry::Registry;
|
||||
|
||||
//noinspection SpellCheckingInspection
// Content-Type of the OpenMetrics text exposition format served at /metrics.
const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text; version=1.0.0; charset=utf-8";
|
||||
|
||||
pub async fn handle(metrics_registry: Arc<Mutex<Registry>>) -> http::Result<Response<Body>> {
|
||||
match try_encode(metrics_registry) {
|
||||
MetricsEncodeResult::Ok(buf) => {
|
||||
Response::builder().status(StatusCode::OK).header(CONTENT_TYPE, METRICS_CONTENT_TYPE).body(Body::from(buf))
|
||||
}
|
||||
MetricsEncodeResult::FailedAcquiringRegistryLock => {
|
||||
println!("[WebServer] Failed acquiring lock on registry.");
|
||||
Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())
|
||||
}
|
||||
MetricsEncodeResult::FailedEncodingMetrics(e) => {
|
||||
println!("[WebServer] Error encoding metrics: {}", e);
|
||||
Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Outcome of encoding the metrics registry into a text buffer.
enum MetricsEncodeResult {
	// Metrics were successfully encoded into the contained text.
	Ok(String),
	// The registry mutex could not be locked.
	FailedAcquiringRegistryLock,
	// Text encoding of the metrics failed.
	FailedEncodingMetrics(fmt::Error),
}
|
||||
|
||||
fn try_encode(metrics_registry: Arc<Mutex<Registry>>) -> MetricsEncodeResult {
|
||||
let mut buf = String::new();
|
||||
|
||||
return if let Ok(metrics_registry) = metrics_registry.lock() {
|
||||
encode(&mut buf, &metrics_registry).map_or_else(MetricsEncodeResult::FailedEncodingMetrics, |_| MetricsEncodeResult::Ok(buf))
|
||||
} else {
|
||||
MetricsEncodeResult::FailedAcquiringRegistryLock
|
||||
};
|
||||
}
|
57
src/web/mod.rs
Normal file
57
src/web/mod.rs
Normal file
@@ -0,0 +1,57 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use hyper::{Body, Error, Method, Request, Response, Server, StatusCode};
|
||||
use hyper::http::Result;
|
||||
use hyper::server::Builder;
|
||||
use hyper::server::conn::AddrIncoming;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use prometheus_client::registry::Registry;
|
||||
|
||||
mod metrics_endpoint;
|
||||
|
||||
// Maximum HTTP/1 read buffer size accepted by the server.
const MAX_BUFFER_SIZE: usize = 1024 * 32;

/// A configured-but-not-yet-running HTTP server for the metrics endpoint.
pub struct WebServer {
	// Hyper server builder, already bound to the requested address.
	builder: Builder<AddrIncoming>,
}
|
||||
|
||||
impl WebServer {
|
||||
//noinspection HttpUrlsUsage
|
||||
pub fn try_bind(addr: SocketAddr) -> anyhow::Result<WebServer> {
|
||||
println!("[WebServer] Starting web server on {0} with metrics endpoint: http://{0}/metrics", addr);
|
||||
|
||||
let builder = Server::try_bind(&addr).with_context(|| format!("Could not bind to {}", addr))?;
|
||||
let builder = builder.tcp_keepalive(Some(Duration::from_secs(60)));
|
||||
let builder = builder.http1_only(true);
|
||||
let builder = builder.http1_keepalive(true);
|
||||
let builder = builder.http1_max_buf_size(MAX_BUFFER_SIZE);
|
||||
let builder = builder.http1_header_read_timeout(Duration::from_secs(10));
|
||||
|
||||
Ok(WebServer { builder })
|
||||
}
|
||||
|
||||
pub async fn serve(self, metrics_registry: Mutex<Registry>) {
|
||||
let metrics_registry = Arc::new(metrics_registry);
|
||||
let service = make_service_fn(move |_| {
|
||||
let metrics_registry = Arc::clone(&metrics_registry);
|
||||
async move {
|
||||
Ok::<_, Error>(service_fn(move |req| handle_request(req, Arc::clone(&metrics_registry))))
|
||||
}
|
||||
});
|
||||
|
||||
if let Err(e) = self.builder.serve(service).await {
|
||||
println!("[WebServer] Error starting web server: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_request(req: Request<Body>, metrics_registry: Arc<Mutex<Registry>>) -> Result<Response<Body>> {
|
||||
if req.method() == Method::GET && req.uri().path() == "/metrics" {
|
||||
metrics_endpoint::handle(Arc::clone(&metrics_registry)).await
|
||||
} else {
|
||||
Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty())
|
||||
}
|
||||
}
|
@@ -1,54 +0,0 @@
|
||||
use std::str;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
|
||||
use actix_web::{App, HttpResponse, HttpServer, Result, web};
|
||||
use actix_web::dev::Server;
|
||||
use prometheus_client::encoding::text::encode;
|
||||
use prometheus_client::registry::Registry;
|
||||
|
||||
//noinspection HttpUrlsUsage
|
||||
pub fn create_web_server(host: &str, port: u16, metrics_registry: Mutex<Registry>) -> Server {
|
||||
let metrics_registry = web::Data::new(metrics_registry);
|
||||
|
||||
let server = HttpServer::new(move || {
|
||||
App::new()
|
||||
.app_data(metrics_registry.clone())
|
||||
.service(web::resource("/metrics").route(web::get().to(metrics_handler)))
|
||||
});
|
||||
|
||||
let server = server.keep_alive(Duration::from_secs(60));
|
||||
let server = server.shutdown_timeout(0);
|
||||
let server = server.disable_signals();
|
||||
let server = server.workers(1);
|
||||
let server = server.bind((host, port));
|
||||
|
||||
println!("[WebServer] Starting web server on http://{}:{}", host, port);
|
||||
return server.unwrap().run();
|
||||
}
|
||||
|
||||
pub async fn run_web_server(server: Server) {
|
||||
if let Err(e) = server.await {
|
||||
println!("[WebServer] Error running web server: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
async fn metrics_handler(metrics_registry: web::Data<Mutex<Registry>>) -> Result<HttpResponse> {
|
||||
let mut buf = Vec::new();
|
||||
|
||||
{
|
||||
if let Ok(metrics_registry) = metrics_registry.lock() {
|
||||
encode(&mut buf, &metrics_registry)?;
|
||||
} else {
|
||||
println!("[WebServer] Failed acquiring lock on registry.");
|
||||
return Ok(HttpResponse::InternalServerError().body(""));
|
||||
}
|
||||
}
|
||||
|
||||
if let Ok(buf) = String::from_utf8(buf) {
|
||||
Ok(HttpResponse::Ok().content_type("application/openmetrics-text; version=1.0.0; charset=utf-8").body(buf))
|
||||
} else {
|
||||
println!("[WebServer] Failed converting buffer to UTF-8.");
|
||||
Ok(HttpResponse::InternalServerError().body(""))
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user