Skip to content

Commit

Permalink
feat(batcher): modify the transaction_provider trait to support the v…
Browse files Browse the repository at this point in the history
…alidate flow
  • Loading branch information
Yael-Starkware committed Nov 5, 2024
1 parent cf4553d commit 1c99a26
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 9 deletions.
10 changes: 7 additions & 3 deletions crates/batcher/src/block_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use tracing::{debug, error, info, trace};

use crate::papyrus_state::PapyrusReader;
use crate::transaction_executor::TransactionExecutorTrait;
use crate::transaction_provider::{TransactionProvider, TransactionProviderError};
use crate::transaction_provider::{NextTxs, TransactionProvider, TransactionProviderError};

#[derive(Debug, Error)]
pub enum BlockBuilderError {
Expand Down Expand Up @@ -142,14 +142,18 @@ impl BlockBuilderTrait for BlockBuilder {
async fn build_block(
&self,
deadline: tokio::time::Instant,
tx_provider: Box<dyn TransactionProvider>,
mut tx_provider: Box<dyn TransactionProvider>,
output_content_sender: tokio::sync::mpsc::UnboundedSender<Transaction>,
) -> BlockBuilderResult<BlockExecutionArtifacts> {
let mut block_is_full = false;
let mut execution_infos = IndexMap::new();
// TODO(yael 6/10/2024): delete the timeout condition once the executor has a timeout
while !block_is_full && tokio::time::Instant::now() < deadline {
let next_tx_chunk = tx_provider.get_txs(self.tx_chunk_size).await?;
let next_txs = tx_provider.get_txs(self.tx_chunk_size).await?;
let next_tx_chunk = match next_txs {
NextTxs::Txs(txs) => txs,
NextTxs::End => break,
};
debug!("Got {} transactions from the transaction provider.", next_tx_chunk.len());
if next_tx_chunk.is_empty() {
// TODO: Consider what is the best sleep duration.
Expand Down
47 changes: 44 additions & 3 deletions crates/batcher/src/block_builder_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use crate::block_builder::{BlockBuilder, BlockBuilderTrait, BlockExecutionArtifacts};
use crate::test_utils::test_txs;
use crate::transaction_executor::MockTransactionExecutorTrait;
use crate::transaction_provider::MockTransactionProvider;
use crate::transaction_provider::{MockTransactionProvider, NextTxs};

const BLOCK_GENERATION_DEADLINE_SECS: u64 = 1;
const TX_CHANNEL_SIZE: usize = 50;
Expand Down Expand Up @@ -144,6 +144,26 @@ fn test_expectations_with_delay(
(mock_transaction_executor, mock_tx_provider, expected_block_artifacts)
}

fn stream_done_test_expectations(
input_txs: &[Transaction],
) -> (MockTransactionExecutorTrait, MockTransactionProvider, BlockExecutionArtifacts) {
let block_size = input_txs.len();
let input_txs_cloned = input_txs.to_vec();
let mut mock_transaction_executor = MockTransactionExecutorTrait::new();

mock_transaction_executor
.expect_add_txs_to_block()
.withf(move |blockifier_input| compare_tx_hashes(&input_txs_cloned, blockifier_input))
.return_once(move |_| (0..block_size).map(|_| Ok(execution_info())).collect());

let expected_block_artifacts =
set_close_block_expectations(&mut mock_transaction_executor, block_size);

let mock_tx_provider = mock_tx_provider_stream_done(input_txs.to_vec());

(mock_transaction_executor, mock_tx_provider, expected_block_artifacts)
}

// Fill the executor outputs with some non-default values to make sure the block_builder uses
// them.
fn block_builder_expected_output(execution_info_len: usize) -> BlockExecutionArtifacts {
Expand Down Expand Up @@ -180,7 +200,24 @@ fn mock_tx_provider_limited_calls(
.expect_get_txs()
.times(n_calls)
.with(eq(TX_CHUNK_SIZE))
.returning(move |_n_txs| Ok(input_chunks.remove(0)));
.returning(move |_n_txs| Ok(NextTxs::Txs(input_chunks.remove(0))));
mock_tx_provider
}

fn mock_tx_provider_stream_done(input_chunk: Vec<Transaction>) -> MockTransactionProvider {
let mut mock_tx_provider = MockTransactionProvider::new();
let mut seq = Sequence::new();
mock_tx_provider
.expect_get_txs()
.times(1)
.in_sequence(&mut seq)
.with(eq(TX_CHUNK_SIZE))
.returning(move |_n_txs| Ok(NextTxs::Txs(input_chunk.clone())));
mock_tx_provider
.expect_get_txs()
.times(1)
.in_sequence(&mut seq)
.returning(|_n_txs| Ok(NextTxs::End));
mock_tx_provider
}

Expand All @@ -198,7 +235,10 @@ fn mock_tx_provider_limitless_calls(
}

fn add_limitless_empty_calls(mock_tx_provider: &mut MockTransactionProvider) {
mock_tx_provider.expect_get_txs().with(eq(TX_CHUNK_SIZE)).returning(|_n_txs| Ok(Vec::new()));
mock_tx_provider
.expect_get_txs()
.with(eq(TX_CHUNK_SIZE))
.returning(|_n_txs| Ok(NextTxs::Txs(Vec::new())));
}

fn compare_tx_hashes(input: &[Transaction], blockifier_input: &[BlockifierTransaction]) -> bool {
Expand Down Expand Up @@ -247,6 +287,7 @@ async fn run_build_block(
#[case::empty_block(0, vec![], empty_block_test_expectations())]
#[case::block_full(1, test_txs(0..3), block_full_test_expectations(&input_txs, expected_block_size))]
#[case::deadline_reached_after_first_chunk(3, test_txs(0..6), test_expectations_with_delay(&input_txs))]
#[case::stream_done(2, test_txs(0..2), stream_done_test_expectations(&input_txs))]
#[tokio::test]
async fn test_build_block(
#[case] expected_block_size: usize,
Expand Down
30 changes: 27 additions & 3 deletions crates/batcher/src/transaction_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,16 @@ pub enum TransactionProviderError {
MempoolError(#[from] MempoolClientError),
}

#[derive(Debug, PartialEq)]
pub enum NextTxs {
Txs(Vec<Transaction>),
End,
}

#[cfg_attr(test, automock)]
#[async_trait]
pub trait TransactionProvider: Send + Sync {
async fn get_txs(&self, n_txs: usize) -> Result<Vec<Transaction>, TransactionProviderError>;
async fn get_txs(&mut self, n_txs: usize) -> Result<NextTxs, TransactionProviderError>;
}

pub struct ProposeTransactionProvider {
Expand All @@ -23,8 +29,26 @@ pub struct ProposeTransactionProvider {

#[async_trait]
impl TransactionProvider for ProposeTransactionProvider {
async fn get_txs(&self, n_txs: usize) -> Result<Vec<Transaction>, TransactionProviderError> {
async fn get_txs(&mut self, n_txs: usize) -> Result<NextTxs, TransactionProviderError> {
// TODO: Get also L1 transactions.
Ok(self.mempool_client.get_txs(n_txs).await?)
Ok(NextTxs::Txs(self.mempool_client.get_txs(n_txs).await?))
}
}

pub struct ValidateTransactionProvider {
pub tx_receiver: tokio::sync::mpsc::Receiver<Transaction>,
}

#[async_trait]
impl TransactionProvider for ValidateTransactionProvider {
async fn get_txs(&mut self, n_txs: usize) -> Result<NextTxs, TransactionProviderError> {
let mut buffer = Vec::with_capacity(n_txs);
self.tx_receiver.recv_many(&mut buffer, n_txs).await;
// If the buffer is empty, it means that the stream was dropped, otherwise it would have
// been waiting for transactions.
if buffer.is_empty() {
return Ok(NextTxs::End);
}
Ok(NextTxs::Txs(buffer))
}
}

0 comments on commit 1c99a26

Please sign in to comment.