Skip to content

Commit

Permalink
feat(papyrus_p2p_sync): handle internal block in datastreambuilder
Browse files Browse the repository at this point in the history
  • Loading branch information
eitanm-starkware committed Dec 17, 2024
1 parent 35d05a0 commit 78a0a6b
Showing 1 changed file with 46 additions and 11 deletions.
57 changes: 46 additions & 11 deletions crates/papyrus_p2p_sync/src/client/stream_builder.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::cmp::min;
use std::collections::HashMap;
use std::time::Duration;

use async_stream::stream;
use futures::channel::mpsc::Receiver;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::StreamExt;
use futures::{FutureExt, StreamExt};
use papyrus_network::network_manager::{ClientResponsesManager, SqmrClientSender};
use papyrus_protobuf::converters::ProtobufConversionError;
use papyrus_protobuf::sync::{BlockHashOrNumber, DataOrFin, Direction, Query};
Expand Down Expand Up @@ -53,10 +54,35 @@ where

fn get_start_block_number(storage_reader: &StorageReader) -> Result<BlockNumber, StorageError>;

fn get_internal_blocks(
internal_blocks_received: &mut HashMap<BlockNumber, Self::Output>,
internal_block_receiver: &mut Option<Receiver<(BlockNumber, Self::Output)>>,
current_block_number: BlockNumber,
) -> Option<Self::Output> {
if let Some(block) = internal_blocks_received.remove(&current_block_number) {
return Some(block);
}
if let Some(internal_block_receiver) = internal_block_receiver {
while let Some((block_number, block_data)) = internal_block_receiver
.next()
.now_or_never()
.map(|now_or_never_res| now_or_never_res.expect("internal block receiver closed"))
{
if block_number >= current_block_number {
if block_number == current_block_number {
return Some(block_data);
}
internal_blocks_received.insert(block_number, block_data);
}
}
}
None
}

fn create_stream<TQuery>(
mut sqmr_sender: SqmrClientSender<TQuery, DataOrFin<InputFromNetwork>>,
storage_reader: StorageReader,
_internal_block_receiver: Option<Receiver<(BlockNumber, Self::Output)>>,
mut internal_block_receiver: Option<Receiver<(BlockNumber, Self::Output)>>,
wait_period_for_new_data: Duration,
num_blocks_per_query: u64,
stop_sync_at_block_number: Option<BlockNumber>,
Expand All @@ -67,6 +93,7 @@ where
{
stream! {
let mut current_block_number = Self::get_start_block_number(&storage_reader)?;
let mut internal_blocks_received = HashMap::new();
'send_query_and_parse_responses: loop {
let limit = match Self::BLOCK_NUMBER_LIMIT {
BlockNumberLimit::Unlimited => num_blocks_per_query,
Expand All @@ -92,6 +119,16 @@ where
current_block_number.0,
end_block_number,
);
let end_block_number = min(end_block_number, stop_sync_at_block_number.map(|block_number| block_number.0).unwrap_or(end_block_number));
while current_block_number.0 < end_block_number {
if let Some(block) = Self::get_internal_blocks(&mut internal_blocks_received, &mut internal_block_receiver, current_block_number)
{
yield Ok(Box::<dyn BlockData>::from(Box::new(block)));
current_block_number = current_block_number.unchecked_next();
} else {
break;
}
}
// TODO(shahak): Use the report callback.
let mut client_response_manager = sqmr_sender
.send_new_query(
Expand All @@ -101,9 +138,7 @@ where
limit,
step: STEP,
})
)
.await?;

).await?;
while current_block_number.0 < end_block_number {
match Self::parse_data_for_block(
&mut client_response_manager, current_block_number, &storage_reader
Expand Down Expand Up @@ -134,12 +169,12 @@ where
}
info!("Added {:?} for block {}.", Self::TYPE_DESCRIPTION, current_block_number);
current_block_number = current_block_number.unchecked_next();
if stop_sync_at_block_number.is_some_and(|stop_sync_at_block_number| {
current_block_number >= stop_sync_at_block_number
}) {
info!("{:?} hit the stop sync block number.", Self::TYPE_DESCRIPTION);
return;
}
}
if stop_sync_at_block_number.is_some_and(|stop_sync_at_block_number| {
current_block_number >= stop_sync_at_block_number
}) {
info!("{:?} hit the stop sync block number.", Self::TYPE_DESCRIPTION);
return;
}

// Consume the None message signaling the end of the query.
Expand Down

0 comments on commit 78a0a6b

Please sign in to comment.