Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(starknet_batcher): validate flow refactor #2441

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 89 additions & 54 deletions crates/starknet_batcher/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use starknet_batcher_types::batcher_types::{
GetProposalContent,
GetProposalContentInput,
GetProposalContentResponse,
ProposalCommitment,
ProposalId,
ProposalStatus,
ProposeBlockInput,
Expand All @@ -42,11 +43,11 @@ use crate::block_builder::{
use crate::config::BatcherConfig;
use crate::proposal_manager::{
GenerateProposalError,
InternalProposalStatus,
ProposalError,
ProposalManager,
ProposalManagerTrait,
ProposalOutput,
ProposalResult,
};
use crate::transaction_provider::{
DummyL1ProviderClient,
Expand Down Expand Up @@ -224,75 +225,74 @@ impl Batcher {
&mut self,
send_proposal_content_input: SendProposalContentInput,
) -> BatcherResult<SendProposalContentResponse> {
// TODO(Dafna): this method should return an meaningful error if the given proposal_id
// is of a proposed block (and not validated). Currently it panics or returns a
// wrong error.

let proposal_id = send_proposal_content_input.proposal_id;
if !self.validate_tx_streams.contains_key(&proposal_id) {
return Err(BatcherError::ProposalNotFound { proposal_id });
}

match send_proposal_content_input.content {
SendProposalContent::Txs(txs) => self.send_txs_and_get_status(proposal_id, txs).await,
SendProposalContent::Finish => {
self.close_tx_channel_and_get_commitment(proposal_id).await
}
SendProposalContent::Abort => {
self.proposal_manager.abort_proposal(proposal_id).await;
Ok(SendProposalContentResponse { response: ProposalStatus::Aborted })
}
SendProposalContent::Txs(txs) => self.handle_send_txs_request(proposal_id, txs).await,
SendProposalContent::Finish => self.handle_finish_proposal_request(proposal_id).await,
SendProposalContent::Abort => self.handle_abort_proposal_request(proposal_id).await,
}
}

async fn send_txs_and_get_status(
async fn handle_send_txs_request(
&mut self,
proposal_id: ProposalId,
txs: Vec<Transaction>,
) -> BatcherResult<SendProposalContentResponse> {
match self.proposal_manager.get_proposal_status(proposal_id).await {
InternalProposalStatus::Processing => {
let tx_provider_sender = &self
.validate_tx_streams
.get(&proposal_id)
.expect("Expecting tx_provider_sender to exist during batching.");
for tx in txs {
tx_provider_sender.send(tx).await.map_err(|err| {
error!("Failed to send transaction to the tx provider: {}", err);
BatcherError::InternalError
})?;
}
Ok(SendProposalContentResponse { response: ProposalStatus::Processing })
if self.is_active(proposal_id).await {
// The proposal is active. Send the transactions through the tx provider.
let tx_provider_sender = &self
.validate_tx_streams
.get(&proposal_id)
.expect("Expecting tx_provider_sender to exist during batching.");
for tx in txs {
tx_provider_sender.send(tx).await.map_err(|err| {
error!("Failed to send transaction to the tx provider: {}", err);
BatcherError::InternalError
})?;
}
// Proposal Got an Error while processing transactions.
InternalProposalStatus::Failed => {
Ok(SendProposalContentResponse { response: ProposalStatus::InvalidProposal })
}
InternalProposalStatus::Finished => {
Err(BatcherError::ProposalAlreadyFinished { proposal_id })
}
InternalProposalStatus::NotFound => Err(BatcherError::ProposalNotFound { proposal_id }),
return Ok(SendProposalContentResponse { response: ProposalStatus::Processing });
}

// The proposal is no longer active, can't send the transactions.
let proposal_result =
self.get_completed_proposal_result(proposal_id).await.expect("Proposal should exist.");
match proposal_result {
Ok(_) => Err(BatcherError::ProposalAlreadyFinished { proposal_id }),
Err(err) => Ok(SendProposalContentResponse { response: proposal_status_from(err)? }),
}
}

async fn close_tx_channel_and_get_commitment(
async fn handle_finish_proposal_request(
&mut self,
proposal_id: ProposalId,
) -> BatcherResult<SendProposalContentResponse> {
debug!("Send proposal content done for {}", proposal_id);

self.close_input_transaction_stream(proposal_id)?;
if self.is_active(proposal_id).await {
self.proposal_manager.await_active_proposal().await;
}

let response = match self.proposal_manager.await_proposal_commitment(proposal_id).await {
Some(Ok(proposal_commitment)) => ProposalStatus::Finished(proposal_commitment),
Some(Err(ProposalError::BlockBuilderError(err))) => match err.as_ref() {
BlockBuilderError::FailOnError(_) => ProposalStatus::InvalidProposal,
_ => return Err(BatcherError::InternalError),
},
Some(Err(ProposalError::Aborted)) => return Err(BatcherError::ProposalAborted),
None => {
panic!("Proposal {} should exist in the proposal manager.", proposal_id)
}
let proposal_result =
self.get_completed_proposal_result(proposal_id).await.expect("Proposal should exist.");
let proposal_status = match proposal_result {
Ok(commitment) => ProposalStatus::Finished(commitment),
Err(err) => proposal_status_from(err)?,
};
Ok(SendProposalContentResponse { response: proposal_status })
}

Ok(SendProposalContentResponse { response })
async fn handle_abort_proposal_request(
&mut self,
proposal_id: ProposalId,
) -> BatcherResult<SendProposalContentResponse> {
self.proposal_manager.abort_proposal(proposal_id).await;
self.close_input_transaction_stream(proposal_id)?;
Ok(SendProposalContentResponse { response: ProposalStatus::Aborted })
}

fn close_input_transaction_stream(&mut self, proposal_id: ProposalId) -> BatcherResult<()> {
Expand Down Expand Up @@ -337,14 +337,12 @@ impl Batcher {
// TODO: Consider removing the proposal from the proposal manager and keep it in the batcher
// for decision reached.
self.propose_tx_streams.remove(&proposal_id);
let proposal_commitment = self
.proposal_manager
.await_proposal_commitment(proposal_id)
let commitment = self
.get_completed_proposal_result(proposal_id)
.await
.ok_or(BatcherError::ProposalNotFound { proposal_id })??;
Ok(GetProposalContentResponse {
content: GetProposalContent::Finished(proposal_commitment),
})
.expect("Proposal should exist.")?;

Ok(GetProposalContentResponse { content: GetProposalContent::Finished(commitment) })
}

#[instrument(skip(self), err)]
Expand Down Expand Up @@ -379,6 +377,27 @@ impl Batcher {
}
Ok(())
}

async fn is_active(&self, proposal_id: ProposalId) -> bool {
self.proposal_manager.get_active_proposal().await == Some(proposal_id)
}

// Returns a completed proposal result, either its commitment or an error if the proposal
// failed. If the proposal doesn't exist, or it's still active, returns None.
async fn get_completed_proposal_result(
&self,
proposal_id: ProposalId,
) -> Option<ProposalResult<ProposalCommitment>> {
let completed_proposals = self.proposal_manager.get_completed_proposals().await;
let guard = completed_proposals.lock().await;
let proposal_result = guard.get(&proposal_id);

match proposal_result {
Some(Ok(output)) => Some(Ok(output.commitment)),
Some(Err(e)) => Some(Err(e.clone())),
None => None,
}
}
}

pub fn create_batcher(config: BatcherConfig, mempool_client: SharedMempoolClient) -> Batcher {
Expand Down Expand Up @@ -501,3 +520,19 @@ fn verify_block_number(height: BlockNumber, block_number: BlockNumber) -> Batche
}
Ok(())
}

// Return the appropriate ProposalStatus for a given ProposalError.
fn proposal_status_from(proposal_error: ProposalError) -> BatcherResult<ProposalStatus> {
match proposal_error {
ProposalError::BlockBuilderError(err) => {
if let BlockBuilderError::FailOnError(_) = err.as_ref() {
// The proposal either failed due to bad input (e.g. invalid transactions), or
// couldn't finish in time.
Ok(ProposalStatus::InvalidProposal)
} else {
Err(BatcherError::InternalError)
}
}
ProposalError::Aborted => Err(BatcherError::ProposalAborted),
}
}
Loading
Loading