From 68c29cf67e45bc8117a3646f2d3467a2a3f29a4d Mon Sep 17 00:00:00 2001 From: eitanm-starkware Date: Mon, 9 Dec 2024 15:26:35 +0200 Subject: [PATCH] feat(papyrus_p2p_network): handle internal block in dsb --- .../src/client/stream_builder.rs | 36 +++++++++++++------ 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/crates/papyrus_p2p_sync/src/client/stream_builder.rs b/crates/papyrus_p2p_sync/src/client/stream_builder.rs index 36718a939d..7291b83970 100644 --- a/crates/papyrus_p2p_sync/src/client/stream_builder.rs +++ b/crates/papyrus_p2p_sync/src/client/stream_builder.rs @@ -1,4 +1,5 @@ use std::cmp::min; +use std::collections::HashMap; use std::time::Duration; use async_stream::stream; @@ -56,7 +57,7 @@ where fn create_stream( mut sqmr_sender: SqmrClientSender>, storage_reader: StorageReader, - _internal_block_receiver: Option>, + mut internal_block_receiver: Option>, wait_period_for_new_data: Duration, num_blocks_per_query: u64, stop_sync_at_block_number: Option, @@ -67,6 +68,7 @@ where { stream! { let mut current_block_number = Self::get_start_block_number(&storage_reader)?; + let mut internal_blocks_received = HashMap::new(); 'send_query_and_parse_responses: loop { let limit = match Self::BLOCK_NUMBER_LIMIT { BlockNumberLimit::Unlimited => num_blocks_per_query, @@ -93,7 +95,7 @@ where end_block_number, ); // TODO(shahak): Use the report callback. - let mut client_response_manager = sqmr_sender + let client_response_manager = sqmr_sender .send_new_query( TQuery::from(Query { start_block: BlockHashOrNumber::Number(current_block_number), @@ -101,10 +103,22 @@ where limit, step: STEP, }) - ) - .await?; - + ); + if internal_block_receiver.is_some() { + while let Some((block_number, block_data)) = internal_block_receiver.as_mut().unwrap().next().await { + if block_number >= current_block_number { + internal_blocks_received.insert(block_number, block_data); + } + } + } + let mut client_response_manager = client_response_manager.await?; + let end_block_number = min(end_block_number, stop_sync_at_block_number.map(|block_number| block_number.0).unwrap_or(end_block_number)); while current_block_number.0 < end_block_number { + if let Some(block_data) = internal_blocks_received.remove(¤t_block_number) { + yield Ok(Box::::from(Box::new(block_data))); + current_block_number = current_block_number.unchecked_next(); + continue; + } match Self::parse_data_for_block( &mut client_response_manager, current_block_number, &storage_reader ).await { @@ -134,12 +148,12 @@ where } info!("Added {:?} for block {}.", Self::TYPE_DESCRIPTION, current_block_number); current_block_number = current_block_number.unchecked_next(); - if stop_sync_at_block_number.is_some_and(|stop_sync_at_block_number| { - current_block_number >= stop_sync_at_block_number - }) { - info!("{:?} hit the stop sync block number.", Self::TYPE_DESCRIPTION); - return; - } + } + if stop_sync_at_block_number.is_some_and(|stop_sync_at_block_number| { + current_block_number >= stop_sync_at_block_number + }) { + info!("{:?} hit the stop sync block number.", Self::TYPE_DESCRIPTION); + return; } // Consume the None message signaling the end of the query.