diff --git a/Cargo.lock b/Cargo.lock index ef58bca..1afc58a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -461,6 +461,16 @@ dependencies = [ "uuid", ] +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.6" @@ -476,6 +486,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + [[package]] name = "crypto-common" version = "0.1.6" @@ -705,6 +730,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2" + [[package]] name = "foreign-types" version = "0.3.2" @@ -888,13 +919,22 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "hashbrown" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" +dependencies = [ + "foldhash", +] + [[package]] name = "hashlink" version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" dependencies = [ - "hashbrown", + "hashbrown 0.14.5", ] [[package]] @@ -1006,6 +1046,24 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.27.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" +dependencies = [ + "futures-util", + "http", + "hyper", + "hyper-util", + "rustls", + "rustls-native-certs", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.6" @@ -1061,14 +1119,20 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.6" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" +checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.0", ] +[[package]] +name = "ipnet" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" + [[package]] name = "is_terminal_polyfill" version = "1.70.0" @@ -1285,6 +1349,51 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "metrics" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ae428771d17306715c5091d446327d1cfdedc82185c65ba8423ab404e45bf10" +dependencies = [ + "ahash", + "portable-atomic", +] + +[[package]] +name = "metrics-exporter-prometheus" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85b6f8152da6d7892ff1b7a1c0fa3f435e92b5918ad67035c3bb432111d9a29b" +dependencies = [ + "base64 0.22.1", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-util", + "indexmap", + "ipnet", + "metrics", + "metrics-util", + "quanta", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "metrics-util" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15b482df36c13dd1869d73d14d28cd4855fbd6cfc32294bee109908a9f4a4ed7" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.15.0", + "metrics", + "quanta", + "sketches-ddsketch", +] + [[package]] name = "miette" version = "7.2.0" @@ -1472,6 +1581,12 @@ dependencies = [ "syn 2.0.70", ] +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + [[package]] name = "openssl-sys" version = "0.9.103" @@ -1584,6 +1699,12 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" +[[package]] +name = "portable-atomic" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc9c68a3f6da06753e9335d63e27f6b9754dd1920d941135b7ea8224f141adb2" + [[package]] name = "postgres-openssl" version = "0.5.0" @@ -1676,6 +1797,21 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "quanta" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quick-xml" version = "0.36.0" @@ -1724,6 +1860,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "raw-cpuid" +version = "11.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ab240315c661615f2ee9f0f2cd32d5a7343a84d5ebcccb99d46e6637565e7b0" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "rdkafka" version = "0.36.2" @@ -1914,6 +2059,19 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcaf18a4f2be7326cd874a5fa579fae794320a0f388d365dca7e480e55f83f8a" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "2.1.2" @@ -1963,6 +2121,15 @@ dependencies = [ "sdd", ] +[[package]] +name = "schannel" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01227be5826fa0690321a2ba6c5cd57a19cf3f6a09e76973b58e61de6ab9d1c1" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -1975,6 +2142,29 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b84345e4c9bd703274a082fb80caaa99b7612be48dfaa1dd9266577ec412309d" +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags 2.6.0", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea4a292869320c0272d7bc55a5a6aafaff59b4f63404a003887b679a2e05b4b6" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "serde" version = "1.0.204" @@ -2089,6 +2279,8 @@ dependencies = [ "log", "log-mdc", "log4rs", + "metrics", + "metrics-exporter-prometheus", "mime", "ppp", "quick-xml", @@ -2160,6 +2352,12 @@ version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" +[[package]] +name = "sketches-ddsketch" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e9a774a6c28142ac54bb25d25562e6bcf957493a184f15ad4eebccb23e410a" + [[package]] name = "slab" version = "0.4.9" diff --git a/common/src/settings.rs b/common/src/settings.rs index a3391cf..a48288b 100644 --- a/common/src/settings.rs +++ b/common/src/settings.rs @@ -322,7 +322,7 @@ impl Cli { #[serde(deny_unknown_fields)] pub struct FilesOutput { // Time after which an unused file descriptor is closed - files_descriptor_close_timeout: Option + files_descriptor_close_timeout: Option, } impl FilesOutput { @@ -351,7 +351,7 @@ pub struct Outputs { #[serde(default)] files: FilesOutput, #[serde(default)] - kafka: KafkaOutput + kafka: KafkaOutput, } impl Outputs { @@ -368,6 +368,33 @@ impl Outputs { } } +#[derive(Debug, Deserialize, Clone, Default)] +#[serde(deny_unknown_fields)] +pub struct Monitoring { + listen_address: String, + listen_port: u16, + http_requests_histogram_buckets: Option>, +} + +impl Monitoring { + pub fn listen_address(&self) -> &str { + &self.listen_address + } + + pub fn listen_port(&self) -> u16 { + self.listen_port + } + + pub fn http_requests_histogram_buckets(&self) -> &[f64] { + match &self.http_requests_histogram_buckets { + Some(bucket) => bucket, + None => &[ + 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, + ], + } + } +} + #[derive(Debug, Deserialize, Clone)] #[serde(deny_unknown_fields)] pub struct Settings { @@ -381,6 +408,8 @@ pub struct Settings { cli: Cli, #[serde(default)] outputs: Outputs, + #[serde(default)] + monitoring: Option, } impl std::str::FromStr for Settings { @@ -422,6 +451,10 @@ impl Settings { pub fn outputs(&self) -> &Outputs { &self.outputs } + + pub fn monitoring(&self) -> Option<&Monitoring> { + self.monitoring.as_ref() + } } #[cfg(test)] diff --git a/server/Cargo.toml b/server/Cargo.toml index 532022a..3740f63 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -52,3 +52,5 @@ ppp = "2.2.0" tokio-rustls = "0.26.0" strum = { version = "0.26.1", features = ["derive"] } leon = "3.0.1" +metrics = "0.24.0" +metrics-exporter-prometheus = { version = "0.16.0", features = ["http-listener"] } \ No newline at end of file diff --git a/server/src/lib.rs b/server/src/lib.rs index d691b5d..51496f4 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -8,6 +8,7 @@ mod heartbeat; mod kerberos; mod logging; mod logic; +mod monitoring; mod multipart; mod output; mod proxy_protocol; @@ -38,6 +39,11 @@ use hyper_util::rt::TokioIo; use kerberos::AuthenticationError; use libgssapi::error::MajorFlags; use log::{debug, error, info, trace, warn}; +use metrics::histogram; +use monitoring::{ + HTTP_REQUESTS_DURATION_SECONDS_HISTOGRAM, HTTP_REQUESTS_METHOD, HTTP_REQUESTS_STATUS, + HTTP_REQUESTS_URI, +}; use quick_xml::writer::Writer; use soap::Serializable; use socket2::{SockRef, TcpKeepalive}; @@ -379,14 +385,23 @@ fn log_response( principal: &str, conn_status: ConnectionStatus, ) { - let duration: f32 = start.elapsed().as_micros() as f32; + let duration = start.elapsed(); + + histogram!(HTTP_REQUESTS_DURATION_SECONDS_HISTOGRAM, + HTTP_REQUESTS_METHOD => method.to_owned(), + HTTP_REQUESTS_STATUS => status.to_string(), + HTTP_REQUESTS_URI => uri.to_owned()) + .record(duration.as_secs_f64()); // MDC is thread related, so it should be safe to use it in a non-async // function. log_mdc::insert("http_status", status.as_str()); log_mdc::insert("http_method", method); log_mdc::insert("http_uri", uri); - log_mdc::insert("response_time", format!("{:.3}", duration / 1000.0)); + log_mdc::insert( + "response_time", + format!("{:.3}", duration.as_micros() / 1000), + ); log_mdc::insert("ip", addr.ip().to_string()); log_mdc::insert("port", addr.port().to_string()); log_mdc::insert("principal", principal); @@ -1041,6 +1056,10 @@ pub async fn run(settings: Settings, verbosity: u8) { panic!("Failed to setup logging: {:?}", e); } + if let Some(monitoring_settings) = settings.monitoring() { + monitoring::init(monitoring_settings).expect("Failed to set metric exporter"); + } + let rt_handle = Handle::current(); // Start monitoring thread diff --git a/server/src/logic.rs b/server/src/logic.rs index 2595c3d..ea88374 100644 --- a/server/src/logic.rs +++ b/server/src/logic.rs @@ -1,6 +1,9 @@ use crate::{ event::{EventData, EventMetadata}, heartbeat::{store_heartbeat, WriteHeartbeatMessage}, + monitoring::{ + EVENTS_COUNTER, EVENTS_SUBSCRIPTION_NAME, EVENTS_SUBSCRIPTION_UUID, FAILED_EVENTS_COUNTER, MESSAGES_ACTION, MESSAGES_ACTION_ENUMERATE, MESSAGES_ACTION_EVENTS, MESSAGES_ACTION_HEARTBEAT, MESSAGES_COUNTER + }, output::get_formatter, soap::{ Body, Header, Message, OptionSetValue, Subscription as SoapSubscription, SubscriptionBody, @@ -17,6 +20,7 @@ use common::{ }; use hyper::http::status::StatusCode; use log::{debug, error, warn}; +use metrics::counter; use std::{ collections::{HashMap, HashSet}, sync::Arc, @@ -217,6 +221,9 @@ async fn handle_enumerate( }); } + counter!(MESSAGES_COUNTER, MESSAGES_ACTION => MESSAGES_ACTION_ENUMERATE) + .increment(1); + Ok(Response::ok( ACTION_ENUMERATE_RESPONSE, Some(Body::EnumerateResponse(res_subscriptions)), @@ -283,6 +290,9 @@ async fn handle_heartbeat( ) .await .context("Failed to store heartbeat")?; + + counter!(MESSAGES_COUNTER, MESSAGES_ACTION => MESSAGES_ACTION_HEARTBEAT).increment(1); + Ok(Response::ok(ACTION_ACK, None)) } @@ -354,6 +364,14 @@ async fn handle_events( return Ok(Response::err(StatusCode::FORBIDDEN)); } + // Retrieve the public version sent by the client, not the one stored in memory + let public_version = if let Some(public_version) = message.header().version() { + public_version + } else { + warn!("Missing subscription version in message events"); + return Ok(Response::err(StatusCode::BAD_REQUEST)); + }; + debug!( "Received {} events from {}:{} ({}) for subscription {} ({})", events.len(), @@ -364,13 +382,9 @@ async fn handle_events( subscription.uuid_string() ); - // Retrieve the public version sent by the client, not the one stored in memory - let public_version = if let Some(public_version) = message.header().version() { - public_version - } else { - warn!("Missing subscription version in message events"); - return Ok(Response::err(StatusCode::BAD_REQUEST)); - }; + counter!(MESSAGES_COUNTER, MESSAGES_ACTION => MESSAGES_ACTION_EVENTS) + .increment(1); + counter!(EVENTS_COUNTER, EVENTS_SUBSCRIPTION_NAME => subscription.data().name().to_owned(), EVENTS_SUBSCRIPTION_UUID => subscription.uuid_string()).increment(events.len().try_into()?); let metadata = Arc::new(EventMetadata::new( request_data.remote_addr(), @@ -457,6 +471,7 @@ async fn handle_events( } if !succeed { + counter!(FAILED_EVENTS_COUNTER, EVENTS_SUBSCRIPTION_NAME => subscription.data().name().to_owned(), EVENTS_SUBSCRIPTION_UUID => subscription.uuid_string()).increment(events.len().try_into()?); return Ok(Response::err(StatusCode::SERVICE_UNAVAILABLE)); } diff --git a/server/src/monitoring.rs b/server/src/monitoring.rs new file mode 100644 index 0000000..063d71c --- /dev/null +++ b/server/src/monitoring.rs @@ -0,0 +1,69 @@ +use std::{ + net::{IpAddr, SocketAddr}, + str::FromStr, +}; + +use anyhow::Result; +use common::settings::Monitoring; +use log::info; +use metrics::{describe_counter, describe_histogram, Unit}; +use metrics_exporter_prometheus::{Matcher, PrometheusBuilder}; + +pub const MESSAGES_COUNTER: &str = "openwec_message_total"; +pub const MESSAGES_ACTION: &str = "action"; +pub const MESSAGES_ACTION_HEARTBEAT: &str = "heartbeat"; +pub const MESSAGES_ACTION_EVENTS: &str = "events"; +pub const MESSAGES_ACTION_ENUMERATE: &str = "enumerate"; + +pub const EVENTS_COUNTER: &str = "openwec_event_received_total"; +pub const EVENTS_SUBSCRIPTION_UUID: &str = "subscription_uuid"; +pub const EVENTS_SUBSCRIPTION_NAME: &str = "subscription_name"; + +pub const FAILED_EVENTS_COUNTER: &str = "openwec_event_output_failure_total"; + +pub const HTTP_REQUESTS_DURATION_SECONDS_HISTOGRAM: &str = "http_request_duration_seconds"; +pub const HTTP_REQUESTS_METHOD: &str = "method"; +pub const HTTP_REQUESTS_URI: &str = "uri"; +pub const HTTP_REQUESTS_STATUS: &str = "status"; + +pub fn init(settings: &Monitoring) -> Result<()> { + let addr = SocketAddr::from(( + IpAddr::from_str(settings.listen_address()) + .expect("Failed to parse monitoring.listen_address"), + settings.listen_port(), + )); + + let builder = PrometheusBuilder::new() + .with_http_listener(addr) + .set_buckets_for_metric( + Matcher::Full(HTTP_REQUESTS_DURATION_SECONDS_HISTOGRAM.to_string()), + settings.http_requests_histogram_buckets(), + )?; + + info!("Starting monitoring server on {}", addr); + + builder.install()?; + + describe_counter!( + MESSAGES_COUNTER, + Unit::Count, + "Number of messages received by openwec" + ); + describe_counter!( + EVENTS_COUNTER, + Unit::Count, + "Number of events received by openwec" + ); + describe_counter!( + FAILED_EVENTS_COUNTER, + Unit::Count, + "Number of events that could not be written to outputs by openwec" + ); + describe_histogram!( + HTTP_REQUESTS_DURATION_SECONDS_HISTOGRAM, + Unit::Seconds, + "HTTP requests duration histogram" + ); + + Ok(()) +}