diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs index 12ba86891a..80daffec5b 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs @@ -56,7 +56,7 @@ use starknet_batcher_types::batcher_types::{ use starknet_batcher_types::communication::BatcherClient; use tokio::sync::Notify; use tokio::task::JoinHandle; -use tracing::{debug, debug_span, error, info, trace, warn, Instrument}; +use tracing::{debug, debug_span, info, trace, warn, Instrument}; // TODO(Dan, Matan): Remove this once and replace with real gas prices. const TEMPORARY_GAS_PRICES: GasPrices = GasPrices { @@ -81,6 +81,12 @@ type ValidationParams = (BlockNumber, ValidatorId, Duration, mpsc::Receiver, validators: Vec, @@ -360,7 +366,7 @@ impl SequencerConsensusContext { height: BlockNumber, proposer: ValidatorId, timeout: Duration, - content_receiver: mpsc::Receiver, + mut content_receiver: mpsc::Receiver, fin_sender: oneshot::Sender<(ProposalContentId, ProposalFin)>, ) { debug!("Validating proposal for height: {height} with timeout: {timeout:?}"); @@ -396,29 +402,47 @@ impl SequencerConsensusContext { let notify = Arc::new(Notify::new()); let notify_clone = Arc::clone(¬ify); let chain_id = self.chain_id.clone(); + let mut content = Vec::new(); - let handle = tokio::spawn( - async move { - let validate_fut = stream_validate_proposal( - height, - proposal_id, - batcher, - valid_proposals, - content_receiver, - fin_sender, - chain_id, - ); + let handle = tokio::spawn(async move { + let (built_block, received_fin) = loop { tokio::select! { - _ = notify_clone.notified() => {} - result = tokio::time::timeout(timeout, validate_fut) =>{ - if let Err(e) = result { - error!("Validation timed out. {e:?}"); + _ = notify_clone.notified() => { + // TODO(Asmaa): Tell the batcher to abort. + warn!("Proposal interrupted: {:?}", proposal_id); + return; + } + _ = tokio::time::sleep(timeout) => { + // TODO(Asmaa): Tell the batcher to abort. + warn!("Validation timed out"); + return; + } + proposal_part = content_receiver.next() => { + match handle_proposal_part(height, proposal_id, batcher.as_ref(), + proposal_part, &mut content, chain_id.clone()).await { + HandledProposalPart::Finished(built_block, received_fin) => { + break (built_block, received_fin); + } + HandledProposalPart::Continue => {continue;} + HandledProposalPart::Failed(fail_reason) => { + // TODO(Asmaa): Tell the batcher to abort. + warn!("Failed to handle proposal part: {proposal_id:?}, {fail_reason}"); + return; + } } } } + }; + // Update valid_proposals before sending fin to avoid a race condition + // with `get_proposal` being called before `valid_proposals` is updated. + // TODO(Matan): Consider validating the ProposalFin signature here. + let mut valid_proposals = valid_proposals.lock().unwrap(); + valid_proposals.entry(height).or_default().insert(built_block, (content, proposal_id)); + if fin_sender.send((built_block, received_fin)).is_err() { + // Consensus may exit early (e.g. sync). + warn!("Failed to send proposal content ids"); } - .instrument(debug_span!("consensus_validate_proposal")), - ); + }); self.active_proposal = Some((notify, handle)); } @@ -508,95 +532,71 @@ async fn stream_build_proposal( } // Handles receiving a proposal from another node without blocking consensus: -// 1. Receives the proposal content from the network. +// 1. Receives the proposal part from the network. // 2. Pass this to the batcher. // 3. Once finished, receive the commitment from the batcher. -// 4. Store the proposal for re-proposal. -// 5. Send the commitment to consensus. -async fn stream_validate_proposal( +async fn handle_proposal_part( height: BlockNumber, proposal_id: ProposalId, - batcher: Arc, - valid_proposals: Arc>, - mut content_receiver: mpsc::Receiver, - fin_sender: oneshot::Sender<(ProposalContentId, ProposalFin)>, + batcher: &dyn BatcherClient, + proposal_part: Option, + content: &mut Vec, chain_id: ChainId, -) { - let mut content = Vec::new(); - let network_block_id = loop { - let Some(prop_part) = content_receiver.next().await else { - // TODO(Asmaa): Tell the batcher to abort. - warn!("Failed to receive proposal content: {proposal_id:?}"); - return; - }; - match prop_part { - ProposalPart::Transactions(TransactionBatch { transactions: txs, tx_hashes: _ }) => { - let exe_txs: Vec = txs - .into_iter() - .map(|tx| { - // An error means we have an invalid chain_id. - (tx, &chain_id) - .try_into() - .expect("Failed to convert transaction to executable_transation.") - }) - .collect(); - content.extend_from_slice(&exe_txs[..]); - let input = SendProposalContentInput { - proposal_id, - content: SendProposalContent::Txs(exe_txs), - }; - let response = batcher.send_proposal_content(input).await.unwrap_or_else(|e| { - panic!("Failed to send proposal content to batcher: {proposal_id:?}. {e:?}") - }); - match response.response { - ProposalStatus::Processing => {} - ProposalStatus::InvalidProposal => { - warn!("Proposal was invalid: {:?}", proposal_id); - return; - } - status => panic!("Unexpected status: for {proposal_id:?}, {status:?}"), +) -> HandledProposalPart { + match proposal_part { + None => HandledProposalPart::Failed("Failed to receive proposal content".to_string()), + Some(ProposalPart::Transactions(TransactionBatch { transactions: txs, tx_hashes: _ })) => { + let exe_txs: Vec = txs + .into_iter() + .map(|tx| { + // An error means we have an invalid chain_id. + (tx, &chain_id) + .try_into() + .expect("Failed to convert transaction to executable_transation.") + }) + .collect(); + content.extend_from_slice(&exe_txs[..]); + let input = SendProposalContentInput { + proposal_id, + content: SendProposalContent::Txs(exe_txs), + }; + let response = batcher.send_proposal_content(input).await.unwrap_or_else(|e| { + panic!("Failed to send proposal content to batcher: {proposal_id:?}. {e:?}") + }); + match response.response { + ProposalStatus::Processing => HandledProposalPart::Continue, + ProposalStatus::InvalidProposal => { + HandledProposalPart::Failed("Invalid proposal".to_string()) } + status => panic!("Unexpected status: for {proposal_id:?}, {status:?}"), } - ProposalPart::Fin(ProposalFin { proposal_content_id: id }) => { - // Output this along with the ID from batcher, to compare them. - break id; - } - _ => panic!("Invalid proposal part: {:?}", prop_part), } - }; - let input = SendProposalContentInput { proposal_id, content: SendProposalContent::Finish }; - let response = batcher - .send_proposal_content(input) - .await - .unwrap_or_else(|e| panic!("Failed to send Fin to batcher: {proposal_id:?}. {e:?}")); - let response_id = match response.response { - ProposalStatus::Finished(id) => id, - ProposalStatus::InvalidProposal => { - warn!("Proposal was invalid: {:?}", proposal_id); - return; + Some(ProposalPart::Fin(ProposalFin { proposal_content_id: id })) => { + // Output this along with the ID from batcher, to compare them. + let input = + SendProposalContentInput { proposal_id, content: SendProposalContent::Finish }; + let response = batcher.send_proposal_content(input).await.unwrap_or_else(|e| { + panic!("Failed to send Fin to batcher: {proposal_id:?}. {e:?}") + }); + let response_id = match response.response { + ProposalStatus::Finished(id) => id, + ProposalStatus::InvalidProposal => { + return HandledProposalPart::Failed("Invalid proposal".to_string()); + } + status => panic!("Unexpected status: for {proposal_id:?}, {status:?}"), + }; + let batcher_block_id = BlockHash(response_id.state_diff_commitment.0.0); + info!( + "Finished validating proposal {:?}: network_block_id: {:?}, batcher_block_id = \ + {:?}, num_txs = {:?}, height = {:?}", + proposal_id, + id, + batcher_block_id, + content.len(), + height + ); + HandledProposalPart::Finished(batcher_block_id, ProposalFin { proposal_content_id: id }) } - status => panic!("Unexpected status: for {proposal_id:?}, {status:?}"), - }; - let batcher_block_id = BlockHash(response_id.state_diff_commitment.0.0); - info!( - "Finished validating proposal {:?}: network_block_id: {:?}, batcher_block_id = {:?}, \ - num_txs = {:?}, height = {:?}", - proposal_id, - network_block_id, - batcher_block_id, - content.len(), - height - ); - // Update valid_proposals before sending fin to avoid a race condition - // with `get_proposal` being called before `valid_proposals` is updated. - // TODO(Matan): Consider validating the ProposalFin signature here. - let mut valid_proposals = valid_proposals.lock().unwrap(); - valid_proposals.entry(height).or_default().insert(batcher_block_id, (content, proposal_id)); - if fin_sender - .send((batcher_block_id, ProposalFin { proposal_content_id: network_block_id })) - .is_err() - { - // Consensus may exit early (e.g. sync). - warn!("Failed to send proposal content ids"); + _ => panic!("Invalid proposal part: {:?}", proposal_part), } }