Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(consensus): add metric for sync in consensus and update the simulation script #490

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/papyrus_common/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,6 @@ pub static COLLECT_PROFILING_METRICS: OnceLock<bool> = OnceLock::new();

/// The height consensus is currently working on.
pub const PAPYRUS_CONSENSUS_HEIGHT: &str = "papyrus_consensus_height";

/// The number of times consensus has progressed due to the sync protocol.
pub const PAPYRUS_CONSENSUS_SYNC_COUNT: &str = "papyrus_consensus_sync_count";
21 changes: 12 additions & 9 deletions crates/sequencing/papyrus_consensus/run_consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def __init__(self, validator_id, monitoring_gateway_server_port, cmd):
self.cmd = cmd
self.process = None
self.height_and_timestamp = (None, None) # (height, timestamp)
self.sync_count = None

def start(self):
self.process = subprocess.Popen(self.cmd, shell=True, preexec_fn=os.setsid)
Expand All @@ -31,15 +32,17 @@ def stop(self):
os.killpg(os.getpgid(self.process.pid), signal.SIGINT)
self.process.wait()

def get_height(self):
def get_metric(self, metric: str):
port = self.monitoring_gateway_server_port
command = f"curl -s -X GET http://localhost:{port}/monitoring/metrics | grep -oP 'papyrus_consensus_height \\K\\d+'"
command = f"curl -s -X GET http://localhost:{port}/monitoring/metrics | grep -oP '{metric} \\K\\d+'"
result = subprocess.run(command, shell=True, capture_output=True, text=True)
# returns the latest decided height, or None if consensus has not yet started.
return int(result.stdout) if result.stdout else None

def check_height(self):
height = self.get_height()
# Check the node's metrics and return the height and timestamp.
def check_node(self):
self.sync_count = self.get_metric("papyrus_consensus_sync_count")

height = self.get_metric("papyrus_consensus_height")
if self.height_and_timestamp[0] != height:
if self.height_and_timestamp[0] is not None and height is not None:
assert height > self.height_and_timestamp[0], "Height should be increasing."
Expand Down Expand Up @@ -89,8 +92,8 @@ def monitor_simulation(nodes, start_time, duration, stagnation_timeout):
return True
stagnated_nodes = []
for node in nodes:
(height, last_update) = node.check_height()
print(f"Node: {node.validator_id}, height: {height}")
(height, last_update) = node.check_node()
print(f"Node: {node.validator_id}, height: {height}, sync_count: {node.sync_count}")
if height is not None and (curr_time - last_update) > stagnation_timeout:
stagnated_nodes.append(node.validator_id)
if stagnated_nodes:
Expand All @@ -107,7 +110,8 @@ def run_simulation(nodes, duration, stagnation_timeout):
try:
while True:
time.sleep(MONITORING_PERIOD)
print(f"\nTime elapsed: {time.time() - start_time}s")
elapsed = round(time.time() - start_time)
print(f"\nTime elapsed: {elapsed}s")
should_exit = monitor_simulation(nodes, start_time, duration, stagnation_timeout)
if should_exit:
break
Expand Down Expand Up @@ -147,7 +151,6 @@ def build_node(data_dir, logs_dir, i, papryus_args):
f"--network.secret_key {SECRET_KEY} "
+ f"2>&1 | sed -r 's/\\x1B\\[[0-9;]*[mK]//g' > {logs_dir}/validator{i}.txt"
)

else:
cmd += (
f"--network.bootstrap_peer_multiaddr.#is_none false "
Expand Down
3 changes: 2 additions & 1 deletion crates/sequencing/papyrus_consensus/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::time::Duration;

use futures::channel::{mpsc, oneshot};
use futures::{Stream, StreamExt};
use papyrus_common::metrics::PAPYRUS_CONSENSUS_HEIGHT;
use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_COUNT};
use papyrus_network::network_manager::ReportSender;
use papyrus_protobuf::consensus::{ConsensusMessage, Proposal};
use papyrus_protobuf::converters::ProtobufConversionError;
Expand Down Expand Up @@ -69,6 +69,7 @@ where
current_height = current_height.unchecked_next();
},
sync_height = sync_height(current_height, &mut sync_receiver) => {
metrics::increment_counter!(PAPYRUS_CONSENSUS_SYNC_COUNT);
current_height = sync_height?.unchecked_next();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use papyrus_network::network_manager::ReportSender;
use papyrus_protobuf::consensus::ConsensusMessage;
use papyrus_protobuf::converters::ProtobufConversionError;
use starknet_api::block::BlockHash;
use tracing::{debug, instrument};

/// Receiver used to help run simulations of consensus. It has 2 goals in mind:
/// 1. Simulate network failures.
Expand Down Expand Up @@ -66,13 +67,15 @@ where
///
/// Applies `drop_probability` followed by `invalid_probability`. So the probability of an
/// invalid message is `(1- drop_probability) * invalid_probability`.
#[instrument(skip(self), level = "debug")]
pub fn filter_msg(&mut self, mut msg: ConsensusMessage) -> Option<ConsensusMessage> {
if !matches!(msg, ConsensusMessage::Proposal(_)) {
// TODO(matan): Add support for dropping/invalidating votes.
return Some(msg);
}

if self.should_drop_msg(&msg) {
debug!("Dropping message");
return None;
}

Expand Down Expand Up @@ -109,6 +112,7 @@ where
}

fn invalidate_msg(&mut self, msg: &mut ConsensusMessage) {
debug!("Invalidating message");
// TODO(matan): Allow for invalid votes based on signature/sender_id.
if let ConsensusMessage::Proposal(ref mut proposal) = msg {
proposal.block_hash = BlockHash(proposal.block_hash.0 + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {
fin_receiver: oneshot::Receiver<BlockHash>,
) -> Result<Option<Decision<BlockT>>, ConsensusError> {
debug!(
"Received proposal: proposal_height={}, proposer={:?}",
init.height.0, init.proposer
"Received proposal: height={}, round={}, proposer={:?}",
init.height.0, init.round, init.proposer
);
let proposer_id = context.proposer(&self.validators, self.height);
if init.height != self.height {
Expand Down
Loading