Skip to content

Commit

Permalink
refactor(starknet_batcher): validate flow refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
dafnamatsry committed Dec 5, 2024
1 parent fa2439b commit 5667b4b
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 240 deletions.
145 changes: 87 additions & 58 deletions crates/starknet_batcher/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use starknet_batcher_types::batcher_types::{
GetProposalContent,
GetProposalContentInput,
GetProposalContentResponse,
ProposalCommitment,
ProposalId,
ProposalStatus,
ProposeBlockInput,
Expand All @@ -41,11 +42,11 @@ use crate::block_builder::{
use crate::config::BatcherConfig;
use crate::proposal_manager::{
GenerateProposalError,
GetProposalResultError,
InternalProposalStatus,
ProposalError,
ProposalManager,
ProposalManagerTrait,
ProposalOutput,
ProposalResult,
};
use crate::transaction_provider::{
DummyL1ProviderClient,
Expand Down Expand Up @@ -223,75 +224,67 @@ impl Batcher {
&mut self,
send_proposal_content_input: SendProposalContentInput,
) -> BatcherResult<SendProposalContentResponse> {
// TODO(Dafna): this method should return an meaningful error if the given proposal_id
// is of a proposed block (and not validated). Currently it panics or returns a
// wrong error.

let proposal_id = send_proposal_content_input.proposal_id;
if !self.validate_tx_streams.contains_key(&proposal_id) {
return Err(BatcherError::ProposalNotFound { proposal_id });
}

match send_proposal_content_input.content {
SendProposalContent::Txs(txs) => self.send_txs_and_get_status(proposal_id, txs).await,
SendProposalContent::Finish => {
self.close_tx_channel_and_get_commitment(proposal_id).await
}
SendProposalContent::Txs(txs) => self.handle_send_txs_request(proposal_id, txs).await,
SendProposalContent::Finish => self.handle_finish_proposal_request(proposal_id).await,
SendProposalContent::Abort => {
self.proposal_manager.abort_proposal(proposal_id).await;
self.close_input_transaction_stream(proposal_id)?;
Ok(SendProposalContentResponse { response: ProposalStatus::Aborted })
}
}
}

async fn send_txs_and_get_status(
async fn handle_send_txs_request(
&mut self,
proposal_id: ProposalId,
txs: Vec<Transaction>,
) -> BatcherResult<SendProposalContentResponse> {
match self.proposal_manager.get_proposal_status(proposal_id).await {
InternalProposalStatus::Processing => {
let tx_provider_sender = &self
.validate_tx_streams
.get(&proposal_id)
.expect("Expecting tx_provider_sender to exist during batching.");
for tx in txs {
tx_provider_sender.send(tx).await.map_err(|err| {
error!("Failed to send transaction to the tx provider: {}", err);
BatcherError::InternalError
})?;
}
Ok(SendProposalContentResponse { response: ProposalStatus::Processing })
if self.is_active(proposal_id).await {
// The proposal is active. Send the transactions through the tx provider.
let tx_provider_sender = &self
.validate_tx_streams
.get(&proposal_id)
.expect("Expecting tx_provider_sender to exist during batching.");
for tx in txs {
tx_provider_sender.send(tx).await.map_err(|err| {
error!("Failed to send transaction to the tx provider: {}", err);
BatcherError::InternalError
})?;
}
// Proposal Got an Error while processing transactions.
InternalProposalStatus::Failed => {
Ok(SendProposalContentResponse { response: ProposalStatus::InvalidProposal })
}
InternalProposalStatus::Finished => {
Err(BatcherError::ProposalAlreadyFinished { proposal_id })
}
InternalProposalStatus::NotFound => Err(BatcherError::ProposalNotFound { proposal_id }),
return Ok(SendProposalContentResponse { response: ProposalStatus::Processing });
}

// The proposal is no longer active, can't send the transactions.
let proposal_result = self.get_completed_proposal_result(proposal_id).await;
match proposal_result {
Ok(_) => Err(BatcherError::ProposalAlreadyFinished { proposal_id }),
Err(err) => Ok(SendProposalContentResponse { response: proposal_status_from(err)? }),
}
}

async fn close_tx_channel_and_get_commitment(
async fn handle_finish_proposal_request(
&mut self,
proposal_id: ProposalId,
) -> BatcherResult<SendProposalContentResponse> {
debug!("Send proposal content done for {}", proposal_id);

self.close_input_transaction_stream(proposal_id)?;
if self.is_active(proposal_id).await {
self.proposal_manager.await_active_proposal().await;
}

let response = match self.proposal_manager.await_proposal_commitment(proposal_id).await {
Ok(proposal_commitment) => ProposalStatus::Finished(proposal_commitment),
Err(GetProposalResultError::BlockBuilderError(err)) => match err.as_ref() {
BlockBuilderError::FailOnError(_) => ProposalStatus::InvalidProposal,
_ => return Err(BatcherError::InternalError),
},
Err(GetProposalResultError::ProposalDoesNotExist { proposal_id: _ })
| Err(GetProposalResultError::Aborted) => {
panic!("Proposal {} should exist in the proposal manager.", proposal_id)
}
let proposal_result = self.get_completed_proposal_result(proposal_id).await;
let proposal_status = match proposal_result {
Ok(commitment) => ProposalStatus::Finished(commitment),
Err(err) => proposal_status_from(err)?,
};

Ok(SendProposalContentResponse { response })
Ok(SendProposalContentResponse { response: proposal_status })
}

fn close_input_transaction_stream(&mut self, proposal_id: ProposalId) -> BatcherResult<()> {
Expand Down Expand Up @@ -327,17 +320,19 @@ impl Batcher {
// TODO: Consider removing the proposal from the proposal manager and keep it in the batcher
// for decision reached.
self.propose_tx_streams.remove(&proposal_id);
let proposal_commitment =
self.proposal_manager.await_proposal_commitment(proposal_id).await?;
Ok(GetProposalContentResponse {
content: GetProposalContent::Finished(proposal_commitment),
})
let commitment = self.get_completed_proposal_result(proposal_id).await?;

Ok(GetProposalContentResponse { content: GetProposalContent::Finished(commitment) })
}

#[instrument(skip(self), err)]
pub async fn decision_reached(&mut self, input: DecisionReachedInput) -> BatcherResult<()> {
let proposal_id = input.proposal_id;
let proposal_output = self.proposal_manager.take_proposal_result(proposal_id).await?;
let proposal_output = self
.proposal_manager
.take_proposal_result(proposal_id)
.await
.ok_or(BatcherError::ExecutedProposalNotFound { proposal_id })??;
let ProposalOutput { state_diff, nonces: address_to_nonce, tx_hashes, .. } =
proposal_output;
// TODO: Keep the height from start_height or get it from the input.
Expand All @@ -362,6 +357,27 @@ impl Batcher {
}
Ok(())
}

async fn is_active(&self, proposal_id: ProposalId) -> bool {
self.proposal_manager.get_active_proposal().await == Some(proposal_id)
}

// Returns a completed proposal result, either its commitment or an error if the proposal
// failed.
// Panics if the proposal doesn't exist.
async fn get_completed_proposal_result(
&self,
proposal_id: ProposalId,
) -> ProposalResult<ProposalCommitment> {
let completed_proposals = self.proposal_manager.get_completed_proposals().await;
let guard = completed_proposals.lock().await;
let proposal_result = guard.get(&proposal_id).expect("Proposal should exist");

match proposal_result {
Ok(output) => Ok(output.commitment),
Err(e) => Err(e.clone()),
}
}
}

pub fn create_batcher(config: BatcherConfig, mempool_client: SharedMempoolClient) -> Batcher {
Expand Down Expand Up @@ -437,14 +453,11 @@ impl From<GenerateProposalError> for BatcherError {
}
}

impl From<GetProposalResultError> for BatcherError {
fn from(err: GetProposalResultError) -> Self {
impl From<ProposalError> for BatcherError {
fn from(err: ProposalError) -> Self {
match err {
GetProposalResultError::BlockBuilderError(..) => BatcherError::InternalError,
GetProposalResultError::ProposalDoesNotExist { proposal_id } => {
BatcherError::ExecutedProposalNotFound { proposal_id }
}
GetProposalResultError::Aborted => BatcherError::ProposalAborted,
ProposalError::BlockBuilderError(..) => BatcherError::InternalError,
ProposalError::Aborted => BatcherError::ProposalAborted,
}
}
}
Expand Down Expand Up @@ -486,3 +499,19 @@ fn verify_block_number(height: BlockNumber, block_number: BlockNumber) -> Batche
}
Ok(())
}

// Return the appropriate ProposalStatus for a given ProposalError.
fn proposal_status_from(proposal_error: ProposalError) -> BatcherResult<ProposalStatus> {
match proposal_error {
ProposalError::BlockBuilderError(err) => {
if let BlockBuilderError::FailOnError(_) = err.as_ref() {
// The proposal either failed due to bad input (e.g. invalid transactions), or
// couldn't finish in time.
Ok(ProposalStatus::InvalidProposal)
} else {
Err(BatcherError::InternalError)
}
}
ProposalError::Aborted => Err(BatcherError::ProposalAborted),
}
}
Loading

0 comments on commit 5667b4b

Please sign in to comment.