From 89746a6a265eb916d50a02938f4ad790ad7cf3e1 Mon Sep 17 00:00:00 2001 From: Matan Markind Date: Mon, 16 Dec 2024 13:29:51 +0200 Subject: [PATCH] fix(consensus): add a safety margin so the batcher stops building early enough This should become configurable as part of a refactor Guy will do to the consensus configs. --- .../src/sequencer_consensus_context.rs | 41 ++++++++++++------- .../src/sequencer_consensus_context_test.rs | 2 +- crates/starknet_batcher/src/utils.rs | 3 +- 3 files changed, 28 insertions(+), 18 deletions(-) diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs index 12ba86891a..2eb6de1b6b 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs @@ -56,7 +56,7 @@ use starknet_batcher_types::batcher_types::{ use starknet_batcher_types::communication::BatcherClient; use tokio::sync::Notify; use tokio::task::JoinHandle; -use tracing::{debug, debug_span, error, info, trace, warn, Instrument}; +use tracing::{debug, debug_span, error, info, instrument, trace, warn, Instrument}; // TODO(Dan, Matan): Remove this once and replace with real gas prices. const TEMPORARY_GAS_PRICES: GasPrices = GasPrices { @@ -80,6 +80,8 @@ type HeightToIdToContent = type ValidationParams = (BlockNumber, ValidatorId, Duration, mpsc::Receiver); const CHANNEL_SIZE: usize = 100; +// TODO(Guy): Move this to the context config. +const BUILD_PROPOSAL_MARGIN: Duration = Duration::from_millis(1000); pub struct SequencerConsensusContext { batcher: Arc, @@ -140,17 +142,19 @@ impl SequencerConsensusContext { impl ConsensusContext for SequencerConsensusContext { type ProposalPart = ProposalPart; + #[instrument( + level = "info", + skip_all, + fields(height=?proposal_init.height, round=?proposal_init.round) + )] async fn build_proposal( &mut self, proposal_init: ProposalInit, timeout: Duration, ) -> oneshot::Receiver { + info!("Building proposal: {timeout:?} {proposal_init:?}"); // Handles interrupting an active proposal from a previous height/round self.set_height_and_round(proposal_init.height, proposal_init.round).await; - debug!( - "Building proposal for height: {} with timeout: {:?}", - proposal_init.height, timeout - ); let (fin_sender, fin_receiver) = oneshot::channel(); let batcher = Arc::clone(&self.batcher); @@ -158,8 +162,9 @@ impl ConsensusContext for SequencerConsensusContext { let proposal_id = ProposalId(self.proposal_id); self.proposal_id += 1; - let timeout = - chrono::Duration::from_std(timeout).expect("Can't convert timeout to chrono::Duration"); + assert!(timeout > BUILD_PROPOSAL_MARGIN); + let timeout = chrono::Duration::from_std(timeout - BUILD_PROPOSAL_MARGIN) + .expect("Can't convert timeout to chrono::Duration"); let now = chrono::Utc::now(); let build_proposal_input = ProposeBlockInput { proposal_id, @@ -184,7 +189,7 @@ impl ConsensusContext for SequencerConsensusContext { // TODO: Should we be returning an error? // I think this implies defining an error type in this crate and moving the trait definition // here also. - debug!("Initiating proposal build: {build_proposal_input:?}"); + debug!("Initiating build proposal: {build_proposal_input:?}"); batcher .propose_block(build_proposal_input) .await @@ -220,27 +225,30 @@ impl ConsensusContext for SequencerConsensusContext { // Note: this function does not receive ProposalInit. // That part is consumed by the caller, so it can know the height/round. + #[instrument(level = "info", skip(self, timeout, content_receiver))] async fn validate_proposal( &mut self, height: BlockNumber, round: Round, - validator: ValidatorId, + proposer: ValidatorId, timeout: Duration, content_receiver: mpsc::Receiver, ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)> { + info!("Validating proposal: {timeout:?}"); assert_eq!(Some(height), self.current_height); let (fin_sender, fin_receiver) = oneshot::channel(); match round.cmp(&self.current_round) { std::cmp::Ordering::Less => fin_receiver, std::cmp::Ordering::Greater => { + debug!("Queuing proposal for future round: current_round={}", self.current_round); self.queued_proposals - .insert(round, ((height, validator, timeout, content_receiver), fin_sender)); + .insert(round, ((height, proposer, timeout, content_receiver), fin_sender)); fin_receiver } std::cmp::Ordering::Equal => { self.validate_current_round_proposal( height, - validator, + proposer, timeout, content_receiver, fin_sender, @@ -355,6 +363,7 @@ impl ConsensusContext for SequencerConsensusContext { } impl SequencerConsensusContext { + #[instrument(level = "info", skip(self, timeout, content_receiver, fin_sender))] async fn validate_current_round_proposal( &mut self, height: BlockNumber, @@ -363,7 +372,7 @@ impl SequencerConsensusContext { content_receiver: mpsc::Receiver, fin_sender: oneshot::Sender<(ProposalContentId, ProposalFin)>, ) { - debug!("Validating proposal for height: {height} with timeout: {timeout:?}"); + info!("Validating proposal with timeout: {timeout:?}"); let batcher = Arc::clone(&self.batcher); let valid_proposals = Arc::clone(&self.valid_proposals); let proposal_id = ProposalId(self.proposal_id); @@ -391,6 +400,7 @@ impl SequencerConsensusContext { sequencer_address: proposer, }, }; + debug!("Initiating validate proposal: {input:?}"); batcher.validate_block(input).await.expect("Failed to initiate proposal validation"); let notify = Arc::new(Notify::new()); @@ -409,7 +419,9 @@ impl SequencerConsensusContext { chain_id, ); tokio::select! { - _ = notify_clone.notified() => {} + _ = notify_clone.notified() => { + info!("Proposal interrupted: {proposal_id:?}"); + } result = tokio::time::timeout(timeout, validate_fut) =>{ if let Err(e) = result { error!("Validation timed out. {e:?}"); @@ -580,12 +592,11 @@ async fn stream_validate_proposal( let batcher_block_id = BlockHash(response_id.state_diff_commitment.0.0); info!( "Finished validating proposal {:?}: network_block_id: {:?}, batcher_block_id = {:?}, \ - num_txs = {:?}, height = {:?}", + num_txs = {:?}", proposal_id, network_block_id, batcher_block_id, content.len(), - height ); // Update valid_proposals before sending fin to avoid a race condition // with `get_proposal` being called before `valid_proposals` is updated. diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs index f72bde01da..056dddebe0 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs @@ -45,7 +45,7 @@ use starknet_types_core::felt::Felt; use crate::sequencer_consensus_context::SequencerConsensusContext; -const TIMEOUT: Duration = Duration::from_millis(100); +const TIMEOUT: Duration = Duration::from_millis(200); const CHANNEL_SIZE: usize = 5000; const NUM_VALIDATORS: u64 = 4; const STATE_DIFF_COMMITMENT: StateDiffCommitment = StateDiffCommitment(PoseidonHash(Felt::ZERO)); diff --git a/crates/starknet_batcher/src/utils.rs b/crates/starknet_batcher/src/utils.rs index 1d656d8eda..4ea923a4db 100644 --- a/crates/starknet_batcher/src/utils.rs +++ b/crates/starknet_batcher/src/utils.rs @@ -67,8 +67,7 @@ pub(crate) fn deadline_as_instant( let time_to_deadline = deadline - chrono::Utc::now(); let as_duration = time_to_deadline.to_std().map_err(|_| BatcherError::TimeToDeadlineError { deadline })?; - // TODO(Matan): this is a temporary solution to the timeout issue. - Ok((std::time::Instant::now() + (as_duration / 2)).into()) + Ok((std::time::Instant::now() + as_duration).into()) } pub(crate) fn verify_block_input(