Skip to content

Commit

Permalink
fix(consensus): add a safety margin so the batcher stops building ear…
Browse files Browse the repository at this point in the history
…ly enough

This should become configurable as part of a refactor Guy will do to the consensus configs.
  • Loading branch information
matan-starkware committed Dec 17, 2024
1 parent 3f75d13 commit 11b9bce
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use starknet_batcher_types::batcher_types::{
use starknet_batcher_types::communication::BatcherClient;
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tracing::{debug, debug_span, error, info, trace, warn, Instrument};
use tracing::{debug, debug_span, error, info, instrument, trace, warn, Instrument};

// TODO(Dan, Matan): Remove this and replace with real gas prices.
const TEMPORARY_GAS_PRICES: GasPrices = GasPrices {
Expand All @@ -79,6 +79,8 @@ type HeightToIdToContent =
type ValidationParams = (BlockNumber, ValidatorId, Duration, mpsc::Receiver<ProposalPart>);

const CHANNEL_SIZE: usize = 100;
// Safety margin that `build_proposal` subtracts from its timeout so that the batcher stops
// building early enough. `build_proposal` asserts that the timeout exceeds this margin.
// TODO(Guy): Move this to the context config.
const BUILD_PROPOSAL_MARGIN: Duration = Duration::from_millis(1000);

pub struct SequencerConsensusContext {
batcher: Arc<dyn BatcherClient>,
Expand Down Expand Up @@ -135,26 +137,29 @@ impl SequencerConsensusContext {
impl ConsensusContext for SequencerConsensusContext {
type ProposalPart = ProposalPart;

#[instrument(
level = "info",
skip_all,
fields(height=?proposal_init.height, round=?proposal_init.round)
)]
async fn build_proposal(
&mut self,
proposal_init: ProposalInit,
timeout: Duration,
) -> oneshot::Receiver<ProposalContentId> {
info!("Building proposal: {timeout:?} {proposal_init:?}");
// Handles interrupting an active proposal from a previous height/round
self.set_height_and_round(proposal_init.height, proposal_init.round).await;
debug!(
"Building proposal for height: {} with timeout: {:?}",
proposal_init.height, timeout
);
let (fin_sender, fin_receiver) = oneshot::channel();

let batcher = Arc::clone(&self.batcher);
let valid_proposals = Arc::clone(&self.valid_proposals);

let proposal_id = ProposalId(self.proposal_id);
self.proposal_id += 1;
let timeout =
chrono::Duration::from_std(timeout).expect("Can't convert timeout to chrono::Duration");
assert!(timeout > BUILD_PROPOSAL_MARGIN);
let timeout = chrono::Duration::from_std(timeout - BUILD_PROPOSAL_MARGIN)
.expect("Can't convert timeout to chrono::Duration");
let now = chrono::Utc::now();
let build_proposal_input = ProposeBlockInput {
proposal_id,
Expand All @@ -179,7 +184,7 @@ impl ConsensusContext for SequencerConsensusContext {
// TODO: Should we be returning an error?
// I think this implies defining an error type in this crate and moving the trait definition
// here also.
debug!("Initiating proposal build: {build_proposal_input:?}");
debug!("Initiating build proposal: {build_proposal_input:?}");
batcher
.propose_block(build_proposal_input)
.await
Expand Down Expand Up @@ -215,27 +220,30 @@ impl ConsensusContext for SequencerConsensusContext {

// Note: this function does not receive ProposalInit.
// That part is consumed by the caller, so it can know the height/round.
#[instrument(level = "info", skip(self, timeout, content_receiver))]
async fn validate_proposal(
&mut self,
height: BlockNumber,
round: Round,
validator: ValidatorId,
proposer: ValidatorId,
timeout: Duration,
content_receiver: mpsc::Receiver<Self::ProposalPart>,
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)> {
info!("Validating proposal: {timeout:?}");
assert_eq!(Some(height), self.current_height);
let (fin_sender, fin_receiver) = oneshot::channel();
match round.cmp(&self.current_round) {
std::cmp::Ordering::Less => fin_receiver,
std::cmp::Ordering::Greater => {
debug!("Queuing proposal for future round: current_round={}", self.current_round);
self.queued_proposals
.insert(round, ((height, validator, timeout, content_receiver), fin_sender));
.insert(round, ((height, proposer, timeout, content_receiver), fin_sender));
fin_receiver
}
std::cmp::Ordering::Equal => {
self.validate_current_round_proposal(
height,
validator,
proposer,
timeout,
content_receiver,
fin_sender,
Expand Down Expand Up @@ -350,6 +358,7 @@ impl ConsensusContext for SequencerConsensusContext {
}

impl SequencerConsensusContext {
#[instrument(level = "info", skip(self, timeout, content_receiver, fin_sender))]
async fn validate_current_round_proposal(
&mut self,
height: BlockNumber,
Expand All @@ -358,7 +367,7 @@ impl SequencerConsensusContext {
content_receiver: mpsc::Receiver<ProposalPart>,
fin_sender: oneshot::Sender<(ProposalContentId, ProposalFin)>,
) {
debug!("Validating proposal for height: {height} with timeout: {timeout:?}");
info!("Validating proposal with timeout: {timeout:?}");
let batcher = Arc::clone(&self.batcher);
let valid_proposals = Arc::clone(&self.valid_proposals);
let proposal_id = ProposalId(self.proposal_id);
Expand Down Expand Up @@ -386,6 +395,7 @@ impl SequencerConsensusContext {
sequencer_address: proposer,
},
};
debug!("Initiating validate proposal: {input:?}");
batcher.validate_block(input).await.expect("Failed to initiate proposal validation");

let notify = Arc::new(Notify::new());
Expand All @@ -402,7 +412,9 @@ impl SequencerConsensusContext {
fin_sender,
);
tokio::select! {
_ = notify_clone.notified() => {}
_ = notify_clone.notified() => {
info!("Proposal interrupted: {proposal_id:?}");
}
result = tokio::time::timeout(timeout, validate_fut) =>{
if let Err(e) = result {
error!("Validation timed out. {e:?}");
Expand Down Expand Up @@ -568,12 +580,11 @@ async fn stream_validate_proposal(
let batcher_block_id = BlockHash(response_id.state_diff_commitment.0.0);
info!(
"Finished validating proposal {:?}: network_block_id: {:?}, batcher_block_id = {:?}, \
num_txs = {:?}, height = {:?}",
num_txs = {:?}",
proposal_id,
network_block_id,
batcher_block_id,
content.len(),
height
);
// Update valid_proposals before sending fin to avoid a race condition
// with `get_proposal` being called before `valid_proposals` is updated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use starknet_types_core::felt::Felt;

use crate::sequencer_consensus_context::SequencerConsensusContext;

const TIMEOUT: Duration = Duration::from_millis(100);
const TIMEOUT: Duration = Duration::from_millis(200);
const CHANNEL_SIZE: usize = 5000;
const NUM_VALIDATORS: u64 = 4;
const STATE_DIFF_COMMITMENT: StateDiffCommitment = StateDiffCommitment(PoseidonHash(Felt::ZERO));
Expand Down
3 changes: 1 addition & 2 deletions crates/starknet_batcher/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ pub(crate) fn deadline_as_instant(
let time_to_deadline = deadline - chrono::Utc::now();
let as_duration =
time_to_deadline.to_std().map_err(|_| BatcherError::TimeToDeadlineError { deadline })?;
// TODO(Matan): this is a temporary solution to the timeout issue.
Ok((std::time::Instant::now() + (as_duration / 2)).into())
Ok((std::time::Instant::now() + as_duration).into())
}

pub(crate) fn verify_block_input(
Expand Down

0 comments on commit 11b9bce

Please sign in to comment.