Skip to content

Commit

Permalink
refactor(consensus): handle proposal part in select branch ensuring c…
Browse files Browse the repository at this point in the history
…ancellation safety
  • Loading branch information
asmaastarkware committed Dec 18, 2024
1 parent 2d78adc commit d313f89
Showing 1 changed file with 100 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use starknet_batcher_types::batcher_types::{
use starknet_batcher_types::communication::BatcherClient;
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tracing::{debug, debug_span, error, info, trace, warn, Instrument};
use tracing::{debug, debug_span, info, trace, warn, Instrument};

// TODO(Dan, Matan): Remove this once and replace with real gas prices.
const TEMPORARY_GAS_PRICES: GasPrices = GasPrices {
Expand All @@ -81,6 +81,12 @@ type ValidationParams = (BlockNumber, ValidatorId, Duration, mpsc::Receiver<Prop

const CHANNEL_SIZE: usize = 100;

enum HandledProposalPart {
Continue,
Finished(ProposalContentId, ProposalFin),
Failed(String),
}

pub struct SequencerConsensusContext {
batcher: Arc<dyn BatcherClient>,
validators: Vec<ValidatorId>,
Expand Down Expand Up @@ -360,7 +366,7 @@ impl SequencerConsensusContext {
height: BlockNumber,
proposer: ValidatorId,
timeout: Duration,
content_receiver: mpsc::Receiver<ProposalPart>,
mut content_receiver: mpsc::Receiver<ProposalPart>,
fin_sender: oneshot::Sender<(ProposalContentId, ProposalFin)>,
) {
debug!("Validating proposal for height: {height} with timeout: {timeout:?}");
Expand Down Expand Up @@ -396,29 +402,47 @@ impl SequencerConsensusContext {
let notify = Arc::new(Notify::new());
let notify_clone = Arc::clone(&notify);
let chain_id = self.chain_id.clone();
let mut content = Vec::new();

let handle = tokio::spawn(
async move {
let validate_fut = stream_validate_proposal(
height,
proposal_id,
batcher,
valid_proposals,
content_receiver,
fin_sender,
chain_id,
);
let handle = tokio::spawn(async move {
let (built_block, received_fin) = loop {
tokio::select! {
_ = notify_clone.notified() => {}
result = tokio::time::timeout(timeout, validate_fut) =>{
if let Err(e) = result {
error!("Validation timed out. {e:?}");
_ = notify_clone.notified() => {
// TODO(Asmaa): Tell the batcher to abort.
warn!("Proposal interrupted: {:?}", proposal_id);
return;
}
_ = tokio::time::sleep(timeout) => {
// TODO(Asmaa): Tell the batcher to abort.
warn!("Validation timed out");
return;
}
proposal_part = content_receiver.next() => {
match handle_proposal_part(height, proposal_id, batcher.as_ref(),
proposal_part, &mut content, chain_id.clone()).await {
HandledProposalPart::Finished(built_block, received_fin) => {
break (built_block, received_fin);
}
HandledProposalPart::Continue => {continue;}
HandledProposalPart::Failed(fail_reason) => {
// TODO(Asmaa): Tell the batcher to abort.
warn!("Failed to handle proposal part: {proposal_id:?}, {fail_reason}");
return;
}
}
}
}
};
// Update valid_proposals before sending fin to avoid a race condition
// with `get_proposal` being called before `valid_proposals` is updated.
// TODO(Matan): Consider validating the ProposalFin signature here.
let mut valid_proposals = valid_proposals.lock().unwrap();
valid_proposals.entry(height).or_default().insert(built_block, (content, proposal_id));
if fin_sender.send((built_block, received_fin)).is_err() {
// Consensus may exit early (e.g. sync).
warn!("Failed to send proposal content ids");
}
.instrument(debug_span!("consensus_validate_proposal")),
);
});
self.active_proposal = Some((notify, handle));
}

Expand Down Expand Up @@ -508,95 +532,71 @@ async fn stream_build_proposal(
}

// Handles receiving a proposal from another node without blocking consensus:
// 1. Receives the proposal content from the network.
// 1. Receives the proposal part from the network.
// 2. Pass this to the batcher.
// 3. Once finished, receive the commitment from the batcher.
// 4. Store the proposal for re-proposal.
// 5. Send the commitment to consensus.
async fn stream_validate_proposal(
async fn handle_proposal_part(
height: BlockNumber,
proposal_id: ProposalId,
batcher: Arc<dyn BatcherClient>,
valid_proposals: Arc<Mutex<HeightToIdToContent>>,
mut content_receiver: mpsc::Receiver<ProposalPart>,
fin_sender: oneshot::Sender<(ProposalContentId, ProposalFin)>,
batcher: &dyn BatcherClient,
proposal_part: Option<ProposalPart>,
content: &mut Vec<ExecutableTransaction>,
chain_id: ChainId,
) {
let mut content = Vec::new();
let network_block_id = loop {
let Some(prop_part) = content_receiver.next().await else {
// TODO(Asmaa): Tell the batcher to abort.
warn!("Failed to receive proposal content: {proposal_id:?}");
return;
};
match prop_part {
ProposalPart::Transactions(TransactionBatch { transactions: txs, tx_hashes: _ }) => {
let exe_txs: Vec<ExecutableTransaction> = txs
.into_iter()
.map(|tx| {
// An error means we have an invalid chain_id.
(tx, &chain_id)
.try_into()
.expect("Failed to convert transaction to executable_transation.")
})
.collect();
content.extend_from_slice(&exe_txs[..]);
let input = SendProposalContentInput {
proposal_id,
content: SendProposalContent::Txs(exe_txs),
};
let response = batcher.send_proposal_content(input).await.unwrap_or_else(|e| {
panic!("Failed to send proposal content to batcher: {proposal_id:?}. {e:?}")
});
match response.response {
ProposalStatus::Processing => {}
ProposalStatus::InvalidProposal => {
warn!("Proposal was invalid: {:?}", proposal_id);
return;
}
status => panic!("Unexpected status: for {proposal_id:?}, {status:?}"),
) -> HandledProposalPart {
match proposal_part {
None => HandledProposalPart::Failed("Failed to receive proposal content".to_string()),
Some(ProposalPart::Transactions(TransactionBatch { transactions: txs, tx_hashes: _ })) => {
let exe_txs: Vec<ExecutableTransaction> = txs
.into_iter()
.map(|tx| {
// An error means we have an invalid chain_id.
(tx, &chain_id)
.try_into()
.expect("Failed to convert transaction to executable_transation.")
})
.collect();
content.extend_from_slice(&exe_txs[..]);
let input = SendProposalContentInput {
proposal_id,
content: SendProposalContent::Txs(exe_txs),
};
let response = batcher.send_proposal_content(input).await.unwrap_or_else(|e| {
panic!("Failed to send proposal content to batcher: {proposal_id:?}. {e:?}")
});
match response.response {
ProposalStatus::Processing => HandledProposalPart::Continue,
ProposalStatus::InvalidProposal => {
HandledProposalPart::Failed("Invalid proposal".to_string())
}
status => panic!("Unexpected status: for {proposal_id:?}, {status:?}"),
}
ProposalPart::Fin(ProposalFin { proposal_content_id: id }) => {
// Output this along with the ID from batcher, to compare them.
break id;
}
_ => panic!("Invalid proposal part: {:?}", prop_part),
}
};
let input = SendProposalContentInput { proposal_id, content: SendProposalContent::Finish };
let response = batcher
.send_proposal_content(input)
.await
.unwrap_or_else(|e| panic!("Failed to send Fin to batcher: {proposal_id:?}. {e:?}"));
let response_id = match response.response {
ProposalStatus::Finished(id) => id,
ProposalStatus::InvalidProposal => {
warn!("Proposal was invalid: {:?}", proposal_id);
return;
Some(ProposalPart::Fin(ProposalFin { proposal_content_id: id })) => {
// Output this along with the ID from batcher, to compare them.
let input =
SendProposalContentInput { proposal_id, content: SendProposalContent::Finish };
let response = batcher.send_proposal_content(input).await.unwrap_or_else(|e| {
panic!("Failed to send Fin to batcher: {proposal_id:?}. {e:?}")
});
let response_id = match response.response {
ProposalStatus::Finished(id) => id,
ProposalStatus::InvalidProposal => {
return HandledProposalPart::Failed("Invalid proposal".to_string());
}
status => panic!("Unexpected status: for {proposal_id:?}, {status:?}"),
};
let batcher_block_id = BlockHash(response_id.state_diff_commitment.0.0);
info!(
"Finished validating proposal {:?}: network_block_id: {:?}, batcher_block_id = \
{:?}, num_txs = {:?}, height = {:?}",
proposal_id,
id,
batcher_block_id,
content.len(),
height
);
HandledProposalPart::Finished(batcher_block_id, ProposalFin { proposal_content_id: id })
}
status => panic!("Unexpected status: for {proposal_id:?}, {status:?}"),
};
let batcher_block_id = BlockHash(response_id.state_diff_commitment.0.0);
info!(
"Finished validating proposal {:?}: network_block_id: {:?}, batcher_block_id = {:?}, \
num_txs = {:?}, height = {:?}",
proposal_id,
network_block_id,
batcher_block_id,
content.len(),
height
);
// Update valid_proposals before sending fin to avoid a race condition
// with `get_proposal` being called before `valid_proposals` is updated.
// TODO(Matan): Consider validating the ProposalFin signature here.
let mut valid_proposals = valid_proposals.lock().unwrap();
valid_proposals.entry(height).or_default().insert(batcher_block_id, (content, proposal_id));
if fin_sender
.send((batcher_block_id, ProposalFin { proposal_content_id: network_block_id }))
.is_err()
{
// Consensus may exit early (e.g. sync).
warn!("Failed to send proposal content ids");
_ => panic!("Invalid proposal part: {:?}", proposal_part),
}
}

0 comments on commit d313f89

Please sign in to comment.