diff --git a/crates/papyrus_p2p_sync/src/client/header_test.rs b/crates/papyrus_p2p_sync/src/client/header_test.rs index d569107d49..7a4b6961ce 100644 --- a/crates/papyrus_p2p_sync/src/client/header_test.rs +++ b/crates/papyrus_p2p_sync/src/client/header_test.rs @@ -14,10 +14,13 @@ use tokio::time::timeout; use super::test_utils::{ create_block_hashes_and_signatures, setup, + wait_for_marker, + MarkerKind, TestArgs, HEADER_QUERY_LENGTH, SLEEP_DURATION_TO_LET_SYNC_ADVANCE, TIMEOUT_FOR_NEW_QUERY_AFTER_PARTIAL_RESPONSE, + TIMEOUT_FOR_TEST, WAIT_PERIOD_FOR_NEW_DATA, }; @@ -80,14 +83,20 @@ async fn signed_headers_basic_flow() { .await .unwrap(); - tokio::time::sleep(SLEEP_DURATION_TO_LET_SYNC_ADVANCE).await; - // Check responses were written to the storage. This way we make sure that the sync // writes to the storage each response it receives before all query responses were // sent. let block_number = BlockNumber(i.try_into().unwrap()); + wait_for_marker( + MarkerKind::Header, + &storage_reader, + block_number.unchecked_next(), + SLEEP_DURATION_TO_LET_SYNC_ADVANCE, + TIMEOUT_FOR_TEST, + ) + .await; + let txn = storage_reader.begin_ro_txn().unwrap(); - assert_eq!(block_number.unchecked_next(), txn.get_header_marker().unwrap()); let block_header = txn.get_block_header(block_number).unwrap().unwrap(); assert_eq!(block_number, block_header.block_header_without_hash.block_number); assert_eq!(*block_hash, block_header.block_hash); diff --git a/crates/papyrus_p2p_sync/src/client/state_diff_test.rs b/crates/papyrus_p2p_sync/src/client/state_diff_test.rs index 2a321570e7..422e03a8b1 100644 --- a/crates/papyrus_p2p_sync/src/client/state_diff_test.rs +++ b/crates/papyrus_p2p_sync/src/client/state_diff_test.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - use futures::{FutureExt, StreamExt}; use indexmap::indexmap; use papyrus_protobuf::sync::{ @@ -26,16 +24,17 @@ use static_assertions::const_assert; use super::test_utils::{ create_block_hashes_and_signatures, setup, + wait_for_marker, + MarkerKind, TestArgs, HEADER_QUERY_LENGTH, SLEEP_DURATION_TO_LET_SYNC_ADVANCE, STATE_DIFF_QUERY_LENGTH, + TIMEOUT_FOR_TEST, WAIT_PERIOD_FOR_NEW_DATA, }; use super::StateDiffQuery; -const TIMEOUT_FOR_TEST: Duration = Duration::from_secs(5); - #[tokio::test] async fn state_diff_basic_flow() { // Asserting the constants so the test can assume there will be 2 state diff queries for a @@ -136,15 +135,22 @@ async fn state_diff_basic_flow() { .await .unwrap(); - tokio::time::sleep(SLEEP_DURATION_TO_LET_SYNC_ADVANCE).await; - // Check state diff was written to the storage. This way we make sure that the sync // writes to the storage each block's state diff before receiving all query // responses. - let txn = storage_reader.begin_ro_txn().unwrap(); - assert_eq!(block_number.unchecked_next(), txn.get_state_marker().unwrap()); + wait_for_marker( + MarkerKind::State, + &storage_reader, + block_number.unchecked_next(), + SLEEP_DURATION_TO_LET_SYNC_ADVANCE, + TIMEOUT_FOR_TEST, + ) + .await; + let txn = storage_reader.begin_ro_txn().unwrap(); let state_diff = txn.get_state_diff(block_number).unwrap().unwrap(); + // TODO(noamsp): refactor test so that we treat multiple state diff chunks as a + // single state diff let expected_state_diff = match state_diff_chunk { StateDiffChunk::ContractDiff(contract_diff) => { let mut deployed_contracts = indexmap! {}; diff --git a/crates/papyrus_p2p_sync/src/client/test_utils.rs b/crates/papyrus_p2p_sync/src/client/test_utils.rs index a201255f87..96b8842208 100644 --- a/crates/papyrus_p2p_sync/src/client/test_utils.rs +++ b/crates/papyrus_p2p_sync/src/client/test_utils.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::time::{Duration, Instant}; use lazy_static::lazy_static; use papyrus_common::pending_classes::ApiContractClass; @@ -16,9 +16,13 @@ use papyrus_protobuf::sync::{ StateDiffQuery, TransactionQuery, }; +use papyrus_storage::body::BodyStorageReader; +use papyrus_storage::class::ClassStorageReader; +use papyrus_storage::header::HeaderStorageReader; +use papyrus_storage::state::StateStorageReader; use papyrus_storage::test_utils::get_test_storage; use papyrus_storage::StorageReader; -use starknet_api::block::{BlockHash, BlockSignature}; +use starknet_api::block::{BlockHash, BlockNumber, BlockSignature}; use starknet_api::core::ClassHash; use starknet_api::crypto::utils::Signature; use starknet_api::hash::StarkHash; @@ -27,6 +31,7 @@ use starknet_types_core::felt::Felt; use super::{P2PSyncClient, P2PSyncClientChannels, P2PSyncClientConfig}; +pub(crate) const TIMEOUT_FOR_TEST: Duration = Duration::from_secs(5); pub const BUFFER_SIZE: usize = 1000; pub const HEADER_QUERY_LENGTH: u64 = 5; pub const STATE_DIFF_QUERY_LENGTH: u64 = 3; @@ -117,3 +122,41 @@ pub fn create_block_hashes_and_signatures(n_blocks: u8) -> Vec<(BlockHash, Block }) .collect() } + +pub(crate) enum MarkerKind { + Header, + #[allow(dead_code)] + Body, + State, + #[allow(dead_code)] + Class, +} + +// TODO: Consider moving this to storage and to use poll wakeup instead of sleep +pub(crate) async fn wait_for_marker( + marker_kind: MarkerKind, + storage_reader: &StorageReader, + expected_marker: BlockNumber, + sleep_duration: Duration, + timeout: Duration, +) { + let start_time = Instant::now(); + + loop { + assert!(start_time.elapsed() < timeout, "Timeout waiting for marker"); + + let txn = storage_reader.begin_ro_txn().unwrap(); + let storage_marker = match marker_kind { + MarkerKind::Header => txn.get_header_marker().unwrap(), + MarkerKind::Body => txn.get_body_marker().unwrap(), + MarkerKind::State => txn.get_state_marker().unwrap(), + MarkerKind::Class => txn.get_class_marker().unwrap(), + }; + + if storage_marker >= expected_marker { + break; + } + + tokio::time::sleep(sleep_duration).await; + } +}