From 0c2254fbc4e1cd93d7ddf76022721794456e0dca Mon Sep 17 00:00:00 2001 From: Matan Markind Date: Sun, 18 Aug 2024 11:48:35 +0300 Subject: [PATCH] feat(consensus): add metric for sync in consensus and update the simulation script --- crates/papyrus_common/src/metrics.rs | 3 +++ .../papyrus_consensus/run_consensus.py | 21 +++++++++++-------- .../papyrus_consensus/src/manager.rs | 3 ++- .../src/simulation_network_receiver.rs | 4 ++++ .../src/single_height_consensus.rs | 4 ++-- 5 files changed, 23 insertions(+), 12 deletions(-) diff --git a/crates/papyrus_common/src/metrics.rs b/crates/papyrus_common/src/metrics.rs index b06f42fec8..cdb6e24a66 100644 --- a/crates/papyrus_common/src/metrics.rs +++ b/crates/papyrus_common/src/metrics.rs @@ -40,3 +40,6 @@ pub static COLLECT_PROFILING_METRICS: OnceLock = OnceLock::new(); /// The height consensus is currently working on. pub const PAPYRUS_CONSENSUS_HEIGHT: &str = "papyrus_consensus_height"; + +/// The number of times consensus has progressed due to the sync protocol. +pub const PAPYRUS_CONSENSUS_SYNC_COUNT: &str = "papyrus_consensus_sync_count"; diff --git a/crates/sequencing/papyrus_consensus/run_consensus.py b/crates/sequencing/papyrus_consensus/run_consensus.py index 1b19ab3cde..fbeb3a0ed5 100644 --- a/crates/sequencing/papyrus_consensus/run_consensus.py +++ b/crates/sequencing/papyrus_consensus/run_consensus.py @@ -22,6 +22,7 @@ def __init__(self, validator_id, monitoring_gateway_server_port, cmd): self.cmd = cmd self.process = None self.height_and_timestamp = (None, None) # (height, timestamp) + self.sync_count = None def start(self): self.process = subprocess.Popen(self.cmd, shell=True, preexec_fn=os.setsid) @@ -31,15 +32,17 @@ def stop(self): os.killpg(os.getpgid(self.process.pid), signal.SIGINT) self.process.wait() - def get_height(self): + def get_metric(self, metric: str): port = self.monitoring_gateway_server_port - command = f"curl -s -X GET http://localhost:{port}/monitoring/metrics | grep -oP 'papyrus_consensus_height \\K\\d+'" + command = f"curl -s -X GET http://localhost:{port}/monitoring/metrics | grep -oP '{metric} \\K\\d+'" result = subprocess.run(command, shell=True, capture_output=True, text=True) - # returns the latest decided height, or None if consensus has not yet started. return int(result.stdout) if result.stdout else None - def check_height(self): - height = self.get_height() + # Check the node's metrics and return the height and timestamp. + def check_node(self): + self.sync_count = self.get_metric("papyrus_consensus_sync_count") + + height = self.get_metric("papyrus_consensus_height") if self.height_and_timestamp[0] != height: if self.height_and_timestamp[0] is not None and height is not None: assert height > self.height_and_timestamp[0], "Height should be increasing." @@ -89,8 +92,8 @@ def monitor_simulation(nodes, start_time, duration, stagnation_timeout): return True stagnated_nodes = [] for node in nodes: - (height, last_update) = node.check_height() - print(f"Node: {node.validator_id}, height: {height}") + (height, last_update) = node.check_node() + print(f"Node: {node.validator_id}, height: {height}, sync_count: {node.sync_count}") if height is not None and (curr_time - last_update) > stagnation_timeout: stagnated_nodes.append(node.validator_id) if stagnated_nodes: @@ -107,7 +110,8 @@ def run_simulation(nodes, duration, stagnation_timeout): try: while True: time.sleep(MONITORING_PERIOD) - print(f"\nTime elapsed: {time.time() - start_time}s") + elapsed = round(time.time() - start_time) + print(f"\nTime elapsed: {elapsed}s") should_exit = monitor_simulation(nodes, start_time, duration, stagnation_timeout) if should_exit: break @@ -147,7 +151,6 @@ def build_node(data_dir, logs_dir, i, papryus_args): f"--network.secret_key {SECRET_KEY} " + f"2>&1 | sed -r 's/\\x1B\\[[0-9;]*[mK]//g' > {logs_dir}/validator{i}.txt" ) - else: cmd += ( f"--network.bootstrap_peer_multiaddr.#is_none false " diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs index 3e24434eb7..98c976ab65 100644 --- a/crates/sequencing/papyrus_consensus/src/manager.rs +++ b/crates/sequencing/papyrus_consensus/src/manager.rs @@ -9,7 +9,7 @@ use std::time::Duration; use futures::channel::{mpsc, oneshot}; use futures::{Stream, StreamExt}; -use papyrus_common::metrics::PAPYRUS_CONSENSUS_HEIGHT; +use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_COUNT}; use papyrus_network::network_manager::ReportSender; use papyrus_protobuf::consensus::{ConsensusMessage, Proposal}; use papyrus_protobuf::converters::ProtobufConversionError; @@ -69,6 +69,7 @@ where current_height = current_height.unchecked_next(); }, sync_height = sync_height(current_height, &mut sync_receiver) => { + metrics::counter!(PAPYRUS_CONSENSUS_SYNC_COUNT, 1); current_height = sync_height?.unchecked_next(); } } diff --git a/crates/sequencing/papyrus_consensus/src/simulation_network_receiver.rs b/crates/sequencing/papyrus_consensus/src/simulation_network_receiver.rs index bf784103da..ecb4135359 100644 --- a/crates/sequencing/papyrus_consensus/src/simulation_network_receiver.rs +++ b/crates/sequencing/papyrus_consensus/src/simulation_network_receiver.rs @@ -13,6 +13,7 @@ use papyrus_network::network_manager::ReportSender; use papyrus_protobuf::consensus::ConsensusMessage; use papyrus_protobuf::converters::ProtobufConversionError; use starknet_api::block::BlockHash; +use tracing::{debug, instrument}; /// Receiver used to help run simulations of consensus. It has 2 goals in mind: /// 1. Simulate network failures. @@ -66,6 +67,7 @@ where /// /// Applies `drop_probability` followed by `invalid_probability`. So the probability of an /// invalid message is `(1- drop_probability) * invalid_probability`. + #[instrument(skip(self), level = "debug")] pub fn filter_msg(&mut self, mut msg: ConsensusMessage) -> Option { if !matches!(msg, ConsensusMessage::Proposal(_)) { // TODO(matan): Add support for dropping/invalidating votes. @@ -73,6 +75,7 @@ where } if self.should_drop_msg(&msg) { + debug!("Dropping message"); return None; } @@ -109,6 +112,7 @@ where } fn invalidate_msg(&mut self, msg: &mut ConsensusMessage) { + debug!("Invalidating message"); // TODO(matan): Allow for invalid votes based on signature/sender_id. if let ConsensusMessage::Proposal(ref mut proposal) = msg { proposal.block_hash = BlockHash(proposal.block_hash.0 + 1); diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs index 8fba89dd93..fd74787f06 100644 --- a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs +++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs @@ -77,8 +77,8 @@ impl SingleHeightConsensus { fin_receiver: oneshot::Receiver, ) -> Result>, ConsensusError> { debug!( - "Received proposal: proposal_height={}, proposer={:?}", - init.height.0, init.proposer + "Received proposal: height={}, round={}, proposer={:?}", + init.height.0, init.round, init.proposer ); let proposer_id = context.proposer(&self.validators, self.height); if init.height != self.height {