1
0
mirror of https://github.com/chylex/Apache-Prometheus-Exporter.git synced 2025-09-15 17:32:12 +02:00

Compare commits

...

8 Commits

15 changed files with 617 additions and 1200 deletions

997
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
[package]
name = "apache_prometheus_exporter"
version = "0.1.0"
version = "1.0.0"
edition = "2021"
[[bin]]
@@ -13,8 +13,9 @@ lto = true
codegen-units = 1
[dependencies]
actix-web = "4.4.0"
linemux = "0.3.0"
anyhow = "1.0.75"
hyper = { version = "0.14.27", default-features = false, features = ["http1", "server", "runtime"] }
notify = { version = "6.1.1", default-features = false, features = ["macos_kqueue"] }
path-slash = "0.2.1"
prometheus-client = "0.21.2"
tokio = { version = "1.32.0", features = ["rt", "macros", "signal"] }
tokio = { version = "1.32.0", features = ["fs", "io-util", "macros", "rt", "signal"] }

View File

@@ -82,17 +82,21 @@ The wildcard must not include any prefix or suffix, so `/*/` is accepted, but `/
#### Notes
> At least one access log file and one error log file must be found when the exporter starts, otherwise the exporter immediately exits with an error.
> If a log file is deleted, the exporter will automatically resume watching it if it is re-created later. If you want the exporter to forget about deleted log files, restart the exporter.
> The exporter only searches for files when it starts. If you need the exporter to watch a new file or forget a deleted file, you must restart it.
## 4. Launch the Exporter
Start the exporter. The standard output will show which log files have been found, the web server host, and the metrics endpoint URL.
Press `Ctrl-C` to stop the exporter.
If no errors are shown, the exporter will begin reading the found log files from the end, and printing each line to the standard output. When a log file is rotated, the exporter will begin reading it from the beginning.
**Important:** Due to library bugs, the exporter will currently not watch rotated log files. If you want to use this project right now, you will need to add the `-c` flag to `rotatelogs`, and restart the exporter after every rotation.
Press `Ctrl-C` to stop the exporter. Signals other than `SIGINT` are ignored.
#### Notes
> The exporter is designed to work and tested with the `rotatelogs` tool in a Linux container. Any other tools or operating systems are unsupported.
> If an error occurs while reading a file or re-opening a rotated file, the exporter will stop watching it and print the error to standard output.
## 5. Collect Prometheus Metrics

View File

@@ -46,6 +46,7 @@ services:
HTTP_HOST: "0.0.0.0"
ACCESS_LOG_FILE_PATTERN: "/logs/*.access.log"
ERROR_LOG_FILE_PATTERN: "/logs/*.error.log"
stop_signal: SIGINT
restart: "always"
volumes:

View File

@@ -1,112 +0,0 @@
use std::collections::HashMap;
use std::io;
use std::io::{Error, ErrorKind};
use std::path::PathBuf;
use linemux::{Line, MuxedLines};
use tokio::sync::mpsc::UnboundedSender;
use crate::ApacheMetrics;
use crate::log_file_pattern::LogFilePath;
#[derive(Copy, Clone, PartialEq)]
enum LogFileKind {
Access,
Error,
}
struct LogFileInfo<'a> {
pub kind: LogFileKind,
pub label: &'a String,
}
impl<'a> LogFileInfo<'a> {
fn get_label_set(&self) -> [(&'static str, String); 1] {
[("file", self.label.clone())]
}
}
pub async fn watch_logs_task(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: ApacheMetrics, shutdown_send: UnboundedSender<()>) {
if let Err(error) = watch_logs(access_log_files, error_log_files, metrics).await {
println!("[LogWatcher] Error reading logs: {}", error);
shutdown_send.send(()).unwrap();
}
}
struct LogWatcher<'a> {
reader: MuxedLines,
files: HashMap<PathBuf, LogFileInfo<'a>>,
}
impl<'a> LogWatcher<'a> {
fn new() -> io::Result<LogWatcher<'a>> {
Ok(LogWatcher {
reader: MuxedLines::new()?,
files: HashMap::new(),
})
}
fn count_files_of_kind(&self, kind: LogFileKind) -> usize {
return self.files.values().filter(|info| info.kind == kind).count();
}
async fn add_file(&mut self, log_file: &'a LogFilePath, kind: LogFileKind) -> io::Result<()> {
let lookup_key = self.reader.add_file(&log_file.path).await?;
self.files.insert(lookup_key, LogFileInfo { kind, label: &log_file.label });
Ok(())
}
async fn start_watching(&mut self, metrics: &ApacheMetrics) -> io::Result<()> {
if self.files.is_empty() {
println!("[LogWatcher] No log files provided.");
return Err(Error::from(ErrorKind::Unsupported));
}
println!("[LogWatcher] Watching {} access log file(s) and {} error log file(s).", self.count_files_of_kind(LogFileKind::Access), self.count_files_of_kind(LogFileKind::Error));
for metadata in self.files.values() {
let label_set = metadata.get_label_set();
let _ = metrics.requests_total.get_or_create(&label_set);
let _ = metrics.errors_total.get_or_create(&label_set);
}
loop {
if let Some(event) = self.reader.next_line().await? {
self.handle_line(event, metrics);
}
}
}
fn handle_line(&mut self, event: Line, metrics: &ApacheMetrics) {
match self.files.get(event.source()) {
Some(metadata) => {
let label = metadata.label;
let (kind, family) = match metadata.kind {
LogFileKind::Access => ("access log", &metrics.requests_total),
LogFileKind::Error => ("error log", &metrics.errors_total),
};
println!("[LogWatcher] Received {} line from \"{}\": {}", kind, label, event.line());
family.get_or_create(&metadata.get_label_set()).inc();
}
None => {
println!("[LogWatcher] Received line from unknown file: {}", event.source().display());
}
}
}
}
async fn watch_logs(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: ApacheMetrics) -> io::Result<()> {
let mut watcher = LogWatcher::new()?;
for log_file in &access_log_files {
watcher.add_file(log_file, LogFileKind::Access).await?;
}
for log_file in &error_log_files {
watcher.add_file(log_file, LogFileKind::Error).await?;
}
watcher.start_watching(&metrics).await?;
Ok(())
}

View File

@@ -0,0 +1,62 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use notify::{ErrorKind, Event, recommended_watcher, RecommendedWatcher, RecursiveMode, Result, Watcher};
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex;
pub struct FsWatcher {
watcher: Mutex<RecommendedWatcher>,
}
impl FsWatcher {
pub fn new(callbacks: FsEventCallbacks) -> Result<Self> {
let watcher = recommended_watcher(move |event| callbacks.handle_event(event))?;
let watcher = Mutex::new(watcher);
Ok(Self { watcher })
}
pub async fn watch(&self, path: &Path) -> Result<()> {
let mut watcher = self.watcher.lock().await;
if let Err(e) = watcher.unwatch(path) {
if !matches!(e.kind, ErrorKind::WatchNotFound) {
return Err(e);
}
}
watcher.watch(path, RecursiveMode::NonRecursive)
}
}
pub struct FsEventCallbacks {
senders: HashMap<PathBuf, Sender<Event>>,
}
impl FsEventCallbacks {
pub fn new() -> Self {
Self { senders: HashMap::new() }
}
pub fn register(&mut self, path: &Path, sender: Sender<Event>) {
self.senders.insert(path.to_path_buf(), sender);
}
fn handle_event(&self, event: Result<Event>) {
match event {
Ok(event) => {
for path in &event.paths {
if let Some(sender) = self.senders.get(path) {
if let Err(e) = sender.try_send(event.clone()) {
println!("[FsWatcher] Error sending filesystem event for path \"{}\": {}", path.to_string_lossy(), e);
}
}
}
}
Err(e) => {
println!("[FsWatcher] Error receiving filesystem event: {}", e);
}
}
}
}

View File

@@ -1,9 +1,9 @@
use std::{env, io};
use std::env::VarError;
use std::fs::DirEntry;
use std::io;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use anyhow::{anyhow, bail, Result};
use path_slash::PathExt;
/// Reads and parses an environment variable that determines the path and file name pattern of log files.
@@ -13,34 +13,22 @@ use path_slash::PathExt;
/// 1. A simple path to a file.
/// 2. A path with a wildcard anywhere in the file name.
/// 3. A path with a standalone wildcard component (i.e. no prefix or suffix in the folder name).
pub fn parse_log_file_pattern_from_env(variable_name: &str) -> Result<LogFilePattern, String> {
match env::var(variable_name) {
Ok(str) => {
let pattern_str = Path::new(&str).to_slash().ok_or(format!("Environment variable {} contains an invalid path.", variable_name))?;
parse_log_file_pattern_from_str(&pattern_str)
}
Err(err) => match err {
VarError::NotPresent => Err(format!("Environment variable {} must be set.", variable_name)),
VarError::NotUnicode(_) => Err(format!("Environment variable {} contains invalid characters.", variable_name))
}
}
}
fn parse_log_file_pattern_from_str(pattern_str: &str) -> Result<LogFilePattern, String> {
if pattern_str.trim().is_empty() {
return Err(String::from("Path is empty."));
pub fn parse_log_file_pattern_from_str(pattern: &str) -> Result<LogFilePattern> {
let pattern = Path::new(pattern).to_slash().ok_or_else(|| anyhow!("Path is invalid"))?;
if pattern.trim().is_empty() {
bail!("Path is empty");
}
if let Some((left, right)) = pattern_str.split_once('*') {
if let Some((left, right)) = pattern.split_once('*') {
parse_log_file_pattern_split_on_wildcard(left, right)
} else {
Ok(LogFilePattern::WithoutWildcard(pattern_str.to_string()))
Ok(LogFilePattern::WithoutWildcard(pattern.to_string()))
}
}
fn parse_log_file_pattern_split_on_wildcard(left: &str, right: &str) -> Result<LogFilePattern, String> {
fn parse_log_file_pattern_split_on_wildcard(left: &str, right: &str) -> Result<LogFilePattern> {
if left.contains('*') || right.contains('*') {
return Err(String::from("Path has too many wildcards."));
bail!("Path has too many wildcards");
}
if left.ends_with('/') && right.starts_with('/') {
@@ -51,7 +39,7 @@ fn parse_log_file_pattern_split_on_wildcard(left: &str, right: &str) -> Result<L
}
if right.contains('/') {
return Err(String::from("Path has a folder wildcard with a prefix or suffix."));
bail!("Path has a folder wildcard with a prefix or suffix");
}
if let Some((folder_path, file_name_prefix)) = left.rsplit_once('/') {
@@ -122,10 +110,7 @@ impl LogFilePattern {
}
fn search_without_wildcard(path_str: &String) -> Result<Vec<LogFilePath>, io::Error> {
let path = Path::new(path_str);
let is_valid = path.is_file() || matches!(path.parent(), Some(parent) if parent.is_dir());
if is_valid {
if Path::new(path_str).is_file() {
Ok(vec![LogFilePath::with_empty_label(path_str)])
} else {
Err(io::Error::from(ErrorKind::NotFound))
@@ -178,27 +163,27 @@ impl LogFilePath {
#[cfg(test)]
mod tests {
use crate::log_file_pattern::{LogFilePattern, parse_log_file_pattern_from_str};
use super::{LogFilePattern, parse_log_file_pattern_from_str};
#[test]
fn empty_path() {
assert!(matches!(parse_log_file_pattern_from_str(""), Err(err) if err == "Path is empty."));
assert!(matches!(parse_log_file_pattern_from_str(" "), Err(err) if err == "Path is empty."));
assert!(matches!(parse_log_file_pattern_from_str(""), Err(err) if err.to_string() == "Path is empty"));
assert!(matches!(parse_log_file_pattern_from_str(" "), Err(err) if err.to_string() == "Path is empty"));
}
#[test]
fn too_many_wildcards() {
assert!(matches!(parse_log_file_pattern_from_str("/path/*/to/files/*.log"), Err(err) if err == "Path has too many wildcards."));
assert!(matches!(parse_log_file_pattern_from_str("/path/*/to/files/*.log"), Err(err) if err.to_string() == "Path has too many wildcards"));
}
#[test]
fn folder_wildcard_with_prefix_not_supported() {
assert!(matches!(parse_log_file_pattern_from_str("/path/*abc/to/files/access.log"), Err(err) if err == "Path has a folder wildcard with a prefix or suffix."));
assert!(matches!(parse_log_file_pattern_from_str("/path/*abc/to/files/access.log"), Err(err) if err.to_string() == "Path has a folder wildcard with a prefix or suffix"));
}
#[test]
fn folder_wildcard_with_suffix_not_supported() {
assert!(matches!(parse_log_file_pattern_from_str("/path/abc*/to/files/access.log"), Err(err) if err == "Path has a folder wildcard with a prefix or suffix."));
assert!(matches!(parse_log_file_pattern_from_str("/path/abc*/to/files/access.log"), Err(err) if err.to_string() == "Path has a folder wildcard with a prefix or suffix"));
}
#[test]

View File

@@ -0,0 +1,256 @@
use std::cmp::max;
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::{anyhow, bail, Context, Result};
use notify::{Event, EventKind};
use notify::event::{CreateKind, ModifyKind};
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, BufReader, Lines};
use tokio::sync::mpsc;
use tokio::sync::mpsc::Receiver;
use crate::logs::filesystem_watcher::{FsEventCallbacks, FsWatcher};
use crate::logs::log_file_pattern::LogFilePath;
use crate::metrics::Metrics;
#[derive(Copy, Clone, PartialEq)]
pub enum LogFileKind {
Access,
Error,
}
struct LogFileMetadata {
pub kind: LogFileKind,
pub label: String,
}
impl LogFileMetadata {
fn get_label_set(&self) -> [(&'static str, String); 1] {
[("file", self.label.clone())]
}
}
pub struct LogWatcherConfiguration {
files: Vec<(PathBuf, LogFileMetadata)>,
}
impl LogWatcherConfiguration {
pub fn new() -> LogWatcherConfiguration {
LogWatcherConfiguration { files: Vec::new() }
}
fn count_files_of_kind(&self, kind: LogFileKind) -> usize {
return self.files.iter().filter(|(_, metadata)| metadata.kind == kind).count();
}
pub fn add_file(&mut self, log_file: LogFilePath, kind: LogFileKind) {
let path = log_file.path;
let label = log_file.label;
let metadata = LogFileMetadata { kind, label };
self.files.push((path, metadata));
}
pub async fn start(self, metrics: &Metrics) -> Result<()> {
if self.files.is_empty() {
bail!("No log files provided");
}
println!("[LogWatcher] Watching {} access log file(s) and {} error log file(s).", self.count_files_of_kind(LogFileKind::Access), self.count_files_of_kind(LogFileKind::Error));
struct PreparedFile {
path: PathBuf,
metadata: LogFileMetadata,
fs_event_receiver: Receiver<Event>,
}
let mut prepared_files = Vec::new();
let mut fs_callbacks = FsEventCallbacks::new();
for (path, metadata) in self.files {
let (fs_event_sender, fs_event_receiver) = mpsc::channel(20);
fs_callbacks.register(&path, fs_event_sender);
prepared_files.push(PreparedFile { path, metadata, fs_event_receiver });
}
let fs_watcher = FsWatcher::new(fs_callbacks).context("Could not create filesystem watcher")?;
for file in &prepared_files {
let file_path = &file.path;
if !file_path.is_absolute() {
bail!("Path is not absolute: {}", file_path.to_string_lossy());
}
let parent_path = file_path.parent().ok_or_else(|| anyhow!("Path has no parent: {}", file_path.to_string_lossy()))?;
fs_watcher.watch(parent_path).await.with_context(|| format!("Could not create filesystem watcher for directory: {}", parent_path.to_string_lossy()))?;
}
let fs_watcher = Arc::new(fs_watcher);
for file in prepared_files {
let label_set = file.metadata.get_label_set();
let _ = metrics.requests_total.get_or_create(&label_set);
let _ = metrics.errors_total.get_or_create(&label_set);
let log_watcher = LogWatcher::create(file.path.clone(), file.metadata, metrics.clone(), Arc::clone(&fs_watcher), file.fs_event_receiver);
let log_watcher = log_watcher.await.with_context(|| format!("Could not watch log file: {}", file.path.to_string_lossy()))?;
tokio::spawn(log_watcher.watch());
}
Ok(())
}
}
struct LogWatcher {
state: LogWatchingState,
processor: LogLineProcessor,
fs_event_receiver: Receiver<Event>,
}
impl LogWatcher {
async fn create(path: PathBuf, metadata: LogFileMetadata, metrics: Metrics, fs_watcher: Arc<FsWatcher>, fs_event_receiver: Receiver<Event>) -> Result<Self> {
let state = LogWatchingState::initialize(path.clone(), fs_watcher).await?;
let processor = LogLineProcessor { path, metadata, metrics };
Ok(LogWatcher { state, processor, fs_event_receiver })
}
async fn watch(mut self) {
while let Ok(Some(_)) = self.state.lines.next_line().await {
// Skip lines that already existed.
}
let path = &self.processor.path;
'read_loop:
loop {
if !self.processor.process_lines(&mut self.state.lines).await {
break 'read_loop;
}
'event_loop:
loop {
let mut next_event = CoalescedFsEvent::None;
match self.fs_event_receiver.recv().await {
None => break 'read_loop,
Some(event) => {
next_event = next_event.merge(event);
while let Ok(event) = self.fs_event_receiver.try_recv() {
next_event = next_event.merge(event);
}
}
}
match next_event {
CoalescedFsEvent::None => continue 'event_loop,
CoalescedFsEvent::NewData => continue 'read_loop,
CoalescedFsEvent::NewFile => {
println!("[LogWatcher] File recreated: {}", path.to_string_lossy());
if !self.processor.process_lines(&mut self.state.lines).await {
break 'read_loop;
}
self.state = match self.state.reinitialize().await {
Ok(state) => state,
Err(e) => {
println!("Could not re-watch log file \"{}\": {}", path.to_string_lossy(), e);
break 'read_loop;
}
};
while let Ok(Some(_)) = self.state.lines.next_line().await {
// There are occasional spurious file creation events, so reading
// from the beginning would read lines that were already counted.
}
continue 'read_loop;
}
}
}
}
println!("[LogWatcher] Stopping log watcher for: {}", path.to_string_lossy());
}
}
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
enum CoalescedFsEvent {
None = 0,
NewData = 1,
NewFile = 2,
}
impl CoalescedFsEvent {
fn merge(self, event: Event) -> CoalescedFsEvent {
match event.kind {
EventKind::Modify(ModifyKind::Data(_)) => {
max(self, CoalescedFsEvent::NewData)
}
EventKind::Create(CreateKind::File) => {
max(self, CoalescedFsEvent::NewFile)
}
_ => self
}
}
}
struct LogWatchingState {
path: PathBuf,
lines: Lines<BufReader<File>>,
fs_watcher: Arc<FsWatcher>,
}
impl LogWatchingState {
const DEFAULT_BUFFER_CAPACITY: usize = 1024 * 4;
async fn initialize(path: PathBuf, fs_watcher: Arc<FsWatcher>) -> Result<LogWatchingState> {
fs_watcher.watch(&path).await.context("Could not create filesystem watcher")?;
let file = File::open(&path).await.context("Could not open file")?;
let lines = BufReader::with_capacity(Self::DEFAULT_BUFFER_CAPACITY, file).lines();
Ok(LogWatchingState { path, lines, fs_watcher })
}
async fn reinitialize(self) -> Result<LogWatchingState> {
LogWatchingState::initialize(self.path, self.fs_watcher).await
}
}
struct LogLineProcessor {
path: PathBuf,
metadata: LogFileMetadata,
metrics: Metrics,
}
impl LogLineProcessor {
async fn process_lines(&self, reader: &mut Lines<BufReader<File>>) -> bool {
loop {
match reader.next_line().await {
Ok(maybe_line) => match maybe_line {
Some(line) => self.handle_line(line),
None => return true,
},
Err(e) => {
println!("[LogWatcher] Error reading from file \"{}\": {}", self.path.to_string_lossy(), e);
return false;
}
}
}
}
fn handle_line(&self, line: String) {
let (kind, family) = match self.metadata.kind {
LogFileKind::Access => ("access log", &self.metrics.requests_total),
LogFileKind::Error => ("error log", &self.metrics.errors_total),
};
println!("[LogWatcher] Received {} line from \"{}\": {}", kind, self.metadata.label, line);
family.get_or_create(&self.metadata.get_label_set()).inc();
}
}

48
src/logs/mod.rs Normal file
View File

@@ -0,0 +1,48 @@
use std::env;
use std::env::VarError;
use anyhow::{anyhow, bail, Context, Result};
use log_file_watcher::{LogFileKind, LogWatcherConfiguration};
use crate::logs::log_file_pattern::{LogFilePath, parse_log_file_pattern_from_str};
use crate::metrics::Metrics;
mod access_log_parser;
mod filesystem_watcher;
mod log_file_pattern;
mod log_file_watcher;
pub fn find_log_files(environment_variable_name: &str, log_kind: &str) -> Result<Vec<LogFilePath>> {
let log_file_pattern_str = env::var(environment_variable_name).map_err(|err| match err {
VarError::NotPresent => anyhow!("Environment variable {} must be set", environment_variable_name),
VarError::NotUnicode(_) => anyhow!("Environment variable {} contains invalid characters", environment_variable_name)
})?;
let log_file_pattern = parse_log_file_pattern_from_str(&log_file_pattern_str).with_context(|| format!("Could not parse pattern: {}", log_file_pattern_str))?;
let log_files = log_file_pattern.search().with_context(|| format!("Could not search files: {}", log_file_pattern_str))?;
if log_files.is_empty() {
bail!("No files match pattern: {}", log_file_pattern_str);
}
for log_file in &log_files {
println!("Found {} file: {} (label \"{}\")", log_kind, log_file.path.display(), log_file.label);
}
Ok(log_files)
}
pub async fn start_log_watcher(access_log_files: Vec<LogFilePath>, error_log_files: Vec<LogFilePath>, metrics: Metrics) -> Result<()> {
let mut watcher = LogWatcherConfiguration::new();
for log_file in access_log_files.into_iter() {
watcher.add_file(log_file, LogFileKind::Access);
}
for log_file in error_log_files.into_iter() {
watcher.add_file(log_file, LogFileKind::Error);
}
watcher.start(&metrics).await
}

View File

@@ -1,86 +1,38 @@
use std::env;
use std::process::ExitCode;
use std::net::{IpAddr, SocketAddr};
use std::str::FromStr;
use std::sync::Mutex;
use anyhow::{anyhow, Context};
use tokio::signal;
use tokio::sync::mpsc;
use crate::apache_metrics::ApacheMetrics;
use crate::log_file_pattern::{LogFilePath, parse_log_file_pattern_from_env};
use crate::log_watcher::watch_logs_task;
use crate::web_server::{create_web_server, run_web_server};
use crate::metrics::Metrics;
use crate::web::WebServer;
mod apache_metrics;
mod log_file_pattern;
mod log_parser;
mod log_watcher;
mod web_server;
mod logs;
mod metrics;
mod web;
const ACCESS_LOG_FILE_PATTERN: &str = "ACCESS_LOG_FILE_PATTERN";
const ERROR_LOG_FILE_PATTERN: &str = "ERROR_LOG_FILE_PATTERN";
fn find_log_files(environment_variable_name: &str, log_kind: &str) -> Option<Vec<LogFilePath>> {
let log_file_pattern = match parse_log_file_pattern_from_env(environment_variable_name) {
Ok(pattern) => pattern,
Err(error) => {
println!("Error: {}", error);
return None;
}
};
let log_files = match log_file_pattern.search() {
Ok(files) => files,
Err(error) => {
println!("Error searching {} files: {}", log_kind, error);
return None;
}
};
if log_files.is_empty() {
println!("Found no matching {} files.", log_kind);
return None;
}
for log_file in &log_files {
println!("Found {} file: {} (label \"{}\")", log_kind, log_file.path.display(), log_file.label);
}
Some(log_files)
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> ExitCode {
async fn main() -> anyhow::Result<()> {
let host = env::var("HTTP_HOST").unwrap_or(String::from("127.0.0.1"));
let bind_ip = IpAddr::from_str(&host).map_err(|_| anyhow!("Invalid HTTP host: {}", host))?;
println!("Initializing exporter...");
let access_log_files = match find_log_files(ACCESS_LOG_FILE_PATTERN, "access log") {
Some(files) => files,
None => return ExitCode::FAILURE,
};
let access_log_files = logs::find_log_files(ACCESS_LOG_FILE_PATTERN, "access log").context("Could not find access log files")?;
let error_log_files = logs::find_log_files(ERROR_LOG_FILE_PATTERN, "error log").context("Could not find error log files")?;
let error_log_files = match find_log_files(ERROR_LOG_FILE_PATTERN, "error log") {
Some(files) => files,
None => return ExitCode::FAILURE,
};
let server = WebServer::try_bind(SocketAddr::new(bind_ip, 9240)).context("Could not configure web server")?;
let (metrics_registry, metrics) = Metrics::new();
let (metrics_registry, metrics) = ApacheMetrics::new();
let (shutdown_send, mut shutdown_recv) = mpsc::unbounded_channel();
logs::start_log_watcher(access_log_files, error_log_files, metrics).await.context("Could not start watching logs")?;
tokio::spawn(server.serve(Mutex::new(metrics_registry)));
tokio::spawn(watch_logs_task(access_log_files, error_log_files, metrics.clone(), shutdown_send.clone()));
tokio::spawn(run_web_server(create_web_server(host.as_str(), 9240, Mutex::new(metrics_registry))));
drop(shutdown_send);
tokio::select! {
_ = signal::ctrl_c() => {
println!("Received CTRL-C, shutting down...")
}
_ = shutdown_recv.recv() => {
println!("Shutting down...");
}
}
ExitCode::SUCCESS
signal::ctrl_c().await.with_context(|| "Could not register CTRL-C handler")?;
println!("Received CTRL-C, shutting down...");
Ok(())
}

View File

@@ -4,20 +4,17 @@ use prometheus_client::registry::Registry;
type SingleLabel = [(&'static str, String); 1];
#[derive(Clone)]
pub struct ApacheMetrics {
#[derive(Clone, Default)]
pub struct Metrics {
pub requests_total: Family<SingleLabel, Counter>,
pub errors_total: Family<SingleLabel, Counter>
}
impl ApacheMetrics {
pub fn new() -> (Registry, ApacheMetrics) {
impl Metrics {
pub fn new() -> (Registry, Metrics) {
let mut registry = <Registry>::default();
let metrics = ApacheMetrics {
requests_total: Family::<SingleLabel, Counter>::default(),
errors_total: Family::<SingleLabel, Counter>::default()
};
let metrics = Metrics::default();
registry.register("apache_requests", "Number of received requests", metrics.requests_total.clone());
registry.register("apache_errors", "Number of logged errors", metrics.errors_total.clone());

View File

@@ -0,0 +1,42 @@
use std::fmt;
use std::sync::{Arc, Mutex};
use hyper::{Body, http, Response, StatusCode};
use hyper::header::CONTENT_TYPE;
use prometheus_client::encoding::text::encode;
use prometheus_client::registry::Registry;
//noinspection SpellCheckingInspection
const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text; version=1.0.0; charset=utf-8";
pub async fn handle(metrics_registry: Arc<Mutex<Registry>>) -> http::Result<Response<Body>> {
match try_encode(metrics_registry) {
MetricsEncodeResult::Ok(buf) => {
Response::builder().status(StatusCode::OK).header(CONTENT_TYPE, METRICS_CONTENT_TYPE).body(Body::from(buf))
}
MetricsEncodeResult::FailedAcquiringRegistryLock => {
println!("[WebServer] Failed acquiring lock on registry.");
Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())
}
MetricsEncodeResult::FailedEncodingMetrics(e) => {
println!("[WebServer] Error encoding metrics: {}", e);
Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())
}
}
}
enum MetricsEncodeResult {
Ok(String),
FailedAcquiringRegistryLock,
FailedEncodingMetrics(fmt::Error),
}
fn try_encode(metrics_registry: Arc<Mutex<Registry>>) -> MetricsEncodeResult {
let mut buf = String::new();
return if let Ok(metrics_registry) = metrics_registry.lock() {
encode(&mut buf, &metrics_registry).map_or_else(MetricsEncodeResult::FailedEncodingMetrics, |_| MetricsEncodeResult::Ok(buf))
} else {
MetricsEncodeResult::FailedAcquiringRegistryLock
};
}

57
src/web/mod.rs Normal file
View File

@@ -0,0 +1,57 @@
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use anyhow::Context;
use hyper::{Body, Error, Method, Request, Response, Server, StatusCode};
use hyper::http::Result;
use hyper::server::Builder;
use hyper::server::conn::AddrIncoming;
use hyper::service::{make_service_fn, service_fn};
use prometheus_client::registry::Registry;
mod metrics_endpoint;
const MAX_BUFFER_SIZE: usize = 1024 * 32;
pub struct WebServer {
builder: Builder<AddrIncoming>,
}
impl WebServer {
//noinspection HttpUrlsUsage
pub fn try_bind(addr: SocketAddr) -> anyhow::Result<WebServer> {
println!("[WebServer] Starting web server on {0} with metrics endpoint: http://{0}/metrics", addr);
let builder = Server::try_bind(&addr).with_context(|| format!("Could not bind to {}", addr))?;
let builder = builder.tcp_keepalive(Some(Duration::from_secs(60)));
let builder = builder.http1_only(true);
let builder = builder.http1_keepalive(true);
let builder = builder.http1_max_buf_size(MAX_BUFFER_SIZE);
let builder = builder.http1_header_read_timeout(Duration::from_secs(10));
Ok(WebServer { builder })
}
pub async fn serve(self, metrics_registry: Mutex<Registry>) {
let metrics_registry = Arc::new(metrics_registry);
let service = make_service_fn(move |_| {
let metrics_registry = Arc::clone(&metrics_registry);
async move {
Ok::<_, Error>(service_fn(move |req| handle_request(req, Arc::clone(&metrics_registry))))
}
});
if let Err(e) = self.builder.serve(service).await {
println!("[WebServer] Error starting web server: {}", e);
}
}
}
async fn handle_request(req: Request<Body>, metrics_registry: Arc<Mutex<Registry>>) -> Result<Response<Body>> {
if req.method() == Method::GET && req.uri().path() == "/metrics" {
metrics_endpoint::handle(Arc::clone(&metrics_registry)).await
} else {
Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty())
}
}

View File

@@ -1,69 +0,0 @@
use std::{fmt, str};
use std::sync::Mutex;
use std::time::Duration;
use actix_web::{App, HttpResponse, HttpServer, Result, web};
use actix_web::dev::Server;
use prometheus_client::encoding::text::encode;
use prometheus_client::registry::Registry;
//noinspection HttpUrlsUsage
pub fn create_web_server(host: &str, port: u16, metrics_registry: Mutex<Registry>) -> Server {
let metrics_registry = web::Data::new(metrics_registry);
let server = HttpServer::new(move || {
App::new()
.app_data(metrics_registry.clone())
.service(web::resource("/metrics").route(web::get().to(metrics_handler)))
});
let server = server.keep_alive(Duration::from_secs(60));
let server = server.shutdown_timeout(0);
let server = server.disable_signals();
let server = server.workers(1);
let server = server.bind((host, port));
println!("[WebServer] Starting web server on {0}:{1} with metrics endpoint: http://{0}:{1}/metrics", host, port);
server.unwrap().run()
}
pub async fn run_web_server(server: Server) {
if let Err(e) = server.await {
println!("[WebServer] Error running web server: {}", e);
}
}
//noinspection SpellCheckingInspection
async fn metrics_handler(metrics_registry: web::Data<Mutex<Registry>>) -> Result<HttpResponse> {
let response = match encode_metrics(metrics_registry) {
MetricsEncodeResult::Ok(buf) => {
HttpResponse::Ok().content_type("application/openmetrics-text; version=1.0.0; charset=utf-8").body(buf)
}
MetricsEncodeResult::FailedAcquiringRegistryLock => {
println!("[WebServer] Failed acquiring lock on registry.");
HttpResponse::InternalServerError().body("")
}
MetricsEncodeResult::FailedEncodingMetrics(e) => {
println!("[WebServer] Error encoding metrics: {}", e);
HttpResponse::InternalServerError().body("")
}
};
Ok(response)
}
enum MetricsEncodeResult {
Ok(String),
FailedAcquiringRegistryLock,
FailedEncodingMetrics(fmt::Error),
}
fn encode_metrics(metrics_registry: web::Data<Mutex<Registry>>) -> MetricsEncodeResult {
let mut buf = String::new();
return if let Ok(metrics_registry) = metrics_registry.lock() {
encode(&mut buf, &metrics_registry).map_or_else(MetricsEncodeResult::FailedEncodingMetrics, |_| MetricsEncodeResult::Ok(buf))
} else {
MetricsEncodeResult::FailedAcquiringRegistryLock
}
}