Skip to content

Commit

Permalink
refactor(starknet_batcher): validate flow refactor (#2441)
Browse files Browse the repository at this point in the history
  • Loading branch information
dafnamatsry authored Dec 15, 2024
1 parent 21466a6 commit fe2c664
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 193 deletions.
143 changes: 89 additions & 54 deletions crates/starknet_batcher/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use starknet_batcher_types::batcher_types::{
GetProposalContent,
GetProposalContentInput,
GetProposalContentResponse,
ProposalCommitment,
ProposalId,
ProposalStatus,
ProposeBlockInput,
Expand All @@ -42,11 +43,11 @@ use crate::block_builder::{
use crate::config::BatcherConfig;
use crate::proposal_manager::{
GenerateProposalError,
InternalProposalStatus,
ProposalError,
ProposalManager,
ProposalManagerTrait,
ProposalOutput,
ProposalResult,
};
use crate::transaction_provider::{
DummyL1ProviderClient,
Expand Down Expand Up @@ -224,75 +225,74 @@ impl Batcher {
&mut self,
send_proposal_content_input: SendProposalContentInput,
) -> BatcherResult<SendProposalContentResponse> {
// TODO(Dafna): this method should return an meaningful error if the given proposal_id
// is of a proposed block (and not validated). Currently it panics or returns a
// wrong error.

let proposal_id = send_proposal_content_input.proposal_id;
if !self.validate_tx_streams.contains_key(&proposal_id) {
return Err(BatcherError::ProposalNotFound { proposal_id });
}

match send_proposal_content_input.content {
SendProposalContent::Txs(txs) => self.send_txs_and_get_status(proposal_id, txs).await,
SendProposalContent::Finish => {
self.close_tx_channel_and_get_commitment(proposal_id).await
}
SendProposalContent::Abort => {
self.proposal_manager.abort_proposal(proposal_id).await;
Ok(SendProposalContentResponse { response: ProposalStatus::Aborted })
}
SendProposalContent::Txs(txs) => self.handle_send_txs_request(proposal_id, txs).await,
SendProposalContent::Finish => self.handle_finish_proposal_request(proposal_id).await,
SendProposalContent::Abort => self.handle_abort_proposal_request(proposal_id).await,
}
}

async fn send_txs_and_get_status(
async fn handle_send_txs_request(
&mut self,
proposal_id: ProposalId,
txs: Vec<Transaction>,
) -> BatcherResult<SendProposalContentResponse> {
match self.proposal_manager.get_proposal_status(proposal_id).await {
InternalProposalStatus::Processing => {
let tx_provider_sender = &self
.validate_tx_streams
.get(&proposal_id)
.expect("Expecting tx_provider_sender to exist during batching.");
for tx in txs {
tx_provider_sender.send(tx).await.map_err(|err| {
error!("Failed to send transaction to the tx provider: {}", err);
BatcherError::InternalError
})?;
}
Ok(SendProposalContentResponse { response: ProposalStatus::Processing })
if self.is_active(proposal_id).await {
// The proposal is active. Send the transactions through the tx provider.
let tx_provider_sender = &self
.validate_tx_streams
.get(&proposal_id)
.expect("Expecting tx_provider_sender to exist during batching.");
for tx in txs {
tx_provider_sender.send(tx).await.map_err(|err| {
error!("Failed to send transaction to the tx provider: {}", err);
BatcherError::InternalError
})?;
}
// Proposal Got an Error while processing transactions.
InternalProposalStatus::Failed => {
Ok(SendProposalContentResponse { response: ProposalStatus::InvalidProposal })
}
InternalProposalStatus::Finished => {
Err(BatcherError::ProposalAlreadyFinished { proposal_id })
}
InternalProposalStatus::NotFound => Err(BatcherError::ProposalNotFound { proposal_id }),
return Ok(SendProposalContentResponse { response: ProposalStatus::Processing });
}

// The proposal is no longer active, can't send the transactions.
let proposal_result =
self.get_completed_proposal_result(proposal_id).await.expect("Proposal should exist.");
match proposal_result {
Ok(_) => Err(BatcherError::ProposalAlreadyFinished { proposal_id }),
Err(err) => Ok(SendProposalContentResponse { response: proposal_status_from(err)? }),
}
}

async fn close_tx_channel_and_get_commitment(
async fn handle_finish_proposal_request(
&mut self,
proposal_id: ProposalId,
) -> BatcherResult<SendProposalContentResponse> {
debug!("Send proposal content done for {}", proposal_id);

self.close_input_transaction_stream(proposal_id)?;
if self.is_active(proposal_id).await {
self.proposal_manager.await_active_proposal().await;
}

let response = match self.proposal_manager.await_proposal_commitment(proposal_id).await {
Some(Ok(proposal_commitment)) => ProposalStatus::Finished(proposal_commitment),
Some(Err(ProposalError::BlockBuilderError(err))) => match err.as_ref() {
BlockBuilderError::FailOnError(_) => ProposalStatus::InvalidProposal,
_ => return Err(BatcherError::InternalError),
},
Some(Err(ProposalError::Aborted)) => return Err(BatcherError::ProposalAborted),
None => {
panic!("Proposal {} should exist in the proposal manager.", proposal_id)
}
let proposal_result =
self.get_completed_proposal_result(proposal_id).await.expect("Proposal should exist.");
let proposal_status = match proposal_result {
Ok(commitment) => ProposalStatus::Finished(commitment),
Err(err) => proposal_status_from(err)?,
};
Ok(SendProposalContentResponse { response: proposal_status })
}

Ok(SendProposalContentResponse { response })
async fn handle_abort_proposal_request(
&mut self,
proposal_id: ProposalId,
) -> BatcherResult<SendProposalContentResponse> {
self.proposal_manager.abort_proposal(proposal_id).await;
self.close_input_transaction_stream(proposal_id)?;
Ok(SendProposalContentResponse { response: ProposalStatus::Aborted })
}

fn close_input_transaction_stream(&mut self, proposal_id: ProposalId) -> BatcherResult<()> {
Expand Down Expand Up @@ -337,14 +337,12 @@ impl Batcher {
// TODO: Consider removing the proposal from the proposal manager and keep it in the batcher
// for decision reached.
self.propose_tx_streams.remove(&proposal_id);
let proposal_commitment = self
.proposal_manager
.await_proposal_commitment(proposal_id)
let commitment = self
.get_completed_proposal_result(proposal_id)
.await
.ok_or(BatcherError::ProposalNotFound { proposal_id })??;
Ok(GetProposalContentResponse {
content: GetProposalContent::Finished(proposal_commitment),
})
.expect("Proposal should exist.")?;

Ok(GetProposalContentResponse { content: GetProposalContent::Finished(commitment) })
}

#[instrument(skip(self), err)]
Expand Down Expand Up @@ -379,6 +377,27 @@ impl Batcher {
}
Ok(())
}

async fn is_active(&self, proposal_id: ProposalId) -> bool {
self.proposal_manager.get_active_proposal().await == Some(proposal_id)
}

// Returns a completed proposal result, either its commitment or an error if the proposal
// failed. If the proposal doesn't exist, or it's still active, returns None.
async fn get_completed_proposal_result(
&self,
proposal_id: ProposalId,
) -> Option<ProposalResult<ProposalCommitment>> {
let completed_proposals = self.proposal_manager.get_completed_proposals().await;
let guard = completed_proposals.lock().await;
let proposal_result = guard.get(&proposal_id);

match proposal_result {
Some(Ok(output)) => Some(Ok(output.commitment)),
Some(Err(e)) => Some(Err(e.clone())),
None => None,
}
}
}

pub fn create_batcher(config: BatcherConfig, mempool_client: SharedMempoolClient) -> Batcher {
Expand Down Expand Up @@ -501,3 +520,19 @@ fn verify_block_number(height: BlockNumber, block_number: BlockNumber) -> Batche
}
Ok(())
}

// Return the appropriate ProposalStatus for a given ProposalError.
fn proposal_status_from(proposal_error: ProposalError) -> BatcherResult<ProposalStatus> {
match proposal_error {
ProposalError::BlockBuilderError(err) => {
if let BlockBuilderError::FailOnError(_) = err.as_ref() {
// The proposal either failed due to bad input (e.g. invalid transactions), or
// couldn't finish in time.
Ok(ProposalStatus::InvalidProposal)
} else {
Err(BatcherError::InternalError)
}
}
ProposalError::Aborted => Err(BatcherError::ProposalAborted),
}
}
Loading

0 comments on commit fe2c664

Please sign in to comment.