Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prometheus metric ceremony #4034

Merged
merged 20 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
cccc380
added macro to create gauges that get deleted
marcellorigotti Sep 18, 2023
8226df4
added ceremony_duration metric
marcellorigotti Sep 18, 2023
818d4a5
fixed gauge to handle convertion to i64
marcellorigotti Sep 18, 2023
683cde8
ceremony missing messages on timeout metric added
marcellorigotti Sep 18, 2023
5bedc8c
added chain label to CEREMONY_PROCESSED_MSG, CEREMONY_DURATION, CEREM…
marcellorigotti Sep 18, 2023
97d849d
modified macro to support drop (deletion of labels) on all the types …
marcellorigotti Sep 19, 2023
ad9b386
added STAGE_DURATION metric
marcellorigotti Sep 19, 2023
1153b74
use collect_array
marcellorigotti Sep 19, 2023
84181dc
added STAGE_COMPLETING/STAGE_FAILING metrics
marcellorigotti Sep 20, 2023
205828a
avoid saving labels already seen (add to the hashset) if we don't dro…
marcellorigotti Sep 20, 2023
5083735
fixed missing imports caused by rebasing
marcellorigotti Sep 20, 2023
ab400b1
fix double imports
marcellorigotti Sep 20, 2023
f884ccf
avoid using format! and to_string every time -> use clone()
marcellorigotti Sep 20, 2023
823f27b
fixed typo
marcellorigotti Sep 20, 2023
7e90d22
addressed PR comments
marcellorigotti Sep 21, 2023
b082d8f
use Option for stage/ceremony _start
marcellorigotti Sep 22, 2023
1b04fe1
fixed test
marcellorigotti Sep 22, 2023
c73b55b
cargo fmt
marcellorigotti Sep 22, 2023
4a82892
added manual deletion inside tests and make sure it returns an error …
marcellorigotti Sep 22, 2023
d32ddf1
address review comments
marcellorigotti Sep 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 30 additions & 33 deletions engine/multisig/src/client/ceremony_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod tests;
use std::{
collections::{btree_map, BTreeMap, BTreeSet},
pin::Pin,
time::Duration,
time::{Duration, Instant},
};

use anyhow::Result;
Expand All @@ -15,13 +15,7 @@ use tokio::sync::{
oneshot,
};
use tracing::{debug, warn, Instrument};
use utilities::{
format_iterator,
metrics::{
CeremonyBadMsgNotDrop, CeremonyMetrics, CeremonyProcessedMsgDrop, CEREMONY_BAD_MSG,
CEREMONY_PROCESSED_MSG,
},
};
use utilities::{format_iterator, metrics::CeremonyMetrics};

use crate::{
client::{
Expand Down Expand Up @@ -86,7 +80,7 @@ where
// We always create unauthorised first, it can get promoted to
// an authorised one with a ceremony request
let mut runner = Self::new_unauthorised(outcome_sender, ceremony_id);

let mut ceremony_start: Option<Instant> = None;
// Fuse the oneshot future so it will not get called twice
let mut request_receiver = request_receiver.fuse();

Expand All @@ -102,7 +96,7 @@ where
request = &mut request_receiver => {

let PreparedRequest { initial_stage } = request.expect("Ceremony request channel was dropped unexpectedly");

ceremony_start = Some(Instant::now());
if let Some(result) = runner.on_ceremony_request(initial_stage).instrument(span.clone()).await {
break result;
}
Expand All @@ -116,7 +110,16 @@ where
}
}
};

if let Some(start_instant) = ceremony_start {
let duration = start_instant.elapsed().as_millis();
runner.metrics.ceremony_duration.set(duration);
tracing::info!(
"Ceremony {} ({}) took {}ms to complete",
Ceremony::CEREMONY_TYPE,
ceremony_id,
duration
);
}
let _result = runner.outcome_sender.send((ceremony_id, outcome));
Ok(())
}
Expand All @@ -135,13 +138,7 @@ where
timeout_handle: Box::pin(tokio::time::sleep(tokio::time::Duration::ZERO)),
outcome_sender,
_phantom: Default::default(),
metrics: CeremonyMetrics {
processed_messages: CeremonyProcessedMsgDrop::new(
&CEREMONY_PROCESSED_MSG,
[format!("{}", ceremony_id)],
),
bad_message: CeremonyBadMsgNotDrop::new(&CEREMONY_BAD_MSG, [Chain::NAME]),
},
metrics: CeremonyMetrics::new(ceremony_id, Chain::NAME, Ceremony::CEREMONY_TYPE),
}
}

Expand All @@ -151,7 +148,7 @@ where
&mut self,
mut initial_stage: DynStage<Ceremony>,
) -> OptionalCeremonyReturn<Ceremony> {
let single_party_result = initial_stage.init(&self.metrics);
let single_party_result = initial_stage.init(&mut self.metrics);

// This function is only ever called from a oneshot channel,
// so it should never get called twice.
Expand Down Expand Up @@ -180,14 +177,15 @@ where
.stage
.take()
.expect("Ceremony must be authorised to finalize any of its stages");

let stage_name = stage.get_stage_name().to_string();
let validator_mapping = stage.ceremony_common().validator_mapping.clone();

match stage.finalize().await {
match stage.finalize(&mut self.metrics).await {
StageResult::NextStage(mut next_stage) => {
debug!("Ceremony transitions to {}", next_stage.get_stage_name());
self.metrics.stage_completing.inc(&[&stage_name]);

let single_party_result = next_stage.init(&self.metrics);
let single_party_result = next_stage.init(&mut self.metrics);

self.stage = Some(next_stage);

Expand All @@ -208,10 +206,13 @@ where
self.process_delayed().await
}
},
StageResult::Error(bad_validators, reason) =>
Some(Err((validator_mapping.get_ids(bad_validators), reason))),
StageResult::Error(bad_validators, reason) => {
self.metrics.stage_failing.inc(&[&stage_name, &format!("{:?}", reason)]);
Some(Err((validator_mapping.get_ids(bad_validators), reason)))
},
StageResult::Done(result) => {
debug!("Ceremony reached the final stage!");
self.metrics.stage_completing.inc(&[&stage_name]);

Some(Ok(result))
},
Expand Down Expand Up @@ -281,7 +282,7 @@ where
}

if let ProcessMessageResult::Ready =
stage.process_message(sender_idx, data, &self.metrics)
stage.process_message(sender_idx, data, &mut self.metrics)
{
return self.finalize_current_stage().await
}
Expand Down Expand Up @@ -353,14 +354,10 @@ where
stage.get_stage_name(),
missing_messages_from_accounts.len()
);

warn!(
missing_ids = format_iterator(missing_messages_from_accounts).to_string(),
"Ceremony stage {} timed out before all messages collected ({} missing), trying to finalize current stage anyway.",
stage.get_stage_name(),
missing_messages_from_accounts.len()
);

let stage_name = stage.get_stage_name().to_string();
self.metrics
.missing_messages
.set(&[&stage_name], missing_messages_from_accounts.len());
self.finalize_current_stage().await
} else {
panic!("Unauthorised ceremonies cannot timeout");
Expand Down
30 changes: 21 additions & 9 deletions engine/multisig/src/client/common/broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
use std::{
collections::{btree_map, BTreeMap},
fmt::Display,
time::Instant,
};

use async_trait::async_trait;
use cf_primitives::{AuthorityCount, CeremonyId};
use tracing::warn;

use super::ceremony_stage::{CeremonyCommon, CeremonyStage, ProcessMessageResult, StageResult};
use crate::{
client::{ceremony_manager::CeremonyTrait, MultisigMessage},
p2p::{OutgoingMultisigStageMessages, ProtocolVersion, CURRENT_PROTOCOL_VERSION},
};

use super::ceremony_stage::{CeremonyCommon, CeremonyStage, ProcessMessageResult, StageResult};
use utilities::metrics::CeremonyMetrics;

pub use super::broadcast_verification::verify_broadcasts_non_blocking;
use utilities::metrics::CeremonyMetrics;

/// Used by individual stages to distinguish between
/// a public message that should be broadcast to everyone
Expand Down Expand Up @@ -61,14 +61,15 @@ where
/// Determines the actual computations before/after
/// the data is collected
processor: Stage,
stage_started: Option<Instant>,
}

impl<C: CeremonyTrait, Stage> BroadcastStage<C, Stage>
where
Stage: BroadcastStageProcessor<C>,
{
pub fn new(processor: Stage, common: CeremonyCommon) -> Self {
BroadcastStage { common, messages: BTreeMap::new(), processor }
BroadcastStage { common, messages: BTreeMap::new(), processor, stage_started: None }
}
}

Expand Down Expand Up @@ -99,9 +100,9 @@ impl<C: CeremonyTrait, Stage> CeremonyStage<C> for BroadcastStage<C, Stage>
where
Stage: BroadcastStageProcessor<C> + Send,
{
fn init(&mut self, metrics: &CeremonyMetrics) -> ProcessMessageResult {
fn init(&mut self, metrics: &mut CeremonyMetrics) -> ProcessMessageResult {
let common = &self.common;

self.stage_started = Some(Instant::now());
let idx_to_id = |idx: &AuthorityCount| common.validator_mapping.get_id(*idx).clone();

let (own_message, outgoing_messages) = match self.processor.init() {
Expand Down Expand Up @@ -158,7 +159,7 @@ where
&mut self,
signer_idx: AuthorityCount,
m: C::Data,
metrics: &CeremonyMetrics,
metrics: &mut CeremonyMetrics,
) -> ProcessMessageResult {
metrics.processed_messages.inc();
let m: Stage::Message = match m.try_into() {
Expand Down Expand Up @@ -209,11 +210,18 @@ where
}
}

async fn finalize(mut self: Box<Self>) -> StageResult<C> {
async fn finalize(mut self: Box<Self>, metrics: &mut CeremonyMetrics) -> StageResult<C> {
// Because we might want to finalize the stage before
// all data has been received (e.g. due to a timeout),
// we insert None for any missing data
let stage_name = self.get_stage_name().to_string();
if let Some(start_instant) = self.stage_started {
metrics
.stage_duration
.set(&[&stage_name, "receiving"], start_instant.elapsed().as_millis());
}

let process_msg_instant = Instant::now();
let mut received_messages = std::mem::take(&mut self.messages);

// Turns values T into Option<T>, inserting `None` where
Expand All @@ -225,7 +233,11 @@ where
.map(|idx| (*idx, received_messages.remove(idx)))
.collect();

self.processor.process(messages).await
let result = self.processor.process(messages).await;
metrics
.stage_duration
.set(&[&stage_name, "processing"], process_msg_instant.elapsed().as_millis());
result
}

fn awaited_parties(&self) -> std::collections::BTreeSet<AuthorityCount> {
Expand Down
15 changes: 7 additions & 8 deletions engine/multisig/src/client/common/ceremony_stage.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use std::{collections::BTreeSet, sync::Arc};

use async_trait::async_trait;
use cf_primitives::{AuthorityCount, CeremonyId};
use tokio::sync::mpsc::UnboundedSender;
use utilities::metrics::CeremonyMetrics;

use crate::{
client::{ceremony_manager::CeremonyTrait, utils::PartyIdxMapping},
crypto::Rng,
p2p::OutgoingMultisigStageMessages,
ChainSigning,
};
use async_trait::async_trait;
use cf_primitives::{AuthorityCount, CeremonyId};
use tokio::sync::mpsc::UnboundedSender;
use utilities::metrics::CeremonyMetrics;

/// Outcome of a given ceremony stage
pub enum StageResult<C: CeremonyTrait> {
Expand All @@ -36,7 +35,7 @@ pub enum ProcessMessageResult {
#[async_trait]
pub trait CeremonyStage<C: CeremonyTrait> {
/// Perform initial computation for this stage (and initiate communication with other parties)
fn init(&mut self, metrics: &CeremonyMetrics) -> ProcessMessageResult;
fn init(&mut self, metrics: &mut CeremonyMetrics) -> ProcessMessageResult;

/// Process message from signer at index `signer_idx`. Precondition: the signer is a valid
/// holder of the key and selected to participate in this ceremony (TODO: also check that
Expand All @@ -45,12 +44,12 @@ pub trait CeremonyStage<C: CeremonyTrait> {
&mut self,
signer_idx: AuthorityCount,
m: C::Data,
metrics: &CeremonyMetrics,
metrics: &mut CeremonyMetrics,
) -> ProcessMessageResult;

/// Verify data for this stage after it is received from all other parties,
/// either abort or proceed to the next stage based on the result
async fn finalize(self: Box<Self>) -> StageResult<C>;
async fn finalize(self: Box<Self>, metrics: &mut CeremonyMetrics) -> StageResult<C>;
msgmaxim marked this conversation as resolved.
Show resolved Hide resolved

/// Parties we haven't heard from for the current stage
fn awaited_parties(&self) -> BTreeSet<AuthorityCount>;
Expand Down
Loading