Skip to content

Commit

Permalink
fix(consensus): add a safety margin so the batcher stops building ear…
Browse files Browse the repository at this point in the history
…ly enough (#2691)

This should become configurable as part of a refactor Guy will do to the consensus configs.
  • Loading branch information
matan-starkware authored Dec 19, 2024
1 parent 177759e commit d8114ea
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use starknet_batcher_types::batcher_types::{
use starknet_batcher_types::communication::BatcherClient;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{debug, debug_span, info, trace, warn, Instrument};
use tracing::{debug, debug_span, info, instrument, trace, warn, Instrument};

// TODO(Dan, Matan): Remove this once and replace with real gas prices.
const TEMPORARY_GAS_PRICES: GasPrices = GasPrices {
Expand Down Expand Up @@ -87,6 +87,12 @@ enum HandledProposalPart {
Failed(String),
}

// Safety margin to make sure that the batcher completes building the proposal with enough time for
// the Fin to be checked by validators.
//
// TODO(Guy): Move this to the context config.
const BUILD_PROPOSAL_MARGIN: Duration = Duration::from_millis(1000);

pub struct SequencerConsensusContext {
batcher: Arc<dyn BatcherClient>,
validators: Vec<ValidatorId>,
Expand Down Expand Up @@ -146,31 +152,30 @@ impl SequencerConsensusContext {
impl ConsensusContext for SequencerConsensusContext {
type ProposalPart = ProposalPart;

#[instrument(level = "info", skip_all, fields(proposal_init))]
async fn build_proposal(
&mut self,
proposal_init: ProposalInit,
timeout: Duration,
) -> oneshot::Receiver<ProposalContentId> {
info!("Building proposal: timeout={timeout:?}");
// Handles interrupting an active proposal from a previous height/round
self.set_height_and_round(proposal_init.height, proposal_init.round).await;
debug!(
"Building proposal for height: {} with timeout: {:?}",
proposal_init.height, timeout
);
let (fin_sender, fin_receiver) = oneshot::channel();

let batcher = Arc::clone(&self.batcher);
let valid_proposals = Arc::clone(&self.valid_proposals);

let proposal_id = ProposalId(self.proposal_id);
self.proposal_id += 1;
let timeout =
chrono::Duration::from_std(timeout).expect("Can't convert timeout to chrono::Duration");
assert!(timeout > BUILD_PROPOSAL_MARGIN);
let batcher_timeout = chrono::Duration::from_std(timeout - BUILD_PROPOSAL_MARGIN)
.expect("Can't convert timeout to chrono::Duration");
let now = chrono::Utc::now();
let build_proposal_input = ProposeBlockInput {
proposal_id,
// TODO: Discuss with batcher team passing std Duration instead.
deadline: now + timeout,
deadline: now + batcher_timeout,
// TODO: This is not part of Milestone 1.
retrospective_block_hash: Some(BlockHashAndNumber {
number: BlockNumber::default(),
Expand All @@ -190,7 +195,7 @@ impl ConsensusContext for SequencerConsensusContext {
// TODO: Should we be returning an error?
// I think this implies defining an error type in this crate and moving the trait definition
// here also.
debug!("Initiating proposal build: {build_proposal_input:?}");
debug!("Initiating build proposal: {build_proposal_input:?}");
batcher
.propose_block(build_proposal_input)
.await
Expand Down Expand Up @@ -226,17 +231,20 @@ impl ConsensusContext for SequencerConsensusContext {

// Note: this function does not receive ProposalInit.
// That part is consumed by the caller, so it can know the height/round.
#[instrument(level = "info", skip(self, timeout, content_receiver))]
async fn validate_proposal(
&mut self,
proposal_init: ProposalInit,
timeout: Duration,
content_receiver: mpsc::Receiver<Self::ProposalPart>,
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)> {
info!("Validating proposal: timeout={timeout:?}");
assert_eq!(Some(proposal_init.height), self.current_height);
let (fin_sender, fin_receiver) = oneshot::channel();
match proposal_init.round.cmp(&self.current_round) {
std::cmp::Ordering::Less => fin_receiver,
std::cmp::Ordering::Greater => {
debug!("Queuing proposal for future round: current_round={}", self.current_round);
self.queued_proposals.insert(
proposal_init.round,
(
Expand Down Expand Up @@ -364,6 +372,7 @@ impl ConsensusContext for SequencerConsensusContext {
}

impl SequencerConsensusContext {
#[instrument(level = "info", skip(self, timeout, content_receiver, fin_sender))]
async fn validate_current_round_proposal(
&mut self,
height: BlockNumber,
Expand All @@ -372,7 +381,7 @@ impl SequencerConsensusContext {
mut content_receiver: mpsc::Receiver<ProposalPart>,
fin_sender: oneshot::Sender<(ProposalContentId, ProposalFin)>,
) {
debug!("Validating proposal for height: {height} with timeout: {timeout:?}");
info!("Validating proposal with timeout: {timeout:?}");
let batcher = Arc::clone(&self.batcher);
let valid_proposals = Arc::clone(&self.valid_proposals);
let proposal_id = ProposalId(self.proposal_id);
Expand Down Expand Up @@ -400,6 +409,7 @@ impl SequencerConsensusContext {
sequencer_address: proposer,
},
};
debug!("Initiating validate proposal: input={input:?}");
batcher.validate_block(input).await.expect("Failed to initiate proposal validation");

let token = CancellationToken::new();
Expand All @@ -421,8 +431,13 @@ impl SequencerConsensusContext {
return;
}
proposal_part = content_receiver.next() => {
match handle_proposal_part(height, proposal_id, batcher.as_ref(),
proposal_part, &mut content, chain_id.clone()).await {
match handle_proposal_part(
proposal_id,
batcher.as_ref(),
proposal_part,
&mut content,
chain_id.clone()
).await {
HandledProposalPart::Finished(built_block, received_fin) => {
break (built_block, received_fin);
}
Expand Down Expand Up @@ -539,7 +554,6 @@ async fn stream_build_proposal(
// 2. Pass this to the batcher.
// 3. Once finished, receive the commitment from the batcher.
async fn handle_proposal_part(
height: BlockNumber,
proposal_id: ProposalId,
batcher: &dyn BatcherClient,
proposal_part: Option<ProposalPart>,
Expand Down Expand Up @@ -591,12 +605,11 @@ async fn handle_proposal_part(
let batcher_block_id = BlockHash(response_id.state_diff_commitment.0.0);
info!(
"Finished validating proposal {:?}: network_block_id: {:?}, batcher_block_id = \
{:?}, num_txs = {:?}, height = {:?}",
{:?}, num_txs = {:?}",
proposal_id,
id,
batcher_block_id,
content.len(),
height
);
HandledProposalPart::Finished(batcher_block_id, ProposalFin { proposal_content_id: id })
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use starknet_types_core::felt::Felt;

use crate::sequencer_consensus_context::SequencerConsensusContext;

const TIMEOUT: Duration = Duration::from_millis(100);
const TIMEOUT: Duration = Duration::from_millis(1200);
const CHANNEL_SIZE: usize = 5000;
const NUM_VALIDATORS: u64 = 4;
const STATE_DIFF_COMMITMENT: StateDiffCommitment = StateDiffCommitment(PoseidonHash(Felt::ZERO));
Expand Down
3 changes: 1 addition & 2 deletions crates/starknet_batcher/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ pub(crate) fn deadline_as_instant(
let time_to_deadline = deadline - chrono::Utc::now();
let as_duration =
time_to_deadline.to_std().map_err(|_| BatcherError::TimeToDeadlineError { deadline })?;
// TODO(Matan): this is a temporary solution to the timeout issue.
Ok((std::time::Instant::now() + (as_duration / 2)).into())
Ok((std::time::Instant::now() + as_duration).into())
}

pub(crate) fn verify_block_input(
Expand Down

0 comments on commit d8114ea

Please sign in to comment.