Skip to content

Commit

Permalink
refactor(papyrus_p2p_sync): use run_test in rest of header tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ShahakShama committed Dec 17, 2024
1 parent 84a2a72 commit 4f5dd28
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 178 deletions.
325 changes: 149 additions & 176 deletions crates/papyrus_p2p_sync/src/client/header_test.rs
Original file line number Diff line number Diff line change
@@ -1,201 +1,137 @@
use std::collections::HashMap;

use futures::{FutureExt, StreamExt};
use papyrus_protobuf::sync::{
BlockHashOrNumber,
DataOrFin,
Direction,
HeaderQuery,
Query,
SignedBlockHeader,
};
use futures::future::BoxFuture;
use futures::FutureExt;
use papyrus_protobuf::sync::{BlockHashOrNumber, DataOrFin, Direction, Query, SignedBlockHeader};
use papyrus_storage::header::HeaderStorageReader;
use papyrus_storage::StorageReader;
use papyrus_test_utils::get_rng;
use starknet_api::block::{BlockHeader, BlockHeaderWithoutHash, BlockNumber};
use tokio::time::timeout;
use starknet_api::block::BlockNumber;

use super::test_utils::{
create_block_hashes_and_signatures,
random_header,
run_test,
setup,
wait_for_marker,
Action,
DataType,
TestArgs,
HEADER_QUERY_LENGTH,
SLEEP_DURATION_TO_LET_SYNC_ADVANCE,
TIMEOUT_FOR_NEW_QUERY_AFTER_PARTIAL_RESPONSE,
TIMEOUT_FOR_TEST,
WAIT_PERIOD_FOR_NEW_DATA,
};

#[tokio::test]
async fn signed_headers_basic_flow() {
const NUM_QUERIES: u64 = 3;

let TestArgs {
p2p_sync,
storage_reader,
mut mock_header_response_manager,
// The test will fail if we drop these
mock_state_diff_response_manager: _mock_state_diff_response_manager,
mock_transaction_response_manager: _mock_transaction_response_manager,
mock_class_response_manager: _mock_class_response_manager,
..
} = setup();
let block_hashes_and_signatures =
create_block_hashes_and_signatures((NUM_QUERIES * HEADER_QUERY_LENGTH).try_into().unwrap());

// Create a future that will receive queries, send responses and validate the results.
let parse_queries_future = async move {
for query_index in 0..NUM_QUERIES {
let start_block_number = query_index * HEADER_QUERY_LENGTH;
let end_block_number = (query_index + 1) * HEADER_QUERY_LENGTH;

// Receive query and validate it.
let mut mock_header_responses_manager =
mock_header_response_manager.next().await.unwrap();
assert_eq!(
*mock_header_responses_manager.query(),
Ok(HeaderQuery(Query {
start_block: BlockHashOrNumber::Number(BlockNumber(start_block_number)),
direction: Direction::Forward,
limit: HEADER_QUERY_LENGTH,
step: 1,
}))
);

for (i, (block_hash, block_signature)) in block_hashes_and_signatures
.iter()
.enumerate()
.take(end_block_number.try_into().expect("Failed converting u64 to usize"))
.skip(start_block_number.try_into().expect("Failed converting u64 to usize"))
{
// Send responses
mock_header_responses_manager
.send_response(DataOrFin(Some(SignedBlockHeader {
block_header: BlockHeader {
block_hash: *block_hash,
block_header_without_hash: BlockHeaderWithoutHash {
block_number: BlockNumber(i.try_into().unwrap()),
..Default::default()
},
state_diff_length: Some(0),
..Default::default()
},
signatures: vec![*block_signature],
})))
.await
.unwrap();

// Check responses were written to the storage. This way we make sure that the sync
// writes to the storage each response it receives before all query responses were
// sent.
let block_number = BlockNumber(i.try_into().unwrap());
wait_for_marker(
DataType::Header,
&storage_reader,
block_number.unchecked_next(),
SLEEP_DURATION_TO_LET_SYNC_ADVANCE,
TIMEOUT_FOR_TEST,
)
.await;

let txn = storage_reader.begin_ro_txn().unwrap();
let block_header = txn.get_block_header(block_number).unwrap().unwrap();
assert_eq!(block_number, block_header.block_header_without_hash.block_number);
assert_eq!(*block_hash, block_header.block_hash);
let actual_block_signature =
txn.get_block_signature(block_number).unwrap().unwrap();
assert_eq!(*block_signature, actual_block_signature);
}
mock_header_responses_manager.send_response(DataOrFin(None)).await.unwrap();
}
};

tokio::select! {
sync_result = p2p_sync.run() => {
sync_result.unwrap();
panic!("P2P sync aborted with no failure.");
}
_ = parse_queries_future => {}
}
let (headers, check_storage_funcs) = create_headers_and_check_storage_funcs(4);
run_test(
HashMap::from([(DataType::Header, 2)]),
vec![
Action::ReceiveQuery(
Box::new(|query| {
assert_eq!(
query,
Query {
start_block: BlockHashOrNumber::Number(BlockNumber(0)),
direction: Direction::Forward,
limit: 2,
step: 1,
}
)
}),
DataType::Header,
),
Action::SendHeader(DataOrFin(Some(headers[0].clone()))),
// We check the storage now to see that the sync writes each response before it parses
// the next one.
Action::CheckStorage(check_storage_funcs[0].clone()),
Action::SendHeader(DataOrFin(Some(headers[1].clone()))),
Action::SendHeader(DataOrFin(None)),
// We check the storage now to see that the sync writes each response before it parses
// the next one.
Action::CheckStorage(check_storage_funcs[1].clone()),
Action::ReceiveQuery(
Box::new(|query| {
assert_eq!(
query,
Query {
start_block: BlockHashOrNumber::Number(BlockNumber(2)),
direction: Direction::Forward,
limit: 2,
step: 1,
}
)
}),
DataType::Header,
),
Action::SendHeader(DataOrFin(Some(headers[2].clone()))),
// We check the storage now to see that the sync writes each response before it parses
// the next one.
Action::CheckStorage(check_storage_funcs[2].clone()),
Action::SendHeader(DataOrFin(Some(headers[3].clone()))),
// We check the storage now to see that the sync writes each response before it parses
// the next one.
Action::CheckStorage(check_storage_funcs[3].clone()),
Action::SendHeader(DataOrFin(None)),
],
)
.await;
}

#[tokio::test]
async fn sync_sends_new_header_query_if_it_got_partial_responses() {
const NUM_ACTUAL_RESPONSES: u8 = 2;
assert!(u64::from(NUM_ACTUAL_RESPONSES) < HEADER_QUERY_LENGTH);

let TestArgs {
p2p_sync,
mut mock_header_response_manager,
// The test will fail if we drop these
mock_state_diff_response_manager: _state_diff_receiver,
mock_transaction_response_manager: _transaction_receiver,
mock_class_response_manager: _class_receiver,
..
} = setup();
let block_hashes_and_signatures = create_block_hashes_and_signatures(NUM_ACTUAL_RESPONSES);

// Create a future that will receive a query, send partial responses and receive the next query.
let parse_queries_future = async move {
let mut mock_header_responses_manager = mock_header_response_manager.next().await.unwrap();

for (i, (block_hash, signature)) in block_hashes_and_signatures.into_iter().enumerate() {
mock_header_responses_manager
.send_response(DataOrFin(Some(SignedBlockHeader {
block_header: BlockHeader {
block_hash,
block_header_without_hash: BlockHeaderWithoutHash {
block_number: BlockNumber(i.try_into().unwrap()),
..Default::default()
},
state_diff_length: Some(0),
..Default::default()
},
signatures: vec![signature],
})))
.await
.unwrap();
}
mock_header_responses_manager.send_response(DataOrFin(None)).await.unwrap();

// Wait for the sync to enter sleep due to partial responses. Then, simulate time has
// passed.
tokio::time::sleep(SLEEP_DURATION_TO_LET_SYNC_ADVANCE).await;
tokio::time::pause();
tokio::time::advance(WAIT_PERIOD_FOR_NEW_DATA).await;
tokio::time::resume();

// First unwrap is for the timeout. Second unwrap is for the Option returned from Stream.
let mock_header_responses_manager = timeout(
TIMEOUT_FOR_NEW_QUERY_AFTER_PARTIAL_RESPONSE,
mock_header_response_manager.next(),
)
.await
.unwrap()
.unwrap();

assert_eq!(
*mock_header_responses_manager.query(),
Ok(HeaderQuery(Query {
start_block: BlockHashOrNumber::Number(BlockNumber(NUM_ACTUAL_RESPONSES.into())),
direction: Direction::Forward,
limit: HEADER_QUERY_LENGTH,
step: 1,
}))
);
};

tokio::select! {
sync_result = p2p_sync.run() => {
sync_result.unwrap();
panic!("P2P sync aborted with no failure.");
}
_ = parse_queries_future => {}
}
let (headers, check_storage_funcs) = create_headers_and_check_storage_funcs(3);
run_test(
HashMap::from([(DataType::Header, 2)]),
vec![
Action::ReceiveQuery(
Box::new(|query| {
assert_eq!(
query,
Query {
start_block: BlockHashOrNumber::Number(BlockNumber(0)),
direction: Direction::Forward,
limit: 2,
step: 1,
}
)
}),
DataType::Header,
),
Action::SendHeader(DataOrFin(Some(headers[0].clone()))),
Action::CheckStorage(check_storage_funcs[0].clone()),
Action::SendHeader(DataOrFin(None)),
// Wait for the sync to enter sleep due to partial responses. Then, simulate time has
// passed.
Action::CheckStorage(Box::new(|_reader| {
async move {
tokio::time::sleep(SLEEP_DURATION_TO_LET_SYNC_ADVANCE).await;
tokio::time::pause();
tokio::time::advance(WAIT_PERIOD_FOR_NEW_DATA).await;
tokio::time::resume();
}
.boxed()
})),
Action::ReceiveQuery(
Box::new(|query| {
assert_eq!(
query,
Query {
start_block: BlockHashOrNumber::Number(BlockNumber(1)),
direction: Direction::Forward,
limit: 2,
step: 1,
}
)
}),
DataType::Header,
),
Action::SendHeader(DataOrFin(Some(headers[1].clone()))),
Action::CheckStorage(check_storage_funcs[1].clone()),
Action::SendHeader(DataOrFin(Some(headers[2].clone()))),
Action::CheckStorage(check_storage_funcs[2].clone()),
Action::SendHeader(DataOrFin(None)),
],
)
.await;
}

#[tokio::test]
Expand Down Expand Up @@ -224,3 +160,40 @@ async fn wrong_block_number() {
}

// TODO(shahak): Add more negative tests.

fn create_headers_and_check_storage_funcs(
num_headers: usize,
) -> (Vec<SignedBlockHeader>, Vec<Box<impl FnOnce(StorageReader) -> BoxFuture<'static, ()> + Clone>>)
{
let headers = (0..num_headers)
.map(|i| random_header(&mut get_rng(), BlockNumber(i.try_into().unwrap()), None, None))
.collect::<Vec<_>>();

let check_storage_funcs = headers
.iter()
.cloned()
.enumerate()
.map(|(i, expected_header)| {
Box::new(move |storage_reader| {
async move {
let block_number = BlockNumber(i.try_into().unwrap());
wait_for_marker(
DataType::Header,
&storage_reader,
block_number.unchecked_next(),
SLEEP_DURATION_TO_LET_SYNC_ADVANCE,
TIMEOUT_FOR_TEST,
)
.await;

let txn = storage_reader.begin_ro_txn().unwrap();
let actual_header = txn.get_block_header(block_number).unwrap().unwrap();
assert_eq!(actual_header, expected_header.block_header);
}
.boxed()
})
})
.collect::<Vec<_>>();

(headers, check_storage_funcs)
}
2 changes: 0 additions & 2 deletions crates/papyrus_p2p_sync/src/client/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ pub const CLASS_DIFF_QUERY_LENGTH: u64 = 3;
pub const TRANSACTION_QUERY_LENGTH: u64 = 3;
pub const SLEEP_DURATION_TO_LET_SYNC_ADVANCE: Duration = Duration::from_millis(10);
pub const WAIT_PERIOD_FOR_NEW_DATA: Duration = Duration::from_secs(1);
pub const TIMEOUT_FOR_NEW_QUERY_AFTER_PARTIAL_RESPONSE: Duration =
WAIT_PERIOD_FOR_NEW_DATA.saturating_add(Duration::from_secs(1));

lazy_static! {
static ref TEST_CONFIG: P2PSyncClientConfig = P2PSyncClientConfig {
Expand Down

0 comments on commit 4f5dd28

Please sign in to comment.