Skip to content

Commit

Permalink
refactor(starknet_batcher): move abort channel creation to the builde…
Browse files Browse the repository at this point in the history
…r factory (#2297)
  • Loading branch information
dafnamatsry authored Dec 2, 2024
1 parent f184253 commit 3fb46f2
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 20 deletions.
12 changes: 2 additions & 10 deletions crates/starknet_batcher/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,10 @@ impl Batcher {
self.config.max_l1_handler_txs_per_block_proposal,
);

// A channel to allow aborting the block building task.
let (abort_signal_sender, abort_signal_receiver) = tokio::sync::oneshot::channel();

// A channel to receive the transactions included in the proposed block.
let (output_tx_sender, output_tx_receiver) = tokio::sync::mpsc::unbounded_channel();

let block_builder = self
let (block_builder, abort_signal_sender) = self
.block_builder_factory
.create_block_builder(
BlockMetadata {
Expand All @@ -158,7 +155,6 @@ impl Batcher {
},
Box::new(tx_provider),
Some(output_tx_sender),
abort_signal_receiver,
)
.map_err(|_| BatcherError::InternalError)?;

Expand Down Expand Up @@ -188,10 +184,7 @@ impl Batcher {
l1_provider_client: Arc::new(DummyL1ProviderClient),
};

// A channel to allow aborting the block building task.
let (abort_signal_sender, abort_signal_receiver) = tokio::sync::oneshot::channel();

let block_builder = self
let (block_builder, abort_signal_sender) = self
.block_builder_factory
.create_block_builder(
BlockMetadata {
Expand All @@ -204,7 +197,6 @@ impl Batcher {
},
Box::new(tx_provider),
None,
abort_signal_receiver,
)
.map_err(|_| BatcherError::InternalError)?;

Expand Down
13 changes: 9 additions & 4 deletions crates/starknet_batcher/src/batcher_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use starknet_mempool_types::mempool_types::CommitBlockArgs;

use crate::batcher::{Batcher, MockBatcherStorageReaderTrait, MockBatcherStorageWriterTrait};
use crate::block_builder::{
AbortSignalSender,
BlockBuilderError,
BlockBuilderTrait,
FailOnErrorCause,
Expand Down Expand Up @@ -129,10 +130,14 @@ fn mock_proposal_manager_common_expectations(
.return_once(move |_| { async move { Ok(proposal_commitment()) } }.boxed());
}

fn abort_signal_sender() -> AbortSignalSender {
tokio::sync::oneshot::channel().0
}

fn mock_create_builder_for_validate_block() -> MockBlockBuilderFactoryTrait {
let mut block_builder_factory = MockBlockBuilderFactoryTrait::new();
block_builder_factory.expect_create_block_builder().times(1).return_once(
|_, _, mut tx_provider, _, _| {
|_, _, mut tx_provider, _| {
// Spawn a task to keep tx_provider alive until all transactions are read.
// Without this, the provider would be dropped, causing the batcher to fail when sending
// transactions to it during the test.
Expand All @@ -141,7 +146,7 @@ fn mock_create_builder_for_validate_block() -> MockBlockBuilderFactoryTrait {
tokio::task::yield_now().await;
}
});
Ok(Box::new(MockBlockBuilderTrait::new()))
Ok((Box::new(MockBlockBuilderTrait::new()), abort_signal_sender()))
},
);
block_builder_factory
Expand All @@ -152,12 +157,12 @@ fn mock_create_builder_for_propose_block(
) -> MockBlockBuilderFactoryTrait {
let mut block_builder_factory = MockBlockBuilderFactoryTrait::new();
block_builder_factory.expect_create_block_builder().times(1).return_once(
|_, _, _, output_content_sender, _| {
|_, _, _, output_content_sender| {
// Simulate the streaming of the block builder output.
for tx in output_txs {
output_content_sender.as_ref().unwrap().send(tx).unwrap();
}
Ok(Box::new(MockBlockBuilderTrait::new()))
Ok((Box::new(MockBlockBuilderTrait::new()), abort_signal_sender()))
},
);
block_builder_factory
Expand Down
15 changes: 9 additions & 6 deletions crates/starknet_batcher/src/block_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ pub struct BlockMetadata {
pub retrospective_block_hash: Option<BlockHashAndNumber>,
}

// Type definitions for the abort channel required to abort the block builder.
pub type AbortSignalSender = tokio::sync::oneshot::Sender<()>;

/// The BlockBuilderFactoryTrait is responsible for creating a new block builder.
#[cfg_attr(test, automock)]
pub trait BlockBuilderFactoryTrait: Send + Sync {
Expand All @@ -237,8 +240,7 @@ pub trait BlockBuilderFactoryTrait: Send + Sync {
execution_params: BlockBuilderExecutionParams,
tx_provider: Box<dyn TransactionProvider>,
output_content_sender: Option<tokio::sync::mpsc::UnboundedSender<Transaction>>,
abort_signal_receiver: tokio::sync::oneshot::Receiver<()>,
) -> BlockBuilderResult<Box<dyn BlockBuilderTrait>>;
) -> BlockBuilderResult<(Box<dyn BlockBuilderTrait>, AbortSignalSender)>;
}

#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
Expand Down Expand Up @@ -356,16 +358,17 @@ impl BlockBuilderFactoryTrait for BlockBuilderFactory {
execution_params: BlockBuilderExecutionParams,
tx_provider: Box<dyn TransactionProvider>,
output_content_sender: Option<tokio::sync::mpsc::UnboundedSender<Transaction>>,
abort_signal_receiver: tokio::sync::oneshot::Receiver<()>,
) -> BlockBuilderResult<Box<dyn BlockBuilderTrait>> {
) -> BlockBuilderResult<(Box<dyn BlockBuilderTrait>, AbortSignalSender)> {
let executor = self.preprocess_and_create_transaction_executor(&block_metadata)?;
Ok(Box::new(BlockBuilder::new(
let (abort_signal_sender, abort_signal_receiver) = tokio::sync::oneshot::channel();
let block_builder = Box::new(BlockBuilder::new(
Box::new(executor),
tx_provider,
output_content_sender,
abort_signal_receiver,
self.block_builder_config.tx_chunk_size,
execution_params,
)))
));
Ok((block_builder, abort_signal_sender))
}
}

0 comments on commit 3fb46f2

Please sign in to comment.