Skip to content

Commit

Permalink
feat(papyrus_p2p_sync): handle internal block in dsb
Browse files Browse the repository at this point in the history
  • Loading branch information
eitanm-starkware committed Dec 9, 2024
1 parent f9a0fb1 commit e9291c5
Showing 1 changed file with 25 additions and 11 deletions.
36 changes: 25 additions & 11 deletions crates/papyrus_p2p_sync/src/client/stream_builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::cmp::min;
use std::collections::HashMap;
use std::time::Duration;

use async_stream::stream;
Expand Down Expand Up @@ -56,7 +57,7 @@ where
fn create_stream<TQuery>(
mut sqmr_sender: SqmrClientSender<TQuery, DataOrFin<InputFromNetwork>>,
storage_reader: StorageReader,
_internal_block_receiver: Option<Receiver<(BlockNumber, Self::Output)>>,
mut internal_block_receiver: Option<Receiver<(BlockNumber, Self::Output)>>,
wait_period_for_new_data: Duration,
num_blocks_per_query: u64,
stop_sync_at_block_number: Option<BlockNumber>,
Expand All @@ -67,6 +68,7 @@ where
{
stream! {
let mut current_block_number = Self::get_start_block_number(&storage_reader)?;
let mut internal_blocks_received = HashMap::new();
'send_query_and_parse_responses: loop {
let limit = match Self::BLOCK_NUMBER_LIMIT {
BlockNumberLimit::Unlimited => num_blocks_per_query,
Expand All @@ -93,18 +95,30 @@ where
end_block_number,
);
// TODO(shahak): Use the report callback.
let mut client_response_manager = sqmr_sender
let client_response_manager = sqmr_sender
.send_new_query(
TQuery::from(Query {
start_block: BlockHashOrNumber::Number(current_block_number),
direction: Direction::Forward,
limit,
step: STEP,
})
)
.await?;

);
if internal_block_receiver.is_some() {
while let Some((block_number, block_data)) = internal_block_receiver.as_mut().unwrap().next().await {
if block_number >= current_block_number {
internal_blocks_received.insert(block_number, block_data);
}
}
}
let mut client_response_manager = client_response_manager.await?;
let end_block_number = min(end_block_number, stop_sync_at_block_number.map(|block_number| block_number.0).unwrap_or(end_block_number));
while current_block_number.0 < end_block_number {
if let Some(block_data) = internal_blocks_received.remove(&current_block_number) {
yield Ok(Box::<dyn BlockData>::from(Box::new(block_data)));
current_block_number = current_block_number.unchecked_next();
continue;
}
match Self::parse_data_for_block(
&mut client_response_manager, current_block_number, &storage_reader
).await {
Expand Down Expand Up @@ -134,12 +148,12 @@ where
}
info!("Added {:?} for block {}.", Self::TYPE_DESCRIPTION, current_block_number);
current_block_number = current_block_number.unchecked_next();
if stop_sync_at_block_number.is_some_and(|stop_sync_at_block_number| {
current_block_number >= stop_sync_at_block_number
}) {
info!("{:?} hit the stop sync block number.", Self::TYPE_DESCRIPTION);
return;
}
}
if stop_sync_at_block_number.is_some_and(|stop_sync_at_block_number| {
current_block_number >= stop_sync_at_block_number
}) {
info!("{:?} hit the stop sync block number.", Self::TYPE_DESCRIPTION);
return;
}

// Consume the None message signaling the end of the query.
Expand Down

0 comments on commit e9291c5

Please sign in to comment.