1
0
mirror of https://github.com/chylex/Apache-Prometheus-Exporter.git synced 2024-11-26 01:42:52 +01:00

Compare commits

..

No commits in common. "7def8921b05b8be1c9c874dd54f1e591a15d9222" and "173e4249a64dcc62300ba4067b733e24b417e79a" have entirely different histories.

5 changed files with 821 additions and 145 deletions

851
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -13,7 +13,7 @@ lto = true
codegen-units = 1 codegen-units = 1
[dependencies] [dependencies]
hyper = { version = "0.14.27", default-features = false, features = ["http1", "server", "runtime"] } actix-web = "4.4.0"
linemux = "0.3.0" linemux = "0.3.0"
path-slash = "0.2.1" path-slash = "0.2.1"
prometheus-client = "0.21.2" prometheus-client = "0.21.2"

View File

@ -46,7 +46,6 @@ services:
HTTP_HOST: "0.0.0.0" HTTP_HOST: "0.0.0.0"
ACCESS_LOG_FILE_PATTERN: "/logs/*.access.log" ACCESS_LOG_FILE_PATTERN: "/logs/*.access.log"
ERROR_LOG_FILE_PATTERN: "/logs/*.error.log" ERROR_LOG_FILE_PATTERN: "/logs/*.error.log"
stop_signal: SIGINT
restart: "always" restart: "always"
volumes: volumes:

View File

@ -1,7 +1,5 @@
use std::env; use std::env;
use std::net::{IpAddr, SocketAddr};
use std::process::ExitCode; use std::process::ExitCode;
use std::str::FromStr;
use std::sync::Mutex; use std::sync::Mutex;
use tokio::signal; use tokio::signal;
@ -10,7 +8,7 @@ use tokio::sync::mpsc;
use crate::apache_metrics::ApacheMetrics; use crate::apache_metrics::ApacheMetrics;
use crate::log_file_pattern::{LogFilePath, parse_log_file_pattern_from_env}; use crate::log_file_pattern::{LogFilePath, parse_log_file_pattern_from_env};
use crate::log_watcher::watch_logs_task; use crate::log_watcher::watch_logs_task;
use crate::web_server::WebServer; use crate::web_server::{create_web_server, run_web_server};
mod apache_metrics; mod apache_metrics;
mod log_file_pattern; mod log_file_pattern;
@ -53,13 +51,6 @@ fn find_log_files(environment_variable_name: &str, log_kind: &str) -> Option<Vec
#[tokio::main(flavor = "current_thread")] #[tokio::main(flavor = "current_thread")]
async fn main() -> ExitCode { async fn main() -> ExitCode {
let host = env::var("HTTP_HOST").unwrap_or(String::from("127.0.0.1")); let host = env::var("HTTP_HOST").unwrap_or(String::from("127.0.0.1"));
let bind_ip = match IpAddr::from_str(&host) {
Ok(addr) => addr,
Err(_) => {
println!("Invalid HTTP host: {}", host);
return ExitCode::FAILURE;
}
};
println!("Initializing exporter..."); println!("Initializing exporter...");
@ -73,16 +64,11 @@ async fn main() -> ExitCode {
None => return ExitCode::FAILURE, None => return ExitCode::FAILURE,
}; };
let server = match WebServer::try_bind(SocketAddr::new(bind_ip, 9240)) {
Some(server) => server,
None => return ExitCode::FAILURE
};
let (metrics_registry, metrics) = ApacheMetrics::new(); let (metrics_registry, metrics) = ApacheMetrics::new();
let (shutdown_send, mut shutdown_recv) = mpsc::unbounded_channel(); let (shutdown_send, mut shutdown_recv) = mpsc::unbounded_channel();
tokio::spawn(watch_logs_task(access_log_files, error_log_files, metrics.clone(), shutdown_send.clone())); tokio::spawn(watch_logs_task(access_log_files, error_log_files, metrics.clone(), shutdown_send.clone()));
tokio::spawn(server.serve(Mutex::new(metrics_registry))); tokio::spawn(run_web_server(create_web_server(host.as_str(), 9240, Mutex::new(metrics_registry))));
drop(shutdown_send); drop(shutdown_send);

View File

@ -1,81 +1,55 @@
use std::fmt; use std::{fmt, str};
use std::net::SocketAddr; use std::sync::Mutex;
use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
use hyper::{Body, Error, header, Method, Request, Response, Server, StatusCode}; use actix_web::{App, HttpResponse, HttpServer, Result, web};
use hyper::http::Result; use actix_web::dev::Server;
use hyper::server::Builder;
use hyper::server::conn::AddrIncoming;
use hyper::service::{make_service_fn, service_fn};
use prometheus_client::encoding::text::encode; use prometheus_client::encoding::text::encode;
use prometheus_client::registry::Registry; use prometheus_client::registry::Registry;
const MAX_BUFFER_SIZE: usize = 1024 * 32; //noinspection HttpUrlsUsage
pub fn create_web_server(host: &str, port: u16, metrics_registry: Mutex<Registry>) -> Server {
let metrics_registry = web::Data::new(metrics_registry);
pub struct WebServer { let server = HttpServer::new(move || {
builder: Builder<AddrIncoming>, App::new()
} .app_data(metrics_registry.clone())
.service(web::resource("/metrics").route(web::get().to(metrics_handler)))
impl WebServer {
//noinspection HttpUrlsUsage
pub fn try_bind(addr: SocketAddr) -> Option<WebServer> {
println!("[WebServer] Starting web server on {0} with metrics endpoint: http://{0}/metrics", addr);
let builder = match Server::try_bind(&addr) {
Ok(builder) => builder,
Err(e) => {
println!("[WebServer] Could not bind to {}: {}", addr, e);
return None;
}
};
let builder = builder.tcp_keepalive(Some(Duration::from_secs(60)));
let builder = builder.http1_only(true);
let builder = builder.http1_keepalive(true);
let builder = builder.http1_max_buf_size(MAX_BUFFER_SIZE);
let builder = builder.http1_header_read_timeout(Duration::from_secs(10));
Some(WebServer { builder })
}
pub async fn serve(self, metrics_registry: Mutex<Registry>) {
let metrics_registry = Arc::new(metrics_registry);
let service = make_service_fn(move |_| {
let metrics_registry = Arc::clone(&metrics_registry);
async move {
Ok::<_, Error>(service_fn(move |req| handle_request(req, Arc::clone(&metrics_registry))))
}
}); });
if let Err(e) = self.builder.serve(service).await { let server = server.keep_alive(Duration::from_secs(60));
println!("[WebServer] Error starting web server: {}", e); let server = server.shutdown_timeout(0);
} let server = server.disable_signals();
} let server = server.workers(1);
let server = server.bind((host, port));
println!("[WebServer] Starting web server on {0}:{1} with metrics endpoint: http://{0}:{1}/metrics", host, port);
server.unwrap().run()
} }
async fn handle_request(req: Request<Body>, metrics_registry: Arc<Mutex<Registry>>) -> Result<Response<Body>> { pub async fn run_web_server(server: Server) {
if req.method() == Method::GET && req.uri().path() == "/metrics" { if let Err(e) = server.await {
metrics_handler(Arc::clone(&metrics_registry)).await println!("[WebServer] Error running web server: {}", e);
} else {
Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty())
} }
} }
//noinspection SpellCheckingInspection //noinspection SpellCheckingInspection
async fn metrics_handler(metrics_registry: Arc<Mutex<Registry>>) -> Result<Response<Body>> { async fn metrics_handler(metrics_registry: web::Data<Mutex<Registry>>) -> Result<HttpResponse> {
match encode_metrics(metrics_registry) { let response = match encode_metrics(metrics_registry) {
MetricsEncodeResult::Ok(buf) => { MetricsEncodeResult::Ok(buf) => {
Response::builder().status(StatusCode::OK).header(header::CONTENT_TYPE, "application/openmetrics-text; version=1.0.0; charset=utf-8").body(Body::from(buf)) HttpResponse::Ok().content_type("application/openmetrics-text; version=1.0.0; charset=utf-8").body(buf)
} }
MetricsEncodeResult::FailedAcquiringRegistryLock => { MetricsEncodeResult::FailedAcquiringRegistryLock => {
println!("[WebServer] Failed acquiring lock on registry."); println!("[WebServer] Failed acquiring lock on registry.");
Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty()) HttpResponse::InternalServerError().body("")
} }
MetricsEncodeResult::FailedEncodingMetrics(e) => { MetricsEncodeResult::FailedEncodingMetrics(e) => {
println!("[WebServer] Error encoding metrics: {}", e); println!("[WebServer] Error encoding metrics: {}", e);
Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty()) HttpResponse::InternalServerError().body("")
}
} }
};
Ok(response)
} }
enum MetricsEncodeResult { enum MetricsEncodeResult {
@ -84,12 +58,12 @@ enum MetricsEncodeResult {
FailedEncodingMetrics(fmt::Error), FailedEncodingMetrics(fmt::Error),
} }
fn encode_metrics(metrics_registry: Arc<Mutex<Registry>>) -> MetricsEncodeResult { fn encode_metrics(metrics_registry: web::Data<Mutex<Registry>>) -> MetricsEncodeResult {
let mut buf = String::new(); let mut buf = String::new();
return if let Ok(metrics_registry) = metrics_registry.lock() { return if let Ok(metrics_registry) = metrics_registry.lock() {
encode(&mut buf, &metrics_registry).map_or_else(MetricsEncodeResult::FailedEncodingMetrics, |_| MetricsEncodeResult::Ok(buf)) encode(&mut buf, &metrics_registry).map_or_else(MetricsEncodeResult::FailedEncodingMetrics, |_| MetricsEncodeResult::Ok(buf))
} else { } else {
MetricsEncodeResult::FailedAcquiringRegistryLock MetricsEncodeResult::FailedAcquiringRegistryLock
}; }
} }