Skip to content

Commit

Permalink
Add machines gauge
Browse files Browse the repository at this point in the history
  • Loading branch information
vruello committed Nov 26, 2024
1 parent 3c968b8 commit 80652f4
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 11 deletions.
13 changes: 13 additions & 0 deletions common/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ pub struct Monitoring {
count_input_event_bytes_per_machine: Option<bool>,
count_http_request_body_network_size_per_machine: Option<bool>,
count_http_request_body_real_size_per_machine: Option<bool>,
machines_refresh_interval: Option<u64>,
}

impl Monitoring {
Expand Down Expand Up @@ -415,6 +416,10 @@ impl Monitoring {
self.count_http_request_body_real_size_per_machine
.unwrap_or(false)
}

pub fn machines_refresh_interval(&self) -> u64 {
self.machines_refresh_interval.unwrap_or(30)
}
}

#[derive(Debug, Deserialize, Clone)]
Expand Down Expand Up @@ -657,6 +662,9 @@ mod tests {
s.monitoring().unwrap().http_request_duration_buckets(),
&[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,]
);
assert_eq!(
s.monitoring().unwrap().machines_refresh_interval(), 30
);
}

const CONFIG_TLS_POSTGRES_WITH_CLI: &str = r#"
Expand Down Expand Up @@ -694,6 +702,7 @@ mod tests {
count_http_request_body_network_size_per_machine = true
count_http_request_body_real_size_per_machine = true
count_input_events_per_machine = true
machines_refresh_interval = 10
"#;

#[test]
Expand Down Expand Up @@ -730,6 +739,10 @@ mod tests {
s.monitoring().unwrap().http_request_duration_buckets(),
&[0.0005, 0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
);
assert_eq!(
s.monitoring().unwrap().machines_refresh_interval(),
10
);
}

const CONFIG_TLS_POSTGRES_WITH_OUTPUTS: &str = r#"
Expand Down
1 change: 1 addition & 0 deletions common/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,7 @@ impl SubscriptionStatsCounters {
}
}

#[derive(IntoStaticStr)]
pub enum SubscriptionMachineState {
Alive,
Active,
Expand Down
7 changes: 4 additions & 3 deletions doc/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ Metrics collection and publication can be enabled in the OpenWEC settings (see `

### Available metrics

> [!CAUTION]
> Enabling the `machine` labels may cause a **huge** increase in metric cardinality! This is disabled by default.
| **Metric** | **Type** | **Labels** | **Description** |
|---|---|---|---|
| `openwec_input_events_total` | `Counter` | `subscription_uuid`, `subscription_name`, `machine` (optional*) | The total number of events received by openwec |
Expand All @@ -53,6 +56,4 @@ Metrics collection and publication can be enabled in the OpenWEC settings (see `
| `openwec_http_request_body_real_size_bytes_total` | `Counter` | `uri`, `machine` (optional*) | The total size of all http requests body received by openwec after decryption and decompression |
| `openwec_output_driver_failures_total` | `Counter` | `subscription_uuid`, `subscription_name`, `driver` | The total number of output driver failures |
| `openwec_output_format_failures_total` | `Counter` | `subscription_uuid`, `subscription_name`, `format` | The total number of output format failures |

> [!WARNING]
> Enabling the `machine` labels may cause a **huge** increase in metric cardinality!
| `openwec_machines` | `Gauge` | `subscription_uuid`, `subscription_name`, `state` | The number of machines known by openwec |
4 changes: 4 additions & 0 deletions openwec.conf.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,10 @@
# Listen port of the Prometheus-compatible endpoint
# listen_port =

# [Optional]
# The refresh interval of "openwec_machines" gauge
# machines_refresh_interval = 30

# [Optional]
# Request duration buckets (in seconds) used by the "openwec_http_request_duration_seconds" histogram
# http_request_duration_buckets = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
Expand Down
8 changes: 4 additions & 4 deletions server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1112,10 +1112,6 @@ pub async fn run(settings: Settings, verbosity: u8) {
panic!("Failed to setup logging: {:?}", e);
}

if let Some(monitoring_settings) = settings.monitoring() {
monitoring::init(monitoring_settings).expect("Failed to set metric exporter");
}

let rt_handle = Handle::current();

// Start monitoring thread
Expand All @@ -1137,6 +1133,10 @@ pub async fn run(settings: Settings, verbosity: u8) {

let subscriptions = Arc::new(RwLock::new(HashMap::new()));

if let Some(monitoring_settings) = settings.monitoring() {
monitoring::init(&db, subscriptions.clone(), monitoring_settings).expect("Failed to initialize metrics exporter");
}

let reload_interval = settings.server().db_sync_interval();
let outputs_settings = settings.outputs().clone();
let update_task_db = db.clone();
Expand Down
101 changes: 97 additions & 4 deletions server/src/monitoring.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use std::{
net::{IpAddr, SocketAddr},
str::FromStr,
time::{Duration, SystemTime},
};

use anyhow::Result;
use common::settings::Monitoring;
use log::info;
use metrics::{describe_counter, describe_histogram, Unit};
use common::{database::Db, settings::Monitoring, subscription::SubscriptionMachineState};
use log::{debug, info};
use metrics::{describe_counter, describe_gauge, describe_histogram, gauge, Unit};
use metrics_exporter_prometheus::{Matcher, PrometheusBuilder};
use tokio::time;

use crate::subscription::Subscriptions;

// input metrics

Expand Down Expand Up @@ -46,7 +50,26 @@ pub const OUTPUT_DRIVER: &str = "driver";
pub const OUTPUT_FORMAT_FAILURES: &str = "openwec_output_format_failures_total";
pub const OUTPUT_FORMAT: &str = "format";

pub fn init(settings: &Monitoring) -> Result<()> {
// machines metrics

pub const MACHINES_GAUGE: &str = "openwec_machines";
pub const MACHINES_STATE: &str = "state";

pub fn init(db: &Db, subscriptions: Subscriptions, settings: &Monitoring) -> Result<()> {
let refresh_interval = settings.machines_refresh_interval();
let refresh_task_db = db.clone();
let refresh_task_subscriptions = subscriptions.clone();

// Launch a task responsible for refreshing machines gauge
tokio::spawn(async move {
refresh_machines_task(
refresh_task_db,
refresh_task_subscriptions,
refresh_interval,
)
.await
});

let addr = SocketAddr::from((
IpAddr::from_str(settings.listen_address())
.expect("Failed to parse monitoring.listen_address"),
Expand Down Expand Up @@ -120,5 +143,75 @@ pub fn init(settings: &Monitoring) -> Result<()> {
"The total number of output format failures"
);

// machines
describe_gauge!(
MACHINES_GAUGE,
Unit::Count,
"The number of machines known by openwec"
);

Ok(())
}

async fn refresh_machines_task(
db: Db,
subscriptions: Subscriptions,
refresh_interval: u64,
) -> Result<()> {
info!("Starting refresh machines task for monitoring");
let mut refresh = time::interval(Duration::from_secs(refresh_interval));
// We don't want the first tick to complete immediatly
refresh.reset_after(Duration::from_secs(refresh_interval));
loop {
tokio::select! {
_ = refresh.tick() => {
debug!("Refreshing machines stats for monitoring");

// We can't await with the lock on "subscriptions"
// So we first copy all data we need from "subscriptions"
let subscriptions_data = {
let subscriptions_unlocked = subscriptions.read().unwrap();
let mut subscriptions_data = Vec::with_capacity(subscriptions_unlocked.len());
for (_, subscription) in subscriptions.read().unwrap().iter() {
subscriptions_data.push((subscription.uuid_string(), subscription.data().name().to_string(), subscription.data().heartbeat_interval()));
}
subscriptions_data
};

for (subscription_uuid, subscription_name, heartbeat_interval) in subscriptions_data {
let now: i64 = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)?
.as_secs()
.try_into()?;

let stats = db
.get_stats(&subscription_uuid, now - (heartbeat_interval as i64))
.await?;

debug!("Update {} values with active={}, alive={}, dead={}", MACHINES_GAUGE, stats.active_machines_count(), stats.alive_machines_count(), stats.dead_machines_count());

let alive_str: &'static str = SubscriptionMachineState::Alive.into();
gauge!(MACHINES_GAUGE,
SUBSCRIPTION_NAME => subscription_name.clone(),
SUBSCRIPTION_UUID => subscription_uuid.clone(),
MACHINES_STATE => alive_str)
.set(stats.alive_machines_count() as f64);

let active_str: &'static str = SubscriptionMachineState::Active.into();
gauge!(MACHINES_GAUGE,
SUBSCRIPTION_NAME => subscription_name.clone(),
SUBSCRIPTION_UUID => subscription_uuid.clone(),
MACHINES_STATE => active_str)
.set(stats.active_machines_count() as f64);

let dead_str: &'static str = SubscriptionMachineState::Dead.into();
gauge!(MACHINES_GAUGE,
SUBSCRIPTION_NAME => subscription_name.clone(),
SUBSCRIPTION_UUID => subscription_uuid.clone(),
MACHINES_STATE => dead_str)
.set(stats.dead_machines_count() as f64);
}
}
}
}
}

0 comments on commit 80652f4

Please sign in to comment.