Skip to content

Commit

Permalink
refactor(starknet_batcher): move around some types and utils to a com…
Browse files Browse the repository at this point in the history
…mon utils file
  • Loading branch information
dafnamatsry committed Dec 16, 2024
1 parent 3821842 commit cbee779
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 163 deletions.
89 changes: 13 additions & 76 deletions crates/starknet_batcher/src/batcher.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use std::collections::HashMap;
use std::sync::Arc;

use blockifier::abi::constants;
use blockifier::state::global_cache::GlobalContractCache;
use chrono::Utc;
#[cfg(test)]
use mockall::automock;
use papyrus_storage::state::{StateStorageReader, StateStorageWriter};
use starknet_api::block::{BlockHashAndNumber, BlockNumber};
use starknet_api::block::BlockNumber;
use starknet_api::executable_transaction::Transaction;
use starknet_api::state::ThinStateDiff;
use starknet_batcher_types::batcher_types::{
Expand All @@ -34,26 +32,25 @@ use starknet_sequencer_infra::component_definitions::ComponentStarter;
use tracing::{debug, error, info, instrument, trace};

use crate::block_builder::{
BlockBuilderError,
BlockBuilderExecutionParams,
BlockBuilderFactory,
BlockBuilderFactoryTrait,
BlockMetadata,
};
use crate::config::BatcherConfig;
use crate::proposal_manager::{
GenerateProposalError,
ProposalError,
ProposalManager,
ProposalManagerTrait,
ProposalOutput,
ProposalResult,
};
use crate::proposal_manager::{GenerateProposalError, ProposalManager, ProposalManagerTrait};
use crate::transaction_provider::{
DummyL1ProviderClient,
ProposeTransactionProvider,
ValidateTransactionProvider,
};
use crate::utils::{
deadline_as_instant,
proposal_status_from,
verify_block_input,
ProposalOutput,
ProposalResult,
};

type OutputStreamReceiver = tokio::sync::mpsc::UnboundedReceiver<Transaction>;
type InputStreamSender = tokio::sync::mpsc::Sender<Transaction>;
Expand Down Expand Up @@ -340,7 +337,8 @@ impl Batcher {
let commitment = self
.get_completed_proposal_result(proposal_id)
.await
.expect("Proposal should exist.")?;
.expect("Proposal should exist.")
.map_err(|_| BatcherError::InternalError)?;

Ok(GetProposalContentResponse { content: GetProposalContent::Finished(commitment) })
}
Expand All @@ -352,7 +350,8 @@ impl Batcher {
.proposal_manager
.take_proposal_result(proposal_id)
.await
.ok_or(BatcherError::ExecutedProposalNotFound { proposal_id })??;
.ok_or(BatcherError::ExecutedProposalNotFound { proposal_id })?
.map_err(|_| BatcherError::InternalError)?;
let ProposalOutput { state_diff, nonces: address_to_nonce, tx_hashes, .. } =
proposal_output;
// TODO: Keep the height from start_height or get it from the input.
Expand Down Expand Up @@ -473,66 +472,4 @@ impl From<GenerateProposalError> for BatcherError {
}
}

impl From<ProposalError> for BatcherError {
fn from(err: ProposalError) -> Self {
match err {
ProposalError::BlockBuilderError(..) => BatcherError::InternalError,
ProposalError::Aborted => BatcherError::ProposalAborted,
}
}
}

impl ComponentStarter for Batcher {}

pub fn deadline_as_instant(deadline: chrono::DateTime<Utc>) -> BatcherResult<tokio::time::Instant> {
let time_to_deadline = deadline - chrono::Utc::now();
let as_duration =
time_to_deadline.to_std().map_err(|_| BatcherError::TimeToDeadlineError { deadline })?;
// TODO(Matan): this is a temporary solution to the timeout issue.
Ok((std::time::Instant::now() + (as_duration / 2)).into())
}

fn verify_block_input(
height: BlockNumber,
block_number: BlockNumber,
retrospective_block_hash: Option<BlockHashAndNumber>,
) -> BatcherResult<()> {
verify_non_empty_retrospective_block_hash(height, retrospective_block_hash)?;
verify_block_number(height, block_number)?;
Ok(())
}

fn verify_non_empty_retrospective_block_hash(
height: BlockNumber,
retrospective_block_hash: Option<BlockHashAndNumber>,
) -> BatcherResult<()> {
if height >= BlockNumber(constants::STORED_BLOCK_HASH_BUFFER)
&& retrospective_block_hash.is_none()
{
return Err(BatcherError::MissingRetrospectiveBlockHash);
}
Ok(())
}

fn verify_block_number(height: BlockNumber, block_number: BlockNumber) -> BatcherResult<()> {
if block_number != height {
return Err(BatcherError::InvalidBlockNumber { active_height: height, block_number });
}
Ok(())
}

// Return the appropriate ProposalStatus for a given ProposalError.
fn proposal_status_from(proposal_error: ProposalError) -> BatcherResult<ProposalStatus> {
match proposal_error {
ProposalError::BlockBuilderError(err) => {
if let BlockBuilderError::FailOnError(_) = err.as_ref() {
// The proposal either failed due to bad input (e.g. invalid transactions), or
// couldn't finish in time.
Ok(ProposalStatus::InvalidProposal)
} else {
Err(BatcherError::InternalError)
}
}
ProposalError::Aborted => Err(BatcherError::ProposalAborted),
}
}
13 changes: 3 additions & 10 deletions crates/starknet_batcher/src/batcher_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,10 @@ use crate::block_builder::{
MockBlockBuilderTrait,
};
use crate::config::BatcherConfig;
use crate::proposal_manager::{
GenerateProposalError,
ProposalError,
ProposalManagerTrait,
ProposalOutput,
ProposalResult,
};
use crate::proposal_manager::{GenerateProposalError, ProposalManagerTrait};
use crate::test_utils::test_txs;
use crate::transaction_provider::NextTxs;
use crate::utils::{ProposalOutput, ProposalResult};

const INITIAL_HEIGHT: BlockNumber = BlockNumber(3);
const STREAMING_CHUNK_SIZE: usize = 3;
Expand Down Expand Up @@ -101,9 +96,7 @@ fn validate_block_input() -> ValidateBlockInput {
}

fn invalid_proposal_result() -> ProposalResult<ProposalOutput> {
Err(ProposalError::BlockBuilderError(Arc::new(BlockBuilderError::FailOnError(
FailOnErrorCause::BlockFull,
))))
Err(Arc::new(BlockBuilderError::FailOnError(FailOnErrorCause::BlockFull)))
}

struct MockDependencies {
Expand Down
1 change: 1 addition & 0 deletions crates/starknet_batcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod transaction_executor;
mod transaction_provider;
#[cfg(test)]
mod transaction_provider_test;
mod utils;

// Re-export so it can be used in the general config of the sequencer node without depending on
// blockifier.
Expand Down
78 changes: 10 additions & 68 deletions crates/starknet_batcher/src/proposal_manager.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use indexmap::IndexMap;
use starknet_api::block_hash::state_diff_hash::calculate_state_diff_hash;
use starknet_api::core::{ContractAddress, Nonce};
use starknet_api::state::ThinStateDiff;
use starknet_api::transaction::TransactionHash;
use starknet_batcher_types::batcher_types::{ProposalCommitment, ProposalId};
use starknet_batcher_types::batcher_types::ProposalId;
use thiserror::Error;
use tokio::sync::Mutex;
use tracing::{debug, error, info, instrument, Instrument};

use crate::block_builder::{BlockBuilderError, BlockBuilderTrait, BlockExecutionArtifacts};
use crate::block_builder::{BlockBuilderError, BlockBuilderTrait};
use crate::utils::{ProposalOutput, ProposalResult, ProposalTask};

#[derive(Debug, Error)]
pub enum GenerateProposalError {
Expand All @@ -32,14 +28,6 @@ pub enum GenerateProposalError {
ProposalAlreadyExists { proposal_id: ProposalId },
}

#[derive(Clone, Debug, Error)]
pub enum ProposalError {
#[error(transparent)]
BlockBuilderError(Arc<BlockBuilderError>),
#[error("Proposal was aborted")]
Aborted,
}

#[async_trait]
pub trait ProposalManagerTrait: Send + Sync {
async fn spawn_proposal(
Expand Down Expand Up @@ -68,12 +56,6 @@ pub trait ProposalManagerTrait: Send + Sync {
async fn reset(&mut self);
}

// Represents a spawned task of building new block proposal.
struct ProposalTask {
abort_signal_sender: tokio::sync::oneshot::Sender<()>,
join_handle: tokio::task::JoinHandle<()>,
}

/// Main struct for handling block proposals.
/// Taking care of:
/// - Proposing new blocks.
Expand All @@ -91,16 +73,6 @@ pub(crate) struct ProposalManager {
executed_proposals: Arc<Mutex<HashMap<ProposalId, ProposalResult<ProposalOutput>>>>,
}

pub type ProposalResult<T> = Result<T, ProposalError>;

#[derive(Debug, Default, PartialEq)]
pub struct ProposalOutput {
pub state_diff: ThinStateDiff,
pub commitment: ProposalCommitment,
pub tx_hashes: HashSet<TransactionHash>,
pub nonces: HashMap<ContractAddress, Nonce>,
}

#[async_trait]
impl ProposalManagerTrait for ProposalManager {
/// Starts a new block proposal generation task for the given proposal_id.
Expand All @@ -121,11 +93,8 @@ impl ProposalManagerTrait for ProposalManager {

let join_handle = tokio::spawn(
async move {
let result = block_builder
.build_block()
.await
.map(ProposalOutput::from)
.map_err(|e| ProposalError::BlockBuilderError(Arc::new(e)));
let result =
block_builder.build_block().await.map(ProposalOutput::from).map_err(Arc::new);

// The proposal is done, clear the active proposal.
// Keep the proposal result only if it is the same as the active proposal.
Expand Down Expand Up @@ -175,7 +144,10 @@ impl ProposalManagerTrait for ProposalManager {
async fn abort_proposal(&mut self, proposal_id: ProposalId) {
if *self.active_proposal.lock().await == Some(proposal_id) {
self.abort_active_proposal().await;
self.executed_proposals.lock().await.insert(proposal_id, Err(ProposalError::Aborted));
self.executed_proposals
.lock()
.await
.insert(proposal_id, Err(Arc::new(BlockBuilderError::Aborted)));
}
}

Expand Down Expand Up @@ -227,33 +199,3 @@ impl ProposalManager {
}
}
}

impl From<BlockExecutionArtifacts> for ProposalOutput {
fn from(artifacts: BlockExecutionArtifacts) -> Self {
let commitment_state_diff = artifacts.commitment_state_diff;
let nonces = HashMap::from_iter(
commitment_state_diff
.address_to_nonce
.iter()
.map(|(address, nonce)| (*address, *nonce)),
);

// TODO: Get these from the transactions.
let deployed_contracts = IndexMap::new();
let declared_classes = IndexMap::new();
let state_diff = ThinStateDiff {
deployed_contracts,
storage_diffs: commitment_state_diff.storage_updates,
declared_classes,
nonces: commitment_state_diff.address_to_nonce,
// TODO: Remove this when the structure of storage diffs changes.
deprecated_declared_classes: Vec::new(),
replaced_classes: IndexMap::new(),
};
let commitment =
ProposalCommitment { state_diff_commitment: calculate_state_diff_hash(&state_diff) };
let tx_hashes = HashSet::from_iter(artifacts.execution_infos.keys().copied());

Self { state_diff, commitment, tx_hashes, nonces }
}
}
18 changes: 9 additions & 9 deletions crates/starknet_batcher/src/proposal_manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use rstest::{fixture, rstest};
use starknet_api::executable_transaction::Transaction;
use starknet_batcher_types::batcher_types::ProposalId;

use crate::block_builder::{BlockBuilderTrait, BlockExecutionArtifacts, MockBlockBuilderTrait};
use crate::proposal_manager::{
GenerateProposalError,
ProposalError,
ProposalManager,
ProposalManagerTrait,
ProposalOutput,
use crate::block_builder::{
BlockBuilderError,
BlockBuilderTrait,
BlockExecutionArtifacts,
MockBlockBuilderTrait,
};
use crate::proposal_manager::{GenerateProposalError, ProposalManager, ProposalManagerTrait};
use crate::utils::ProposalOutput;

const BLOCK_GENERATION_TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_secs(1);

Expand Down Expand Up @@ -133,8 +133,8 @@ async fn abort_active_proposal(mut proposal_manager: ProposalManager) {
proposal_manager.abort_proposal(ProposalId(0)).await;

assert_matches!(
proposal_manager.take_proposal_result(ProposalId(0)).await,
Some(Err(ProposalError::Aborted))
*proposal_manager.take_proposal_result(ProposalId(0)).await.unwrap().unwrap_err(),
BlockBuilderError::Aborted
);

// Make sure there is no active proposal.
Expand Down
Loading

0 comments on commit cbee779

Please sign in to comment.