Skip to content

Commit

Permalink
refactor(papyrus_p2p_sync): run_test supports all data types
Browse files Browse the repository at this point in the history
  • Loading branch information
ShahakShama committed Dec 5, 2024
1 parent 17d5d3b commit 63cba88
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 48 deletions.
4 changes: 2 additions & 2 deletions crates/papyrus_p2p_sync/src/client/class_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use starknet_api::state::SierraContractClass;
use super::test_utils::{
setup,
wait_for_marker,
MarkerKind,
DataType,
TestArgs,
CLASS_DIFF_QUERY_LENGTH,
HEADER_QUERY_LENGTH,
Expand Down Expand Up @@ -112,7 +112,7 @@ async fn class_basic_flow() {
.unwrap();

wait_for_marker(
MarkerKind::Class,
DataType::Class,
&storage_reader,
block_number.unchecked_next(),
SLEEP_DURATION_TO_LET_SYNC_ADVANCE,
Expand Down
12 changes: 7 additions & 5 deletions crates/papyrus_p2p_sync/src/client/header_test.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashMap;

use futures::{FutureExt, StreamExt};
use papyrus_protobuf::sync::{
BlockHashOrNumber,
Expand All @@ -19,7 +21,7 @@ use super::test_utils::{
setup,
wait_for_marker,
Action,
MarkerKind,
DataType,
TestArgs,
HEADER_QUERY_LENGTH,
SLEEP_DURATION_TO_LET_SYNC_ADVANCE,
Expand Down Expand Up @@ -92,7 +94,7 @@ async fn signed_headers_basic_flow() {
// sent.
let block_number = BlockNumber(i.try_into().unwrap());
wait_for_marker(
MarkerKind::Header,
DataType::Header,
&storage_reader,
block_number.unchecked_next(),
SLEEP_DURATION_TO_LET_SYNC_ADVANCE,
Expand Down Expand Up @@ -199,17 +201,17 @@ async fn sync_sends_new_header_query_if_it_got_partial_responses() {
#[tokio::test]
async fn wrong_block_number() {
run_test(
1,
HashMap::from([(DataType::Header, 1)]),
vec![
// We already validate the query content in other tests.
Action::ReceiveQuery(Box::new(|_query| ())),
Action::ReceiveQuery(Box::new(|_query| ()), DataType::Header),
Action::SendHeader(DataOrFin(Some(random_header(
&mut get_rng(),
BlockNumber(1),
None,
None,
)))),
Action::ValidateReportSent,
Action::ValidateReportSent(DataType::Header),
Action::CheckStorage(Box::new(|reader| {
async move {
assert_eq!(0, reader.begin_ro_txn().unwrap().get_header_marker().unwrap().0);
Expand Down
4 changes: 2 additions & 2 deletions crates/papyrus_p2p_sync/src/client/state_diff_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use super::test_utils::{
create_block_hashes_and_signatures,
setup,
wait_for_marker,
DataType,
HeaderTestPayload,
MarkerKind,
StateDiffTestPayload,
TestArgs,
HEADER_QUERY_LENGTH,
Expand Down Expand Up @@ -95,7 +95,7 @@ async fn state_diff_basic_flow() {
// responses.

wait_for_marker(
MarkerKind::State,
DataType::StateDiff,
&storage_reader,
block_number.unchecked_next(),
SLEEP_DURATION_TO_LET_SYNC_ADVANCE,
Expand Down
167 changes: 131 additions & 36 deletions crates/papyrus_p2p_sync/src/client/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::time::{Duration, Instant};

use futures::future::BoxFuture;
Expand Down Expand Up @@ -121,40 +123,66 @@ pub fn setup() -> TestArgs {
}
}

#[derive(Eq, PartialEq, Hash)]
pub enum DataType {
Header,
#[allow(dead_code)]
Transaction,
StateDiff,
#[allow(dead_code)]
Class,
}

pub enum Action {
/// Get a header query from the sync and run custom validations on it.
ReceiveQuery(Box<dyn FnOnce(Query)>),
ReceiveQuery(Box<dyn FnOnce(Query)>, DataType),
/// Send a header as a response to a query we got from ReceiveQuery. Will panic if didn't call
/// ReceiveQuery.
/// ReceiveQuery with DataType::Header before.
SendHeader(DataOrFin<SignedBlockHeader>),
/// Send a state diff as a response to a query we got from ReceiveQuery. Will panic if didn't
/// call ReceiveQuery with DataType::StateDiff before.
#[allow(dead_code)]
SendStateDiff(DataOrFin<StateDiffChunk>),
/// Send a transaction as a response to a query we got from ReceiveQuery. Will panic if didn't
/// call ReceiveQuery with DataType::Transaction before.
#[allow(dead_code)]
SendTransaction(DataOrFin<FullTransaction>),
/// Send a class as a response to a query we got from ReceiveQuery. Will panic if didn't
/// call ReceiveQuery with DataType::Class before.
#[allow(dead_code)]
SendClass(DataOrFin<(ApiContractClass, ClassHash)>),
/// Perform custom validations on the storage. Returns back the storage reader it received as
/// input
CheckStorage(Box<dyn FnOnce(StorageReader) -> BoxFuture<'static, ()>>),
/// Check that a report was sent on the current header query.
ValidateReportSent,
ValidateReportSent(DataType),
}

// TODO(shahak): add support for state diffs, transactions and classes.
pub async fn run_test(header_max_query_length: u64, actions: Vec<Action>) {
pub async fn run_test(max_query_lengths: HashMap<DataType, u64>, actions: Vec<Action>) {
let p2p_sync_config = P2PSyncClientConfig {
num_headers_per_query: header_max_query_length,
num_block_state_diffs_per_query: STATE_DIFF_QUERY_LENGTH,
num_block_transactions_per_query: TRANSACTION_QUERY_LENGTH,
num_block_classes_per_query: CLASS_DIFF_QUERY_LENGTH,
num_headers_per_query: max_query_lengths.get(&DataType::Header).cloned().unwrap_or(1),
num_block_state_diffs_per_query: max_query_lengths
.get(&DataType::StateDiff)
.cloned()
.unwrap_or(1),
num_block_transactions_per_query: max_query_lengths
.get(&DataType::Transaction)
.cloned()
.unwrap_or(1),
num_block_classes_per_query: max_query_lengths.get(&DataType::Class).cloned().unwrap_or(1),
wait_period_for_new_data: WAIT_PERIOD_FOR_NEW_DATA,
buffer_size: BUFFER_SIZE,
stop_sync_at_block_number: None,
};
let buffer_size = p2p_sync_config.buffer_size;
let ((storage_reader, storage_writer), _temp_dir) = get_test_storage();
let (header_sender, mut mock_header_response_manager) =
let (header_sender, mut mock_header_network) = mock_register_sqmr_protocol_client(buffer_size);
let (state_diff_sender, mut mock_state_diff_network) =
mock_register_sqmr_protocol_client(buffer_size);
let (state_diff_sender, _mock_state_diff_response_manager) =
mock_register_sqmr_protocol_client(buffer_size);
let (transaction_sender, _mock_transaction_response_manager) =
mock_register_sqmr_protocol_client(buffer_size);
let (class_sender, _mock_class_response_manager) =
let (transaction_sender, mut mock_transaction_network) =
mock_register_sqmr_protocol_client(buffer_size);
let (class_sender, mut mock_class_network) = mock_register_sqmr_protocol_client(buffer_size);
let p2p_sync_channels = P2PSyncClientChannels {
header_sender,
state_diff_sender,
Expand All @@ -169,30 +197,93 @@ pub async fn run_test(header_max_query_length: u64, actions: Vec<Action>) {
);

let mut headers_current_query_responses_manager = None;
let mut state_diff_current_query_responses_manager = None;
let mut transaction_current_query_responses_manager = None;
let mut class_current_query_responses_manager = None;

tokio::select! {
_ = async {
for action in actions {
match action {
Action::ReceiveQuery(validate_query_fn) => {
let responses_manager =
mock_header_response_manager.next().await.unwrap();
let query = responses_manager.query().as_ref().unwrap().0.clone();
Action::ReceiveQuery(validate_query_fn, data_type) => {
let query = match data_type {
DataType::Header => {
get_next_query(
&mut mock_header_network,
&mut headers_current_query_responses_manager,
).await.0
}
DataType::StateDiff => {
get_next_query(
&mut mock_state_diff_network,
&mut state_diff_current_query_responses_manager,
).await.0
}
DataType::Transaction => {
get_next_query(
&mut mock_transaction_network,
&mut transaction_current_query_responses_manager,
).await.0
}
DataType::Class => {
get_next_query(
&mut mock_class_network,
&mut class_current_query_responses_manager,
).await.0
}
};
validate_query_fn(query);
headers_current_query_responses_manager = Some(responses_manager);
}
Action::SendHeader(header_or_fin) => {
let responses_manager = headers_current_query_responses_manager.as_mut()
.expect("Called SendHeader without calling ReceiveQuery");
responses_manager.send_response(header_or_fin).await.unwrap();
}
Action::SendStateDiff(state_diff_or_fin) => {
let responses_manager = state_diff_current_query_responses_manager.as_mut()
.expect("Called SendStateDiff without calling ReceiveQuery");
responses_manager.send_response(state_diff_or_fin).await.unwrap();
}
Action::SendTransaction(transaction_or_fin) => {
let responses_manager = transaction_current_query_responses_manager.as_mut()
.expect("Called SendTransaction without calling ReceiveQuery");
responses_manager.send_response(transaction_or_fin).await.unwrap();
}
Action::SendClass(class_or_fin) => {
let responses_manager = class_current_query_responses_manager.as_mut()
.expect("Called SendClass without calling ReceiveQuery");
responses_manager.send_response(class_or_fin).await.unwrap();
}
Action::CheckStorage(check_storage_fn) => {
// We tried avoiding the clone here but it causes lifetime issues.
check_storage_fn(storage_reader.clone()).await;
}
Action::ValidateReportSent => {
Action::ValidateReportSent(DataType::Header) => {
let responses_manager = headers_current_query_responses_manager.take()
.expect("Called ValidateReportSent without calling ReceiveQuery");
.expect(
"Called ValidateReportSent without calling ReceiveQuery on the same
data type");
responses_manager.assert_reported(TIMEOUT_FOR_TEST).await;
}
Action::ValidateReportSent(DataType::StateDiff) => {
let responses_manager = state_diff_current_query_responses_manager.take()
.expect(
"Called ValidateReportSent without calling ReceiveQuery on the same
data type");
responses_manager.assert_reported(TIMEOUT_FOR_TEST).await;
}
Action::ValidateReportSent(DataType::Transaction) => {
let responses_manager = transaction_current_query_responses_manager.take()
.expect(
"Called ValidateReportSent without calling ReceiveQuery on the same
data type");
responses_manager.assert_reported(TIMEOUT_FOR_TEST).await;
}
Action::ValidateReportSent(DataType::Class) => {
let responses_manager = class_current_query_responses_manager.take()
.expect(
"Called ValidateReportSent without calling ReceiveQuery on the same
data type");
responses_manager.assert_reported(TIMEOUT_FOR_TEST).await;
}
}
Expand Down Expand Up @@ -250,18 +341,9 @@ pub fn create_block_hashes_and_signatures(n_blocks: u8) -> Vec<(BlockHash, Block
.collect()
}

pub(crate) enum MarkerKind {
Header,
#[allow(dead_code)]
Body,
State,
#[allow(dead_code)]
Class,
}

// TODO: Consider moving this to storage and to use poll wakeup instead of sleep
pub(crate) async fn wait_for_marker(
marker_kind: MarkerKind,
data_type: DataType,
storage_reader: &StorageReader,
expected_marker: BlockNumber,
sleep_duration: Duration,
Expand All @@ -273,11 +355,11 @@ pub(crate) async fn wait_for_marker(
assert!(start_time.elapsed() < timeout, "Timeout waiting for marker");

let txn = storage_reader.begin_ro_txn().unwrap();
let storage_marker = match marker_kind {
MarkerKind::Header => txn.get_header_marker().unwrap(),
MarkerKind::Body => txn.get_body_marker().unwrap(),
MarkerKind::State => txn.get_state_marker().unwrap(),
MarkerKind::Class => txn.get_class_marker().unwrap(),
let storage_marker = match data_type {
DataType::Header => txn.get_header_marker().unwrap(),
DataType::Transaction => txn.get_body_marker().unwrap(),
DataType::StateDiff => txn.get_state_marker().unwrap(),
DataType::Class => txn.get_class_marker().unwrap(),
};

if storage_marker >= expected_marker {
Expand All @@ -287,3 +369,16 @@ pub(crate) async fn wait_for_marker(
tokio::time::sleep(sleep_duration).await;
}
}

async fn get_next_query<Query: TryFrom<Vec<u8>> + Clone, Response: TryFrom<Vec<u8>>>(
mock_network: &mut GenericReceiver<MockClientResponsesManager<Query, Response>>,
current_query_responses_manager: &mut Option<MockClientResponsesManager<Query, Response>>,
) -> Query
where
<Query as TryFrom<Vec<u8>>>::Error: Debug,
{
let responses_manager = mock_network.next().await.unwrap();
let query = responses_manager.query().as_ref().unwrap().clone();
*current_query_responses_manager = Some(responses_manager);
query
}
6 changes: 3 additions & 3 deletions crates/papyrus_p2p_sync/src/client/transaction_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use super::test_utils::{
TRANSACTION_QUERY_LENGTH,
WAIT_PERIOD_FOR_NEW_DATA,
};
use crate::client::test_utils::{wait_for_marker, MarkerKind, TIMEOUT_FOR_TEST};
use crate::client::test_utils::{wait_for_marker, DataType, TIMEOUT_FOR_TEST};

#[tokio::test]
async fn transaction_basic_flow() {
Expand Down Expand Up @@ -79,7 +79,7 @@ async fn transaction_basic_flow() {
}

wait_for_marker(
MarkerKind::Header,
DataType::Header,
&storage_reader,
BlockNumber(HEADER_QUERY_LENGTH),
SLEEP_DURATION_TO_LET_SYNC_ADVANCE,
Expand Down Expand Up @@ -139,7 +139,7 @@ async fn transaction_basic_flow() {
// sent.
let block_number = BlockNumber(block_number);
wait_for_marker(
MarkerKind::Body,
DataType::Transaction,
&storage_reader,
block_number.unchecked_next(),
SLEEP_DURATION_TO_LET_SYNC_ADVANCE,
Expand Down

0 comments on commit 63cba88

Please sign in to comment.