diff --git a/crates/papyrus_p2p_sync/src/client/stream_builder.rs b/crates/papyrus_p2p_sync/src/client/stream_builder.rs index 36718a939d..257bcb2cd0 100644 --- a/crates/papyrus_p2p_sync/src/client/stream_builder.rs +++ b/crates/papyrus_p2p_sync/src/client/stream_builder.rs @@ -1,11 +1,12 @@ use std::cmp::min; +use std::collections::HashMap; use std::time::Duration; use async_stream::stream; use futures::channel::mpsc::Receiver; use futures::future::BoxFuture; use futures::stream::BoxStream; -use futures::StreamExt; +use futures::{FutureExt, StreamExt}; use papyrus_network::network_manager::{ClientResponsesManager, SqmrClientSender}; use papyrus_protobuf::converters::ProtobufConversionError; use papyrus_protobuf::sync::{BlockHashOrNumber, DataOrFin, Direction, Query}; @@ -53,10 +54,35 @@ where fn get_start_block_number(storage_reader: &StorageReader) -> Result; + fn get_internal_blocks( + internal_blocks_received: &mut HashMap, + internal_block_receiver: &mut Option>, + current_block_number: BlockNumber, + ) -> Option { + if let Some(block) = internal_blocks_received.remove(¤t_block_number) { + return Some(block); + } + if let Some(internal_block_receiver) = internal_block_receiver { + while let Some((block_number, block_data)) = internal_block_receiver + .next() + .now_or_never() + .map(|now_or_never_res| now_or_never_res.expect("internal block receiver closed")) + { + if block_number >= current_block_number { + if block_number == current_block_number { + return Some(block_data); + } + internal_blocks_received.insert(block_number, block_data); + } + } + } + None + } + fn create_stream( mut sqmr_sender: SqmrClientSender>, storage_reader: StorageReader, - _internal_block_receiver: Option>, + mut internal_block_receiver: Option>, wait_period_for_new_data: Duration, num_blocks_per_query: u64, stop_sync_at_block_number: Option, @@ -67,6 +93,7 @@ where { stream! { let mut current_block_number = Self::get_start_block_number(&storage_reader)?; + let mut internal_blocks_received = HashMap::new(); 'send_query_and_parse_responses: loop { let limit = match Self::BLOCK_NUMBER_LIMIT { BlockNumberLimit::Unlimited => num_blocks_per_query, @@ -92,6 +119,16 @@ where current_block_number.0, end_block_number, ); + let end_block_number = min(end_block_number, stop_sync_at_block_number.map(|block_number| block_number.0).unwrap_or(end_block_number)); + while current_block_number.0 < end_block_number { + if let Some(block) = Self::get_internal_blocks(&mut internal_blocks_received, &mut internal_block_receiver, current_block_number) + { + yield Ok(Box::::from(Box::new(block))); + current_block_number = current_block_number.unchecked_next(); + } else { + break; + } + } // TODO(shahak): Use the report callback. let mut client_response_manager = sqmr_sender .send_new_query( @@ -101,9 +138,7 @@ where limit, step: STEP, }) - ) - .await?; - + ).await?; while current_block_number.0 < end_block_number { match Self::parse_data_for_block( &mut client_response_manager, current_block_number, &storage_reader @@ -134,12 +169,12 @@ where } info!("Added {:?} for block {}.", Self::TYPE_DESCRIPTION, current_block_number); current_block_number = current_block_number.unchecked_next(); - if stop_sync_at_block_number.is_some_and(|stop_sync_at_block_number| { - current_block_number >= stop_sync_at_block_number - }) { - info!("{:?} hit the stop sync block number.", Self::TYPE_DESCRIPTION); - return; - } + } + if stop_sync_at_block_number.is_some_and(|stop_sync_at_block_number| { + current_block_number >= stop_sync_at_block_number + }) { + info!("{:?} hit the stop sync block number.", Self::TYPE_DESCRIPTION); + return; } // Consume the None message signaling the end of the query.