Skip to content

Commit

Permalink
Prometheus metric ceremony (#4034)
Browse files Browse the repository at this point in the history
* added macro to create gauges that get deleted

* added ceremony_duration metric

* fixed gauge to handle conversion to i64

* ceremony missing messages on timeout metric added

* added chain label to CEREMONY_PROCESSED_MSG, CEREMONY_DURATION, CEREMONY_TIMEOUT_MISSING_MSG

* modified macro to support drop (deletion of labels) on all the types of wrapper, passed as a parameter when calling the macro

* added STAGE_DURATION metric

* use collect_array

* added STAGE_COMPLETING/STAGE_FAILING metrics

* avoid saving labels already seen (add to the hashset) if we don't drop the metric

* fixed missing imports caused by rebasing

* fix double imports

* avoid using format! and to_string every time -> use clone()

* fixed typo

* addressed PR comments

* use Option for stage/ceremony _start
do conversion inside the constructor
add test to check deletion of metrics inside CeremonyMetrics structs

* fixed test

* cargo fmt

* added manual deletion inside tests and made sure it returns an error -> the deletion had already been done, as expected

* address review comments
  • Loading branch information
marcellorigotti authored and dandanlen committed Oct 9, 2023
1 parent 3c58b2f commit 946294f
Show file tree
Hide file tree
Showing 4 changed files with 460 additions and 87 deletions.
63 changes: 30 additions & 33 deletions engine/multisig/src/client/ceremony_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod tests;
use std::{
collections::{btree_map, BTreeMap, BTreeSet},
pin::Pin,
time::Duration,
time::{Duration, Instant},
};

use anyhow::Result;
Expand All @@ -15,13 +15,7 @@ use tokio::sync::{
oneshot,
};
use tracing::{debug, warn, Instrument};
use utilities::{
format_iterator,
metrics::{
CeremonyBadMsgNotDrop, CeremonyMetrics, CeremonyProcessedMsgDrop, CEREMONY_BAD_MSG,
CEREMONY_PROCESSED_MSG,
},
};
use utilities::{format_iterator, metrics::CeremonyMetrics};

use crate::{
client::{
Expand Down Expand Up @@ -86,7 +80,7 @@ where
// We always create unauthorised first, it can get promoted to
// an authorised one with a ceremony request
let mut runner = Self::new_unauthorised(outcome_sender, ceremony_id);

let mut ceremony_start: Option<Instant> = None;
// Fuse the oneshot future so it will not get called twice
let mut request_receiver = request_receiver.fuse();

Expand All @@ -102,7 +96,7 @@ where
request = &mut request_receiver => {

let PreparedRequest { initial_stage } = request.expect("Ceremony request channel was dropped unexpectedly");

ceremony_start = Some(Instant::now());
if let Some(result) = runner.on_ceremony_request(initial_stage).instrument(span.clone()).await {
break result;
}
Expand All @@ -116,7 +110,16 @@ where
}
}
};

if let Some(start_instant) = ceremony_start {
let duration = start_instant.elapsed().as_millis();
runner.metrics.ceremony_duration.set(duration);
tracing::info!(
"Ceremony {} ({}) took {}ms to complete",
Ceremony::CEREMONY_TYPE,
ceremony_id,
duration
);
}
let _result = runner.outcome_sender.send((ceremony_id, outcome));
Ok(())
}
Expand All @@ -135,13 +138,7 @@ where
timeout_handle: Box::pin(tokio::time::sleep(tokio::time::Duration::ZERO)),
outcome_sender,
_phantom: Default::default(),
metrics: CeremonyMetrics {
processed_messages: CeremonyProcessedMsgDrop::new(
&CEREMONY_PROCESSED_MSG,
[format!("{}", ceremony_id)],
),
bad_message: CeremonyBadMsgNotDrop::new(&CEREMONY_BAD_MSG, [Chain::NAME]),
},
metrics: CeremonyMetrics::new(ceremony_id, Chain::NAME, Ceremony::CEREMONY_TYPE),
}
}

Expand All @@ -151,7 +148,7 @@ where
&mut self,
mut initial_stage: DynStage<Ceremony>,
) -> OptionalCeremonyReturn<Ceremony> {
let single_party_result = initial_stage.init(&self.metrics);
let single_party_result = initial_stage.init(&mut self.metrics);

// This function is only ever called from a oneshot channel,
// so it should never get called twice.
Expand Down Expand Up @@ -180,14 +177,15 @@ where
.stage
.take()
.expect("Ceremony must be authorised to finalize any of its stages");

let stage_name = stage.get_stage_name().to_string();
let validator_mapping = stage.ceremony_common().validator_mapping.clone();

match stage.finalize().await {
match stage.finalize(&mut self.metrics).await {
StageResult::NextStage(mut next_stage) => {
debug!("Ceremony transitions to {}", next_stage.get_stage_name());
self.metrics.stage_completing.inc(&[&stage_name]);

let single_party_result = next_stage.init(&self.metrics);
let single_party_result = next_stage.init(&mut self.metrics);

self.stage = Some(next_stage);

Expand All @@ -208,10 +206,13 @@ where
self.process_delayed().await
}
},
StageResult::Error(bad_validators, reason) =>
Some(Err((validator_mapping.get_ids(bad_validators), reason))),
StageResult::Error(bad_validators, reason) => {
self.metrics.stage_failing.inc(&[&stage_name, &format!("{:?}", reason)]);
Some(Err((validator_mapping.get_ids(bad_validators), reason)))
},
StageResult::Done(result) => {
debug!("Ceremony reached the final stage!");
self.metrics.stage_completing.inc(&[&stage_name]);

Some(Ok(result))
},
Expand Down Expand Up @@ -281,7 +282,7 @@ where
}

if let ProcessMessageResult::Ready =
stage.process_message(sender_idx, data, &self.metrics)
stage.process_message(sender_idx, data, &mut self.metrics)
{
return self.finalize_current_stage().await
}
Expand Down Expand Up @@ -353,14 +354,10 @@ where
stage.get_stage_name(),
missing_messages_from_accounts.len()
);

warn!(
missing_ids = format_iterator(missing_messages_from_accounts).to_string(),
"Ceremony stage {} timed out before all messages collected ({} missing), trying to finalize current stage anyway.",
stage.get_stage_name(),
missing_messages_from_accounts.len()
);

let stage_name = stage.get_stage_name().to_string();
self.metrics
.missing_messages
.set(&[&stage_name], missing_messages_from_accounts.len());
self.finalize_current_stage().await
} else {
panic!("Unauthorised ceremonies cannot timeout");
Expand Down
30 changes: 21 additions & 9 deletions engine/multisig/src/client/common/broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
use std::{
collections::{btree_map, BTreeMap},
fmt::Display,
time::Instant,
};

use async_trait::async_trait;
use cf_primitives::{AuthorityCount, CeremonyId};
use tracing::warn;

use super::ceremony_stage::{CeremonyCommon, CeremonyStage, ProcessMessageResult, StageResult};
use crate::{
client::{ceremony_manager::CeremonyTrait, MultisigMessage},
p2p::{OutgoingMultisigStageMessages, ProtocolVersion, CURRENT_PROTOCOL_VERSION},
};

use super::ceremony_stage::{CeremonyCommon, CeremonyStage, ProcessMessageResult, StageResult};
use utilities::metrics::CeremonyMetrics;

pub use super::broadcast_verification::verify_broadcasts_non_blocking;
use utilities::metrics::CeremonyMetrics;

/// Used by individual stages to distinguish between
/// a public message that should be broadcast to everyone
Expand Down Expand Up @@ -61,14 +61,15 @@ where
/// Determines the actual computations before/after
/// the data is collected
processor: Stage,
stage_started: Option<Instant>,
}

impl<C: CeremonyTrait, Stage> BroadcastStage<C, Stage>
where
Stage: BroadcastStageProcessor<C>,
{
pub fn new(processor: Stage, common: CeremonyCommon) -> Self {
BroadcastStage { common, messages: BTreeMap::new(), processor }
BroadcastStage { common, messages: BTreeMap::new(), processor, stage_started: None }
}
}

Expand Down Expand Up @@ -99,9 +100,9 @@ impl<C: CeremonyTrait, Stage> CeremonyStage<C> for BroadcastStage<C, Stage>
where
Stage: BroadcastStageProcessor<C> + Send,
{
fn init(&mut self, metrics: &CeremonyMetrics) -> ProcessMessageResult {
fn init(&mut self, metrics: &mut CeremonyMetrics) -> ProcessMessageResult {
let common = &self.common;

self.stage_started = Some(Instant::now());
let idx_to_id = |idx: &AuthorityCount| common.validator_mapping.get_id(*idx).clone();

let (own_message, outgoing_messages) = match self.processor.init() {
Expand Down Expand Up @@ -158,7 +159,7 @@ where
&mut self,
signer_idx: AuthorityCount,
m: C::Data,
metrics: &CeremonyMetrics,
metrics: &mut CeremonyMetrics,
) -> ProcessMessageResult {
metrics.processed_messages.inc();
let m: Stage::Message = match m.try_into() {
Expand Down Expand Up @@ -209,11 +210,18 @@ where
}
}

async fn finalize(mut self: Box<Self>) -> StageResult<C> {
async fn finalize(mut self: Box<Self>, metrics: &mut CeremonyMetrics) -> StageResult<C> {
// Because we might want to finalize the stage before
// all data has been received (e.g. due to a timeout),
// we insert None for any missing data
let stage_name = self.get_stage_name().to_string();
if let Some(start_instant) = self.stage_started {
metrics
.stage_duration
.set(&[&stage_name, "receiving"], start_instant.elapsed().as_millis());
}

let process_msg_instant = Instant::now();
let mut received_messages = std::mem::take(&mut self.messages);

// Turns values T into Option<T>, inserting `None` where
Expand All @@ -225,7 +233,11 @@ where
.map(|idx| (*idx, received_messages.remove(idx)))
.collect();

self.processor.process(messages).await
let result = self.processor.process(messages).await;
metrics
.stage_duration
.set(&[&stage_name, "processing"], process_msg_instant.elapsed().as_millis());
result
}

fn awaited_parties(&self) -> std::collections::BTreeSet<AuthorityCount> {
Expand Down
15 changes: 7 additions & 8 deletions engine/multisig/src/client/common/ceremony_stage.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use std::{collections::BTreeSet, sync::Arc};

use async_trait::async_trait;
use cf_primitives::{AuthorityCount, CeremonyId};
use tokio::sync::mpsc::UnboundedSender;
use utilities::metrics::CeremonyMetrics;

use crate::{
client::{ceremony_manager::CeremonyTrait, utils::PartyIdxMapping},
crypto::Rng,
p2p::OutgoingMultisigStageMessages,
ChainSigning,
};
use async_trait::async_trait;
use cf_primitives::{AuthorityCount, CeremonyId};
use tokio::sync::mpsc::UnboundedSender;
use utilities::metrics::CeremonyMetrics;

/// Outcome of a given ceremony stage
pub enum StageResult<C: CeremonyTrait> {
Expand All @@ -36,7 +35,7 @@ pub enum ProcessMessageResult {
#[async_trait]
pub trait CeremonyStage<C: CeremonyTrait> {
/// Perform initial computation for this stage (and initiate communication with other parties)
fn init(&mut self, metrics: &CeremonyMetrics) -> ProcessMessageResult;
fn init(&mut self, metrics: &mut CeremonyMetrics) -> ProcessMessageResult;

/// Process message from signer at index `signer_idx`. Precondition: the signer is a valid
/// holder of the key and selected to participate in this ceremony (TODO: also check that
Expand All @@ -45,12 +44,12 @@ pub trait CeremonyStage<C: CeremonyTrait> {
&mut self,
signer_idx: AuthorityCount,
m: C::Data,
metrics: &CeremonyMetrics,
metrics: &mut CeremonyMetrics,
) -> ProcessMessageResult;

/// Verify data for this stage after it is received from all other parties,
/// either abort or proceed to the next stage based on the result
async fn finalize(self: Box<Self>) -> StageResult<C>;
async fn finalize(self: Box<Self>, metrics: &mut CeremonyMetrics) -> StageResult<C>;

/// Parties we haven't heard from for the current stage
fn awaited_parties(&self) -> BTreeSet<AuthorityCount>;
Expand Down
Loading

0 comments on commit 946294f

Please sign in to comment.