From a8ffe7cdefc8750a79e8a7ca2433bcca4122ac9f Mon Sep 17 00:00:00 2001 From: Marcello Date: Mon, 25 Sep 2023 11:49:17 +0200 Subject: [PATCH] Prometheus metric ceremony (#4034) * added macro to create gauges that get deleted * added ceremony_duration metric * fixed gauge to handle convertion to i64 * ceremony missing messages on timeout metric added * added chain label to CEREMONY_PROCESSED_MSG, CEREMONY_DURATION, CEREMONY_TIMEOUT_MISSING_MSG * modified macro to support drop (deletion of labels) on all the types of wrapper, passed as a parameter when calling the macro * added STAGE_DURATION metric * use collect_array * added STAGE_COMPLETING/STAGE_FAILING metrics * avoid saving labels already seen (add to the hashset) if we don't drop the metric * fixed missing imports caused by rebasing * fix double imports * avoid using format! and to_string every time -> use clone() * fixed typo * addressed PR comments * use Option for stage/ceremony _start do conversion inside the constructor add test to check deletion of metrics inside CeremonyMetrics strucs * fixed test * cargo fmt * added manual deletion inside tests and make sure it returns an error -> deletion done before as expected * address review comments --- engine/multisig/src/client/ceremony_runner.rs | 63 ++- .../multisig/src/client/common/broadcast.rs | 30 +- .../src/client/common/ceremony_stage.rs | 15 +- utilities/src/with_std/metrics.rs | 439 ++++++++++++++++-- 4 files changed, 460 insertions(+), 87 deletions(-) diff --git a/engine/multisig/src/client/ceremony_runner.rs b/engine/multisig/src/client/ceremony_runner.rs index 2ff2d0d3af7..d2ac89291ab 100644 --- a/engine/multisig/src/client/ceremony_runner.rs +++ b/engine/multisig/src/client/ceremony_runner.rs @@ -4,7 +4,7 @@ mod tests; use std::{ collections::{btree_map, BTreeMap, BTreeSet}, pin::Pin, - time::Duration, + time::{Duration, Instant}, }; use anyhow::Result; @@ -15,13 +15,7 @@ use tokio::sync::{ oneshot, }; use tracing::{debug, warn, Instrument}; -use utilities::{ - format_iterator, - metrics::{ - CeremonyBadMsgNotDrop, CeremonyMetrics, CeremonyProcessedMsgDrop, CEREMONY_BAD_MSG, - CEREMONY_PROCESSED_MSG, - }, -}; +use utilities::{format_iterator, metrics::CeremonyMetrics}; use crate::{ client::{ @@ -86,7 +80,7 @@ where // We always create unauthorised first, it can get promoted to // an authorised one with a ceremony request let mut runner = Self::new_unauthorised(outcome_sender, ceremony_id); - + let mut ceremony_start: Option = None; // Fuse the oneshot future so it will not get called twice let mut request_receiver = request_receiver.fuse(); @@ -102,7 +96,7 @@ where request = &mut request_receiver => { let PreparedRequest { initial_stage } = request.expect("Ceremony request channel was dropped unexpectedly"); - + ceremony_start = Some(Instant::now()); if let Some(result) = runner.on_ceremony_request(initial_stage).instrument(span.clone()).await { break result; } @@ -116,7 +110,16 @@ where } } }; - + if let Some(start_instant) = ceremony_start { + let duration = start_instant.elapsed().as_millis(); + runner.metrics.ceremony_duration.set(duration); + tracing::info!( + "Ceremony {} ({}) took {}ms to complete", + Ceremony::CEREMONY_TYPE, + ceremony_id, + duration + ); + } let _result = runner.outcome_sender.send((ceremony_id, outcome)); Ok(()) } @@ -135,13 +138,7 @@ where timeout_handle: Box::pin(tokio::time::sleep(tokio::time::Duration::ZERO)), outcome_sender, _phantom: Default::default(), - metrics: CeremonyMetrics { - processed_messages: CeremonyProcessedMsgDrop::new( - &CEREMONY_PROCESSED_MSG, - [format!("{}", ceremony_id)], - ), - bad_message: CeremonyBadMsgNotDrop::new(&CEREMONY_BAD_MSG, [Chain::NAME]), - }, + metrics: CeremonyMetrics::new(ceremony_id, Chain::NAME, Ceremony::CEREMONY_TYPE), } } @@ -151,7 +148,7 @@ where &mut self, mut initial_stage: DynStage, ) -> OptionalCeremonyReturn { - let single_party_result = initial_stage.init(&self.metrics); + let single_party_result = initial_stage.init(&mut self.metrics); // This function is only ever called from a oneshot channel, // so it should never get called twice. @@ -180,14 +177,15 @@ where .stage .take() .expect("Ceremony must be authorised to finalize any of its stages"); - + let stage_name = stage.get_stage_name().to_string(); let validator_mapping = stage.ceremony_common().validator_mapping.clone(); - match stage.finalize().await { + match stage.finalize(&mut self.metrics).await { StageResult::NextStage(mut next_stage) => { debug!("Ceremony transitions to {}", next_stage.get_stage_name()); + self.metrics.stage_completing.inc(&[&stage_name]); - let single_party_result = next_stage.init(&self.metrics); + let single_party_result = next_stage.init(&mut self.metrics); self.stage = Some(next_stage); @@ -208,10 +206,13 @@ where self.process_delayed().await } }, - StageResult::Error(bad_validators, reason) => - Some(Err((validator_mapping.get_ids(bad_validators), reason))), + StageResult::Error(bad_validators, reason) => { + self.metrics.stage_failing.inc(&[&stage_name, &format!("{:?}", reason)]); + Some(Err((validator_mapping.get_ids(bad_validators), reason))) + }, StageResult::Done(result) => { debug!("Ceremony reached the final stage!"); + self.metrics.stage_completing.inc(&[&stage_name]); Some(Ok(result)) }, @@ -281,7 +282,7 @@ where } if let ProcessMessageResult::Ready = - stage.process_message(sender_idx, data, &self.metrics) + stage.process_message(sender_idx, data, &mut self.metrics) { return self.finalize_current_stage().await } @@ -353,14 +354,10 @@ where stage.get_stage_name(), missing_messages_from_accounts.len() ); - - warn!( - missing_ids = format_iterator(missing_messages_from_accounts).to_string(), - "Ceremony stage {} timed out before all messages collected ({} missing), trying to finalize current stage anyway.", - stage.get_stage_name(), - missing_messages_from_accounts.len() - ); - + let stage_name = stage.get_stage_name().to_string(); + self.metrics + .missing_messages + .set(&[&stage_name], missing_messages_from_accounts.len()); self.finalize_current_stage().await } else { panic!("Unauthorised ceremonies cannot timeout"); diff --git a/engine/multisig/src/client/common/broadcast.rs b/engine/multisig/src/client/common/broadcast.rs index 86faafb527d..96efb51702a 100644 --- a/engine/multisig/src/client/common/broadcast.rs +++ b/engine/multisig/src/client/common/broadcast.rs @@ -1,21 +1,21 @@ use std::{ collections::{btree_map, BTreeMap}, fmt::Display, + time::Instant, }; use async_trait::async_trait; use cf_primitives::{AuthorityCount, CeremonyId}; use tracing::warn; +use super::ceremony_stage::{CeremonyCommon, CeremonyStage, ProcessMessageResult, StageResult}; use crate::{ client::{ceremony_manager::CeremonyTrait, MultisigMessage}, p2p::{OutgoingMultisigStageMessages, ProtocolVersion, CURRENT_PROTOCOL_VERSION}, }; - -use super::ceremony_stage::{CeremonyCommon, CeremonyStage, ProcessMessageResult, StageResult}; +use utilities::metrics::CeremonyMetrics; pub use super::broadcast_verification::verify_broadcasts_non_blocking; -use utilities::metrics::CeremonyMetrics; /// Used by individual stages to distinguish between /// a public message that should be broadcast to everyone @@ -61,6 +61,7 @@ where /// Determines the actual computations before/after /// the data is collected processor: Stage, + stage_started: Option, } impl BroadcastStage @@ -68,7 +69,7 @@ where Stage: BroadcastStageProcessor, { pub fn new(processor: Stage, common: CeremonyCommon) -> Self { - BroadcastStage { common, messages: BTreeMap::new(), processor } + BroadcastStage { common, messages: BTreeMap::new(), processor, stage_started: None } } } @@ -99,9 +100,9 @@ impl CeremonyStage for BroadcastStage where Stage: BroadcastStageProcessor + Send, { - fn init(&mut self, metrics: &CeremonyMetrics) -> ProcessMessageResult { + fn init(&mut self, metrics: &mut CeremonyMetrics) -> ProcessMessageResult { let common = &self.common; - + self.stage_started = Some(Instant::now()); let idx_to_id = |idx: &AuthorityCount| common.validator_mapping.get_id(*idx).clone(); let (own_message, outgoing_messages) = match self.processor.init() { @@ -158,7 +159,7 @@ where &mut self, signer_idx: AuthorityCount, m: C::Data, - metrics: &CeremonyMetrics, + metrics: &mut CeremonyMetrics, ) -> ProcessMessageResult { metrics.processed_messages.inc(); let m: Stage::Message = match m.try_into() { @@ -209,11 +210,18 @@ where } } - async fn finalize(mut self: Box) -> StageResult { + async fn finalize(mut self: Box, metrics: &mut CeremonyMetrics) -> StageResult { // Because we might want to finalize the stage before // all data has been received (e.g. due to a timeout), // we insert None for any missing data + let stage_name = self.get_stage_name().to_string(); + if let Some(start_instant) = self.stage_started { + metrics + .stage_duration + .set(&[&stage_name, "receiving"], start_instant.elapsed().as_millis()); + } + let process_msg_instant = Instant::now(); let mut received_messages = std::mem::take(&mut self.messages); // Turns values T into Option, inserting `None` where @@ -225,7 +233,11 @@ where .map(|idx| (*idx, received_messages.remove(idx))) .collect(); - self.processor.process(messages).await + let result = self.processor.process(messages).await; + metrics + .stage_duration + .set(&[&stage_name, "processing"], process_msg_instant.elapsed().as_millis()); + result } fn awaited_parties(&self) -> std::collections::BTreeSet { diff --git a/engine/multisig/src/client/common/ceremony_stage.rs b/engine/multisig/src/client/common/ceremony_stage.rs index 5b1e98eebfb..646e3d0c1cb 100644 --- a/engine/multisig/src/client/common/ceremony_stage.rs +++ b/engine/multisig/src/client/common/ceremony_stage.rs @@ -1,16 +1,15 @@ use std::{collections::BTreeSet, sync::Arc}; -use async_trait::async_trait; -use cf_primitives::{AuthorityCount, CeremonyId}; -use tokio::sync::mpsc::UnboundedSender; -use utilities::metrics::CeremonyMetrics; - use crate::{ client::{ceremony_manager::CeremonyTrait, utils::PartyIdxMapping}, crypto::Rng, p2p::OutgoingMultisigStageMessages, ChainSigning, }; +use async_trait::async_trait; +use cf_primitives::{AuthorityCount, CeremonyId}; +use tokio::sync::mpsc::UnboundedSender; +use utilities::metrics::CeremonyMetrics; /// Outcome of a given ceremony stage pub enum StageResult { @@ -36,7 +35,7 @@ pub enum ProcessMessageResult { #[async_trait] pub trait CeremonyStage { /// Perform initial computation for this stage (and initiate communication with other parties) - fn init(&mut self, metrics: &CeremonyMetrics) -> ProcessMessageResult; + fn init(&mut self, metrics: &mut CeremonyMetrics) -> ProcessMessageResult; /// Process message from signer at index `signer_idx`. Precondition: the signer is a valid /// holder of the key and selected to participate in this ceremony (TODO: also check that @@ -45,12 +44,12 @@ pub trait CeremonyStage { &mut self, signer_idx: AuthorityCount, m: C::Data, - metrics: &CeremonyMetrics, + metrics: &mut CeremonyMetrics, ) -> ProcessMessageResult; /// Verify data for this stage after it is received from all other parties, /// either abort or proceed to the next stage based on the result - async fn finalize(self: Box) -> StageResult; + async fn finalize(self: Box, metrics: &mut CeremonyMetrics) -> StageResult; /// Parties we haven't heard from for the current stage fn awaited_parties(&self) -> BTreeSet; diff --git a/utilities/src/with_std/metrics.rs b/utilities/src/with_std/metrics.rs index be7e1d8d138..ca461a8f8cd 100644 --- a/utilities/src/with_std/metrics.rs +++ b/utilities/src/with_std/metrics.rs @@ -3,6 +3,7 @@ //! Returns the metrics encoded in a prometheus format //! Method returns a Sender, allowing graceful termination of the infinite loop use super::{super::Port, task_scope}; +use crate::ArrayCollect; use async_channel::{unbounded, Receiver, Sender}; use lazy_static; use prometheus::{ @@ -11,7 +12,7 @@ use prometheus::{ IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, }; use serde::Deserialize; -use std::net::IpAddr; +use std::{collections::HashSet, net::IpAddr}; use tracing::info; use warp::Filter; @@ -162,32 +163,166 @@ macro_rules! build_counter_vec { } } } -/// The idea behind this macro is to help to create the wrapper for the metrics at compile time, -/// without having to specify number of labels etc, but still enforcing the correct use of the -/// metric there are 2 possibilities here: -/// - a metric with some const labels value -> these metrics get created specifying the const label -/// values and when used we need to specify the value for the other labels these metrics are kept -/// around even after being dropped, these type of metrics are used because -/// it simplify referring some values at runtime which wouldn't be available otherwise -/// - a metric with no const values -> these metrics are created with all the necessary labels -/// supplied, when interacting with them we don't have to specify any labels anymore when these -/// metrics go out of scope and get dropped the label combination is also deleted (we -/// won't refer to that specific combination ever again) + +macro_rules! build_gauge_vec_struct { + ($metric_ident:ident, $struct_ident:ident, $name:literal, $help:literal, $drop:expr, $labels:tt) => { + build_gauge_vec!($metric_ident, $name, $help, $labels); + + #[derive(Clone)] + pub struct $struct_ident { + metric: &'static $metric_ident, + labels: [String; { $labels.len() }], + drop: bool, + } + impl $struct_ident { + pub fn new( + metric: &'static $metric_ident, + labels: [String; { $labels.len() }], + ) -> $struct_ident { + $struct_ident { metric, labels, drop: $drop } + } + + pub fn inc(&self) { + let labels = self.labels.each_ref().map(|s| s.as_str()); + self.metric.inc(&labels); + } + + pub fn dec(&self) { + let labels = self.labels.each_ref().map(|s| s.as_str()); + self.metric.dec(&labels); + } + + pub fn set>(&self, val: T) + where + >::Error: std::fmt::Debug, + { + let labels = self.labels.each_ref().map(|s| s.as_str()); + self.metric.set(&labels, val); + } + } + impl Drop for $struct_ident { + fn drop(&mut self) { + if self.drop { + let metric = self.metric.prom_metric.clone(); + let labels: Vec = self.labels.to_vec(); + + DELETE_METRIC_CHANNEL + .0 + .try_send(DeleteMetricCommand::GaugePair(metric, labels)) + .expect("DELETE_METRIC_CHANNEL should never be closed!"); + } + } + } + }; + ($metric_ident:ident, $struct_ident:ident, $name:literal, $help:literal, $drop:expr, $labels:tt, $const_labels:tt) => { + build_gauge_vec!($metric_ident, $name, $help, $labels); + + #[derive(Clone)] + pub struct $struct_ident { + metric: &'static $metric_ident, + const_labels: [String; { $const_labels.len() }], + non_const_labels_used: HashSet<[String; { $labels.len() - $const_labels.len() }]>, + drop: bool, + } + impl $struct_ident { + pub fn new( + metric: &'static $metric_ident, + const_labels: [String; { $const_labels.len() }], + ) -> $struct_ident { + $struct_ident { + metric, + const_labels, + non_const_labels_used: HashSet::new(), + drop: $drop, + } + } + + pub fn inc( + &mut self, + non_const_labels: &[&str; { $labels.len() - $const_labels.len() }], + ) { + if self.drop { + self.non_const_labels_used.insert(non_const_labels.map(|s| s.to_string())); + } + let labels: [&str; { $labels.len() }] = self + .const_labels + .iter() + .map(|s| s.as_str()) + .chain(*non_const_labels) + .collect_array(); + self.metric.inc(&labels); + } + + pub fn dec( + &mut self, + non_const_labels: &[&str; { $labels.len() - $const_labels.len() }], + ) { + if self.drop { + self.non_const_labels_used.insert(non_const_labels.map(|s| s.to_string())); + } + let labels: [&str; { $labels.len() }] = self + .const_labels + .iter() + .map(|s| s.as_str()) + .chain(*non_const_labels) + .collect_array(); + self.metric.dec(&labels); + } + + pub fn set>( + &mut self, + non_const_labels: &[&str; { $labels.len() - $const_labels.len() }], + val: T, + ) where + >::Error: std::fmt::Debug, + { + if self.drop { + self.non_const_labels_used.insert(non_const_labels.map(|s| s.to_string())); + } + let labels: [&str; { $labels.len() }] = self + .const_labels + .iter() + .map(|s| s.as_str()) + .chain(*non_const_labels) + .collect_array(); + self.metric.set(&labels, val); + } + } + impl Drop for $struct_ident { + fn drop(&mut self) { + if self.drop { + let metric = self.metric.prom_metric.clone(); + let labels: Vec = self.const_labels.to_vec(); + for non_const_labels in self.non_const_labels_used.drain() { + let mut final_labels = labels.clone(); + final_labels.append(&mut non_const_labels.to_vec()); + DELETE_METRIC_CHANNEL + .0 + .try_send(DeleteMetricCommand::GaugePair(metric.clone(), final_labels)) + .expect("DELETE_METRIC_CHANNEL should never be closed!"); + } + } + } + } + }; +} + macro_rules! build_counter_vec_struct { - ($metric_ident:ident, $struct_ident:ident, $name:literal, $help:literal, $labels:tt) => { + ($metric_ident:ident, $struct_ident:ident, $name:literal, $help:literal, $drop:expr, $labels:tt) => { build_counter_vec!($metric_ident, $name, $help, $labels); #[derive(Clone)] pub struct $struct_ident { metric: &'static $metric_ident, labels: [String; { $labels.len() }], + drop: bool, } impl $struct_ident { pub fn new( metric: &'static $metric_ident, labels: [String; { $labels.len() }], ) -> $struct_ident { - $struct_ident { metric, labels } + $struct_ident { metric, labels, drop: $drop } } pub fn inc(&self) { @@ -197,43 +332,76 @@ macro_rules! build_counter_vec_struct { } impl Drop for $struct_ident { fn drop(&mut self) { - let metric = self.metric.prom_metric.clone(); - let labels: Vec = self.labels.iter().map(|s| s.to_string()).collect(); - - DELETE_METRIC_CHANNEL - .0 - .try_send(DeleteMetricCommand::CounterPair(metric, labels)) - .expect("DELETE_METRIC_CHANNEL should never be closed!"); + if self.drop { + let metric = self.metric.prom_metric.clone(); + let labels: Vec = self.labels.to_vec(); + + DELETE_METRIC_CHANNEL + .0 + .try_send(DeleteMetricCommand::CounterPair(metric, labels)) + .expect("DELETE_METRIC_CHANNEL should never be closed!"); + } } } }; - ($metric_ident:ident, $structNotDrop:ident, $name:literal, $help:literal, $labels:tt, $const_labels:tt) => { + ($metric_ident:ident, $struct_ident:ident, $name:literal, $help:literal, $drop:expr, $labels:tt, $const_labels:tt) => { build_counter_vec!($metric_ident, $name, $help, $labels); #[derive(Clone)] - pub struct $structNotDrop { + pub struct $struct_ident { metric: &'static $metric_ident, - const_labels: [&'static str; { $const_labels.len() }], + const_labels: [String; { $const_labels.len() }], + non_const_labels_used: HashSet<[String; { $labels.len() - $const_labels.len() }]>, + drop: bool, } - impl $structNotDrop { + impl $struct_ident { pub fn new( metric: &'static $metric_ident, - const_labels: [&'static str; { $const_labels.len() }], - ) -> $structNotDrop { - $structNotDrop { metric, const_labels } + const_labels: [String; { $const_labels.len() }], + ) -> $struct_ident { + $struct_ident { + metric, + const_labels, + drop: $drop, + non_const_labels_used: HashSet::new(), + } } - pub fn inc(&self, non_const_labels: &[&str; { $labels.len() - $const_labels.len() }]) { - let labels: [&str; { $labels.len() }] = { - let mut whole: [&str; { $labels.len() }] = [""; { $labels.len() }]; - let (one, two) = whole.split_at_mut(self.const_labels.len()); - one.copy_from_slice(&self.const_labels); - two.copy_from_slice(non_const_labels); - whole - }; + pub fn inc( + &mut self, + non_const_labels: &[&str; { $labels.len() - $const_labels.len() }], + ) { + if self.drop { + self.non_const_labels_used.insert(non_const_labels.map(|s| s.to_string())); + } + let labels: [&str; { $labels.len() }] = self + .const_labels + .iter() + .map(|s| s.as_str()) + .chain(*non_const_labels) + .collect_array(); self.metric.inc(&labels); } } + impl Drop for $struct_ident { + fn drop(&mut self) { + if self.drop { + let metric = self.metric.prom_metric.clone(); + let labels: Vec = self.const_labels.to_vec(); + for non_const_labels in self.non_const_labels_used.drain() { + let mut final_labels = labels.clone(); + final_labels.append(&mut non_const_labels.to_vec()); + DELETE_METRIC_CHANNEL + .0 + .try_send(DeleteMetricCommand::CounterPair( + metric.clone(), + final_labels, + )) + .expect("DELETE_METRIC_CHANNEL should never be closed!"); + } + } + } + } }; } @@ -284,22 +452,101 @@ build_counter_vec_struct!( CeremonyProcessedMsgDrop, "ceremony_msg", "Count all the processed messages for a given ceremony", - ["ceremony_id"] + true, + ["chain", "ceremony_id", "ceremony_type"] ); build_counter_vec_struct!( CEREMONY_BAD_MSG, CeremonyBadMsgNotDrop, "ceremony_bad_msg", "Count all the bad msgs processed during a ceremony", + false, ["chain", "reason"], ["chain"] //const labels ); +build_gauge_vec_struct!( + CEREMONY_DURATION, + CeremonyDurationDrop, + "ceremony_duration", + "Measure the duration of a ceremony in ms", + true, + ["chain", "ceremony_id", "ceremony_type"] +); +build_gauge_vec_struct!( + CEREMONY_TIMEOUT_MISSING_MSG, + CeremonyTimeoutMissingMsgDrop, + "ceremony_timeout_missing_msg", + "Measure the number of missing messages when reaching timeout", + true, + ["chain", "ceremony_id", "ceremony_type", "stage"], + ["chain", "ceremony_id", "ceremony_type"] +); +build_gauge_vec_struct!( + STAGE_DURATION, + StageDurationDrop, + "stage_duration", + "Measure the duration of a stage in ms", + true, + ["chain", "ceremony_id", "stage", "phase"], //phase can be either receiving or processing + ["chain", "ceremony_id"] +); +build_counter_vec_struct!( + STAGE_FAILING, + StageFailingNotDrop, + "stage_failing", + "Count the number of stages which are failing with the cause of the failure attached", + false, + ["chain", "stage", "reason"], + ["chain"] +); +build_counter_vec_struct!( + STAGE_COMPLETING, + StageCompletingNotDrop, + "stage_completing", + "Count the number of stages which are completing successfully", + false, + ["chain", "stage"], + ["chain"] +); /// structure containing the metrics used during a ceremony #[derive(Clone)] pub struct CeremonyMetrics { pub processed_messages: CeremonyProcessedMsgDrop, pub bad_message: CeremonyBadMsgNotDrop, + pub ceremony_duration: CeremonyDurationDrop, + pub missing_messages: CeremonyTimeoutMissingMsgDrop, + pub stage_duration: StageDurationDrop, + pub stage_failing: StageFailingNotDrop, + pub stage_completing: StageCompletingNotDrop, +} +impl CeremonyMetrics { + pub fn new(ceremony_id: u64, chain_name: &str, ceremony_type: &str) -> Self { + let ceremony_id = ceremony_id.to_string(); + let chain_name = chain_name.to_string(); + let ceremony_type = ceremony_type.to_string(); + CeremonyMetrics { + processed_messages: CeremonyProcessedMsgDrop::new( + &CEREMONY_PROCESSED_MSG, + [chain_name.clone(), ceremony_id.clone(), ceremony_type.clone()], + ), + bad_message: CeremonyBadMsgNotDrop::new(&CEREMONY_BAD_MSG, [chain_name.clone()]), + ceremony_duration: CeremonyDurationDrop::new( + &CEREMONY_DURATION, + [chain_name.clone(), ceremony_id.clone(), ceremony_type.clone()], + ), + missing_messages: CeremonyTimeoutMissingMsgDrop::new( + &CEREMONY_TIMEOUT_MISSING_MSG, + [chain_name.clone(), ceremony_id.clone(), ceremony_type], + ), + stage_duration: StageDurationDrop::new( + &STAGE_DURATION, + [chain_name.clone(), ceremony_id], + ), + stage_failing: StageFailingNotDrop::new(&STAGE_FAILING, [chain_name.clone()]), + stage_completing: StageCompletingNotDrop::new(&STAGE_COMPLETING, [chain_name]), + } + } } #[tracing::instrument(name = "prometheus-metric", skip_all)] @@ -405,6 +652,101 @@ mod test { request_test("metrics", reqwest::StatusCode::OK, "# HELP test test help\n# TYPE test counter\ntest{label=\"B\"} 10\ntest{label=\"C\"} 100\n").await; request_test("metrics", reqwest::StatusCode::OK, "# HELP test test help\n# TYPE test counter\ntest{label=\"B\"} 10\n").await; + REGISTRY.unregister(Box::new(metric)).unwrap(); + request_test("metrics", reqwest::StatusCode::OK, "").await; + + + //test CeremonyMetrics correct deletion + + //we create the ceremony struct and put some metrics in it + { + let mut metrics = CeremonyMetrics::new(7, "Chain1", "Keygen"); + metrics.bad_message.inc(&["AA"]); + metrics.ceremony_duration.set(999); + metrics.missing_messages.set(&["stage1",], 5); + metrics.processed_messages.inc(); + metrics.processed_messages.inc(); + metrics.stage_completing.inc(&["stage1"]); + metrics.stage_completing.inc(&["stage1"]); + metrics.stage_completing.inc(&["stage2"]); + metrics.stage_duration.set(&["stage1", "receiving"], 780); + metrics.stage_duration.set(&["stage1", "processing"], 78); + metrics.stage_failing.inc(&["stage3", "NotEnoughMessages"]); + + //This request does nothing, the ceremony is still ongoning so there is no deletion + request_test("metrics", reqwest::StatusCode::OK, +r#"# HELP ceremony_bad_msg Count all the bad msgs processed during a ceremony +# TYPE ceremony_bad_msg counter +ceremony_bad_msg{chain="Chain1",reason="AA"} 1 +# HELP ceremony_duration Measure the duration of a ceremony in ms +# TYPE ceremony_duration gauge +ceremony_duration{ceremony_id="7",ceremony_type="Keygen",chain="Chain1"} 999 +# HELP ceremony_msg Count all the processed messages for a given ceremony +# TYPE ceremony_msg counter +ceremony_msg{ceremony_id="7",ceremony_type="Keygen",chain="Chain1"} 2 +# HELP ceremony_timeout_missing_msg Measure the number of missing messages when reaching timeout +# TYPE ceremony_timeout_missing_msg gauge +ceremony_timeout_missing_msg{ceremony_id="7",ceremony_type="Keygen",chain="Chain1",stage="stage1"} 5 +# HELP stage_completing Count the number of stages which are completing successfully +# TYPE stage_completing counter +stage_completing{chain="Chain1",stage="stage1"} 2 +stage_completing{chain="Chain1",stage="stage2"} 1 +# HELP stage_duration Measure the duration of a stage in ms +# TYPE stage_duration gauge +stage_duration{ceremony_id="7",chain="Chain1",phase="processing",stage="stage1"} 78 +stage_duration{ceremony_id="7",chain="Chain1",phase="receiving",stage="stage1"} 780 +# HELP stage_failing Count the number of stages which are failing with the cause of the failure attached +# TYPE stage_failing counter +stage_failing{chain="Chain1",reason="NotEnoughMessages",stage="stage3"} 1 +"#).await; + + //End of ceremony + //struct gets dropped + } + + //First request after the ceremony ended we get all the metrics (same as the request above), and after we delete the ones that have no more reason to exists + request_test("metrics", reqwest::StatusCode::OK, +r#"# HELP ceremony_bad_msg Count all the bad msgs processed during a ceremony +# TYPE ceremony_bad_msg counter +ceremony_bad_msg{chain="Chain1",reason="AA"} 1 +# HELP ceremony_duration Measure the duration of a ceremony in ms +# TYPE ceremony_duration gauge +ceremony_duration{ceremony_id="7",ceremony_type="Keygen",chain="Chain1"} 999 +# HELP ceremony_msg Count all the processed messages for a given ceremony +# TYPE ceremony_msg counter +ceremony_msg{ceremony_id="7",ceremony_type="Keygen",chain="Chain1"} 2 +# HELP ceremony_timeout_missing_msg Measure the number of missing messages when reaching timeout +# TYPE ceremony_timeout_missing_msg gauge +ceremony_timeout_missing_msg{ceremony_id="7",ceremony_type="Keygen",chain="Chain1",stage="stage1"} 5 +# HELP stage_completing Count the number of stages which are completing successfully +# TYPE stage_completing counter +stage_completing{chain="Chain1",stage="stage1"} 2 +stage_completing{chain="Chain1",stage="stage2"} 1 +# HELP stage_duration Measure the duration of a stage in ms +# TYPE stage_duration gauge +stage_duration{ceremony_id="7",chain="Chain1",phase="processing",stage="stage1"} 78 +stage_duration{ceremony_id="7",chain="Chain1",phase="receiving",stage="stage1"} 780 +# HELP stage_failing Count the number of stages which are failing with the cause of the failure attached +# TYPE stage_failing counter +stage_failing{chain="Chain1",reason="NotEnoughMessages",stage="stage3"} 1 +"#).await; + + //Second request we get only the metrics which don't depend on a specific label like ceremony_id + request_test("metrics", reqwest::StatusCode::OK, +r#"# HELP ceremony_bad_msg Count all the bad msgs processed during a ceremony +# TYPE ceremony_bad_msg counter +ceremony_bad_msg{chain="Chain1",reason="AA"} 1 +# HELP stage_completing Count the number of stages which are completing successfully +# TYPE stage_completing counter +stage_completing{chain="Chain1",stage="stage1"} 2 +stage_completing{chain="Chain1",stage="stage2"} 1 +# HELP stage_failing Count the number of stages which are failing with the cause of the failure attached +# TYPE stage_failing counter +stage_failing{chain="Chain1",reason="NotEnoughMessages",stage="stage3"} 1 +"#).await; + + check_deleted_metrics(); + Ok(()) } .boxed() @@ -430,4 +772,27 @@ mod test { metric } + + fn check_deleted_metrics() { + assert!(STAGE_DURATION + .prom_metric + .remove_label_values(&["Chain1", "7", "stage1", "receiving"]) + .is_err()); + assert!(STAGE_DURATION + .prom_metric + .remove_label_values(&["Chain1", "7", "stage1", "processing"]) + .is_err()); + assert!(CEREMONY_TIMEOUT_MISSING_MSG + .prom_metric + .remove_label_values(&["Chain1", "7", "Keygen", "stage1"]) + .is_err()); + assert!(CEREMONY_DURATION + .prom_metric + .remove_label_values(&["Chain1", "7", "Keygen"]) + .is_err()); + assert!(CEREMONY_PROCESSED_MSG + .prom_metric + .remove_label_values(&["Chain1", "7", "Keygen"]) + .is_err()); + } }