Skip to content

Commit

Permalink
refactor(batcher): create the tx provider in the batcher
Browse files Browse the repository at this point in the history
  • Loading branch information
yair-starkware committed Nov 10, 2024
1 parent 67474b7 commit a806636
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 56 deletions.
17 changes: 9 additions & 8 deletions crates/batcher/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::proposal_manager::{
ProposalOutput,
StartHeightError,
};
use crate::transaction_provider::DummyL1ProviderClient;
use crate::transaction_provider::{DummyL1ProviderClient, ProposeTransactionProvider};

struct Proposal {
tx_stream: OutputStream,
Expand Down Expand Up @@ -87,13 +87,19 @@ impl Batcher {
)?);

let (tx_sender, tx_receiver) = tokio::sync::mpsc::unbounded_channel();
let tx_provider = ProposeTransactionProvider {
mempool_client: self.mempool_client.clone(),
// TODO: use a real L1 provider client.
l1_provider_client: Arc::new(DummyL1ProviderClient),
};

self.proposal_manager
.build_block_proposal(
build_proposal_input.proposal_id,
build_proposal_input.retrospective_block_hash,
deadline,
tx_sender,
tx_provider,
)
.await
.map_err(BatcherError::from)?;
Expand Down Expand Up @@ -184,7 +190,6 @@ impl Batcher {
}

pub fn create_batcher(config: BatcherConfig, mempool_client: SharedMempoolClient) -> Batcher {
let l1_provider_client = Arc::new(DummyL1ProviderClient);
let (storage_reader, storage_writer) = papyrus_storage::open_storage(config.storage.clone())
.expect("Failed to open batcher's storage");

Expand All @@ -195,12 +200,8 @@ pub fn create_batcher(config: BatcherConfig, mempool_client: SharedMempoolClient
});
let storage_reader = Arc::new(storage_reader);
let storage_writer = Box::new(storage_writer);
let proposal_manager = Box::new(ProposalManager::new(
l1_provider_client,
mempool_client.clone(),
block_builder_factory,
storage_reader.clone(),
));
let proposal_manager =
Box::new(ProposalManager::new(block_builder_factory, storage_reader.clone()));
Batcher::new(config, storage_reader, storage_writer, mempool_client, proposal_manager)
}

Expand Down
6 changes: 5 additions & 1 deletion crates/batcher/src/batcher_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use crate::proposal_manager::{
StartHeightError,
};
use crate::test_utils::test_txs;
use crate::transaction_provider::ProposeTransactionProvider;

const INITIAL_HEIGHT: BlockNumber = BlockNumber(3);
const STREAMING_CHUNK_SIZE: usize = 3;
Expand Down Expand Up @@ -85,7 +86,7 @@ async fn get_stream_content(
let mut proposal_manager = MockProposalManagerTraitWrapper::new();
proposal_manager.expect_wrap_start_height().return_once(|_| async { Ok(()) }.boxed());
proposal_manager.expect_wrap_build_block_proposal().return_once(
move |_proposal_id, _block_hash, _deadline, tx_sender| {
move |_proposal_id, _block_hash, _deadline, tx_sender, _tx_provider| {
simulate_build_block_proposal(tx_sender, txs_to_stream).boxed()
},
);
Expand Down Expand Up @@ -248,6 +249,7 @@ trait ProposalManagerTraitWrapper: Send + Sync {
retrospective_block_hash: Option<BlockHashAndNumber>,
deadline: tokio::time::Instant,
output_content_sender: tokio::sync::mpsc::UnboundedSender<Transaction>,
tx_provider: ProposeTransactionProvider,
) -> BoxFuture<'_, Result<(), BuildProposalError>>;

fn wrap_take_proposal_result(
Expand All @@ -273,12 +275,14 @@ impl<T: ProposalManagerTraitWrapper> ProposalManagerTrait for T {
retrospective_block_hash: Option<BlockHashAndNumber>,
deadline: tokio::time::Instant,
output_content_sender: tokio::sync::mpsc::UnboundedSender<Transaction>,
tx_provider: ProposeTransactionProvider,
) -> Result<(), BuildProposalError> {
self.wrap_build_block_proposal(
proposal_id,
retrospective_block_hash,
deadline,
output_content_sender,
tx_provider,
)
.await
}
Expand Down
25 changes: 17 additions & 8 deletions crates/batcher/src/block_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,8 @@ pub struct BlockExecutionArtifacts {
#[async_trait]
pub trait BlockBuilderTrait: Send {
async fn build_block(
&self,
&mut self,
deadline: tokio::time::Instant,
tx_provider: Box<dyn TransactionProvider>,
output_content_sender: tokio::sync::mpsc::UnboundedSender<Transaction>,
) -> BlockBuilderResult<BlockExecutionArtifacts>;
}
Expand All @@ -87,27 +86,31 @@ pub struct BlockBuilder {
// TODO(Yael 14/10/2024): make the executor thread safe and delete this mutex.
executor: Mutex<Box<dyn TransactionExecutorTrait>>,
tx_chunk_size: usize,
tx_provider: Box<dyn TransactionProvider>,
}

impl BlockBuilder {
pub fn new(executor: Box<dyn TransactionExecutorTrait>, tx_chunk_size: usize) -> Self {
Self { executor: Mutex::new(executor), tx_chunk_size }
pub fn new(
executor: Box<dyn TransactionExecutorTrait>,
tx_chunk_size: usize,
tx_provider: Box<dyn TransactionProvider>,
) -> Self {
Self { executor: Mutex::new(executor), tx_chunk_size, tx_provider }
}
}

#[async_trait]
impl BlockBuilderTrait for BlockBuilder {
async fn build_block(
&self,
&mut self,
deadline: tokio::time::Instant,
mut tx_provider: Box<dyn TransactionProvider>,
output_content_sender: tokio::sync::mpsc::UnboundedSender<Transaction>,
) -> BlockBuilderResult<BlockExecutionArtifacts> {
let mut block_is_full = false;
let mut execution_infos = IndexMap::new();
// TODO(yael 6/10/2024): delete the timeout condition once the executor has a timeout
while !block_is_full && tokio::time::Instant::now() < deadline {
let next_txs = tx_provider.get_txs(self.tx_chunk_size).await?;
let next_txs = self.tx_provider.get_txs(self.tx_chunk_size).await?;
let next_tx_chunk = match next_txs {
NextTxs::Txs(txs) => txs,
NextTxs::End => break,
Expand Down Expand Up @@ -179,6 +182,7 @@ pub trait BlockBuilderFactoryTrait {
&self,
height: BlockNumber,
retrospective_block_hash: Option<BlockHashAndNumber>,
tx_provider: Box<dyn TransactionProvider>,
) -> BlockBuilderResult<Box<dyn BlockBuilderTrait>>;
}

Expand Down Expand Up @@ -302,9 +306,14 @@ impl BlockBuilderFactoryTrait for BlockBuilderFactory {
&self,
height: BlockNumber,
retrospective_block_hash: Option<BlockHashAndNumber>,
tx_provider: Box<dyn TransactionProvider>,
) -> BlockBuilderResult<Box<dyn BlockBuilderTrait>> {
let executor =
self.preprocess_and_create_transaction_executor(height, retrospective_block_hash)?;
Ok(Box::new(BlockBuilder::new(Box::new(executor), self.block_builder_config.tx_chunk_size)))
Ok(Box::new(BlockBuilder::new(
Box::new(executor),
self.block_builder_config.tx_chunk_size,
tx_provider,
)))
}
}
8 changes: 6 additions & 2 deletions crates/batcher/src/block_builder_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,15 @@ async fn run_build_block(
tx_provider: MockTransactionProvider,
output_sender: UnboundedSender<Transaction>,
) -> BlockExecutionArtifacts {
let block_builder = BlockBuilder::new(Box::new(mock_transaction_executor), TX_CHUNK_SIZE);
let mut block_builder = BlockBuilder::new(
Box::new(mock_transaction_executor),
TX_CHUNK_SIZE,
Box::new(tx_provider),
);
let deadline = tokio::time::Instant::now()
+ tokio::time::Duration::from_secs(BLOCK_GENERATION_DEADLINE_SECS);

block_builder.build_block(deadline, Box::new(tx_provider), output_sender).await.unwrap()
block_builder.build_block(deadline, output_sender).await.unwrap()
}

// TODO: Add test case for failed transaction.
Expand Down
26 changes: 10 additions & 16 deletions crates/batcher/src/proposal_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@ use starknet_api::executable_transaction::Transaction;
use starknet_api::state::ThinStateDiff;
use starknet_api::transaction::TransactionHash;
use starknet_batcher_types::batcher_types::{ProposalCommitment, ProposalId};
use starknet_mempool_types::communication::SharedMempoolClient;
use thiserror::Error;
use tokio::sync::Mutex;
use tracing::{debug, error, info, instrument, Instrument};

use crate::batcher::BatcherStorageReaderTrait;
use crate::block_builder::{BlockBuilderError, BlockBuilderFactoryTrait, BlockExecutionArtifacts};
use crate::transaction_provider::{ProposeTransactionProvider, SharedL1ProviderClient};
use crate::transaction_provider::ProposeTransactionProvider;

#[derive(Debug, Error)]
pub enum StartHeightError {
Expand Down Expand Up @@ -73,6 +72,7 @@ pub trait ProposalManagerTrait: Send + Sync {
retrospective_block_hash: Option<BlockHashAndNumber>,
deadline: tokio::time::Instant,
tx_sender: tokio::sync::mpsc::UnboundedSender<Transaction>,
tx_provider: ProposeTransactionProvider,
) -> Result<(), BuildProposalError>;

async fn take_proposal_result(
Expand All @@ -94,8 +94,6 @@ pub trait ProposalManagerTrait: Send + Sync {
///
/// Triggered by the consensus.
pub(crate) struct ProposalManager {
l1_provider_client: SharedL1ProviderClient,
mempool_client: SharedMempoolClient,
storage_reader: Arc<dyn BatcherStorageReaderTrait>,
active_height: Option<BlockNumber>,
/// The block proposal that is currently being proposed, if any.
Expand Down Expand Up @@ -151,34 +149,34 @@ impl ProposalManagerTrait for ProposalManager {
/// Starts a new block proposal generation task for the given proposal_id and height with
/// transactions from the mempool.
/// Requires tx_sender for sending the generated transactions to the caller.
#[instrument(skip(self, tx_sender), err, fields(self.active_height))]
#[instrument(skip(self, tx_sender, tx_provider), err, fields(self.active_height))]
async fn build_block_proposal(
&mut self,
proposal_id: ProposalId,
retrospective_block_hash: Option<BlockHashAndNumber>,
deadline: tokio::time::Instant,
tx_sender: tokio::sync::mpsc::UnboundedSender<Transaction>,
tx_provider: ProposeTransactionProvider,
) -> Result<(), BuildProposalError> {
let height = self.active_height.ok_or(BuildProposalError::NoActiveHeight)?;
if self.executed_proposals.lock().await.contains_key(&proposal_id) {
return Err(BuildProposalError::ProposalAlreadyExists { proposal_id });
}
info!("Starting generation of a new proposal with id {}.", proposal_id);
self.set_active_proposal(proposal_id).await?;
let block_builder =
self.block_builder_factory.create_block_builder(height, retrospective_block_hash)?;
let mut block_builder = self.block_builder_factory.create_block_builder(
height,
retrospective_block_hash,
Box::new(tx_provider),
)?;

let tx_provider = ProposeTransactionProvider {
mempool_client: self.mempool_client.clone(),
l1_provider_client: self.l1_provider_client.clone(),
};
let active_proposal = self.active_proposal.clone();
let executed_proposals = self.executed_proposals.clone();

self.active_proposal_handle = Some(tokio::spawn(
async move {
let result = block_builder
.build_block(deadline, Box::new(tx_provider), tx_sender.clone())
.build_block(deadline, tx_sender.clone())
.await
.map(ProposalOutput::from)
.map_err(|e| GetProposalResultError::BlockBuilderError(Arc::new(e)));
Expand Down Expand Up @@ -221,14 +219,10 @@ impl ProposalManagerTrait for ProposalManager {

impl ProposalManager {
pub fn new(
l1_provider_client: SharedL1ProviderClient,
mempool_client: SharedMempoolClient,
block_builder_factory: Arc<dyn BlockBuilderFactoryTrait + Send + Sync>,
storage_reader: Arc<dyn BatcherStorageReaderTrait>,
) -> Self {
Self {
l1_provider_client,
mempool_client,
storage_reader,
active_proposal: Arc::new(Mutex::new(None)),
block_builder_factory,
Expand Down
Loading

0 comments on commit a806636

Please sign in to comment.