Skip to content

Commit

Permalink
chore(papyrus_p2p_sync): create internal blocks senders and receivers…
Browse files Browse the repository at this point in the history
… structs
  • Loading branch information
eitanm-starkware committed Dec 10, 2024
1 parent 80cb990 commit 5b59b7d
Showing 1 changed file with 69 additions and 8 deletions.
77 changes: 69 additions & 8 deletions crates/papyrus_p2p_sync/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::collections::BTreeMap;
use std::time::Duration;

use class::ClassStreamBuilder;
use futures::channel::mpsc::SendError;
use futures::channel::mpsc::{Receiver, SendError, Sender};
use futures::stream::BoxStream;
use futures::Stream;
use header::HeaderStreamBuilder;
Expand All @@ -38,8 +38,9 @@ use papyrus_protobuf::sync::{
};
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use serde::{Deserialize, Serialize};
use starknet_api::block::BlockNumber;
use starknet_api::block::{BlockBody, BlockNumber};
use starknet_api::core::ClassHash;
use starknet_api::state::{DeclaredClasses, DeprecatedDeclaredClasses, ThinStateDiff};
use starknet_api::transaction::FullTransaction;
use starknet_state_sync_types::state_sync_types::SyncBlock;
use state_diff::StateDiffStreamBuilder;
Expand Down Expand Up @@ -179,15 +180,16 @@ impl P2PSyncClientChannels {
) -> Self {
Self { header_sender, state_diff_sender, transaction_sender, class_sender }
}
pub(crate) fn create_stream(
fn create_stream(
self,
storage_reader: StorageReader,
config: P2PSyncClientConfig,
internal_blocks_receivers: InternalBlocksReceivers,
) -> impl Stream<Item = DataStreamResult> + Send + 'static {
let header_stream = HeaderStreamBuilder::create_stream(
self.header_sender,
storage_reader.clone(),
None,
Some(internal_blocks_receivers.header_receiver),
config.wait_period_for_new_data,
config.num_headers_per_query,
config.stop_sync_at_block_number,
Expand All @@ -196,7 +198,7 @@ impl P2PSyncClientChannels {
let state_diff_stream = StateDiffStreamBuilder::create_stream(
self.state_diff_sender,
storage_reader.clone(),
None,
Some(internal_blocks_receivers.state_diff_receiver),
config.wait_period_for_new_data,
config.num_block_state_diffs_per_query,
config.stop_sync_at_block_number,
Expand All @@ -205,7 +207,7 @@ impl P2PSyncClientChannels {
let transaction_stream = TransactionStreamFactory::create_stream(
self.transaction_sender,
storage_reader.clone(),
None,
Some(internal_blocks_receivers.transaction_receiver),
config.wait_period_for_new_data,
config.num_block_transactions_per_query,
config.stop_sync_at_block_number,
Expand Down Expand Up @@ -246,12 +248,71 @@ impl P2PSyncClient {

#[instrument(skip(self), level = "debug", err)]
pub async fn run(mut self) -> Result<(), P2PSyncClientError> {
let mut data_stream =
self.p2p_sync_channels.create_stream(self.storage_reader.clone(), self.config);
let internal_blocks_channels = InternalBlocksChannels::new();
self.create_internal_blocks_sender_task(internal_blocks_channels.senders);
let mut data_stream = self.p2p_sync_channels.create_stream(
self.storage_reader.clone(),
self.config,
internal_blocks_channels.receivers,
);

loop {
let data = data_stream.next().await.expect("Sync data stream should never end")?;
data.write_to_storage(&mut self.storage_writer)?;
}
}

fn create_internal_blocks_sender_task(
&self,
#[allow(unused_variables)] internal_blocks_senders: InternalBlocksSenders,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {})
}
}

struct InternalBlocksReceivers {
header_receiver: Receiver<(BlockNumber, SignedBlockHeader)>,
state_diff_receiver: Receiver<(BlockNumber, (ThinStateDiff, BlockNumber))>,
transaction_receiver: Receiver<(BlockNumber, (BlockBody, BlockNumber))>,
#[allow(dead_code)]
class_receiver:
Receiver<(BlockNumber, (DeclaredClasses, DeprecatedDeclaredClasses, BlockNumber))>,
}

#[allow(dead_code)]
struct InternalBlocksSenders {
header_sender: Sender<(BlockNumber, SignedBlockHeader)>,
state_diff_sender: Sender<(BlockNumber, (ThinStateDiff, BlockNumber))>,
transaction_sender: Sender<(BlockNumber, (BlockBody, BlockNumber))>,
#[allow(dead_code)]
class_sender: Sender<(BlockNumber, (DeclaredClasses, DeprecatedDeclaredClasses, BlockNumber))>,
}

struct InternalBlocksChannels {
receivers: InternalBlocksReceivers,
senders: InternalBlocksSenders,
}

impl InternalBlocksChannels {
pub fn new() -> Self {
let (header_sender, header_receiver) = futures::channel::mpsc::channel(100);
let (state_diff_sender, state_diff_receiver) = futures::channel::mpsc::channel(100);
let (transaction_sender, transaction_receiver) = futures::channel::mpsc::channel(100);
let (class_sender, class_receiver) = futures::channel::mpsc::channel(100);

Self {
receivers: InternalBlocksReceivers {
header_receiver,
state_diff_receiver,
transaction_receiver,
class_receiver,
},
senders: InternalBlocksSenders {
header_sender,
state_diff_sender,
transaction_sender,
class_sender,
},
}
}
}

0 comments on commit 5b59b7d

Please sign in to comment.