diff --git a/crates/papyrus_p2p_sync/Cargo.toml b/crates/papyrus_p2p_sync/Cargo.toml index f39a8bc76b5..0121d5cf7e3 100644 --- a/crates/papyrus_p2p_sync/Cargo.toml +++ b/crates/papyrus_p2p_sync/Cargo.toml @@ -19,7 +19,9 @@ papyrus_network.workspace = true papyrus_proc_macros.workspace = true papyrus_protobuf.workspace = true papyrus_storage.workspace = true +papyrus_test_utils.workspace = true rand.workspace = true +rand_chacha.workspace = true serde.workspace = true starknet_api.workspace = true starknet_state_sync_types.workspace = true @@ -35,8 +37,6 @@ lazy_static.workspace = true papyrus_network = { workspace = true, features = ["testing"] } papyrus_protobuf = { workspace = true, features = ["testing"] } papyrus_storage = { workspace = true, features = ["testing"] } -papyrus_test_utils.workspace = true -rand_chacha.workspace = true static_assertions.workspace = true tokio = { workspace = true, features = ["test-util"] } diff --git a/crates/papyrus_p2p_sync/src/client/mod.rs b/crates/papyrus_p2p_sync/src/client/mod.rs index d085683ca96..25fbc674004 100644 --- a/crates/papyrus_p2p_sync/src/client/mod.rs +++ b/crates/papyrus_p2p_sync/src/client/mod.rs @@ -20,7 +20,7 @@ use std::time::Duration; use class::ClassStreamBuilder; use futures::channel::mpsc::{Receiver, SendError, Sender}; use futures::stream::BoxStream; -use futures::Stream; +use futures::{SinkExt as _, Stream}; use header::HeaderStreamBuilder; use papyrus_common::pending_classes::ApiContractClass; use papyrus_config::converters::deserialize_milliseconds_to_duration; @@ -37,11 +37,20 @@ use papyrus_protobuf::sync::{ TransactionQuery, }; use papyrus_storage::{StorageError, StorageReader, StorageWriter}; +use papyrus_test_utils::{get_rng, GetTestInstance}; use serde::{Deserialize, Serialize}; -use starknet_api::block::{BlockBody, BlockNumber}; +use starknet_api::block::{ + BlockBody, + BlockHash, + BlockHeader, + BlockHeaderWithoutHash, + BlockNumber, + BlockSignature, +}; use starknet_api::core::ClassHash; +use starknet_api::hash::StarkHash; use starknet_api::state::{DeclaredClasses, DeprecatedDeclaredClasses, ThinStateDiff}; -use starknet_api::transaction::FullTransaction; +use starknet_api::transaction::{FullTransaction, Transaction, TransactionOutput}; use starknet_state_sync_types::state_sync_types::SyncBlock; use state_diff::StateDiffStreamBuilder; use stream_builder::{DataStreamBuilder, DataStreamResult}; @@ -231,7 +240,6 @@ pub struct P2PSyncClient { storage_reader: StorageReader, storage_writer: StorageWriter, p2p_sync_channels: P2PSyncClientChannels, - #[allow(dead_code)] internal_blocks_receiver: BoxStream<'static, (BlockNumber, SyncBlock)>, } @@ -247,26 +255,76 @@ impl P2PSyncClient { } #[instrument(skip(self), level = "debug", err)] - pub async fn run(mut self) -> Result<(), P2PSyncClientError> { + pub async fn run(self) -> Result<(), P2PSyncClientError> { let internal_blocks_channels = InternalBlocksChannels::new(); - self.create_internal_blocks_sender_task(internal_blocks_channels.senders); - let mut data_stream = self.p2p_sync_channels.create_stream( - self.storage_reader.clone(), - self.config, + let P2PSyncClient { + config, + storage_reader, + mut storage_writer, + p2p_sync_channels, + internal_blocks_receiver, + } = self; + Self::create_internal_blocks_sender_task( + internal_blocks_channels.senders, + internal_blocks_receiver, + ); + let mut data_stream = p2p_sync_channels.create_stream( + storage_reader, + config, internal_blocks_channels.receivers, ); loop { let data = data_stream.next().await.expect("Sync data stream should never end")?; - data.write_to_storage(&mut self.storage_writer)?; + data.write_to_storage(&mut storage_writer)?; } } fn create_internal_blocks_sender_task( - &self, - #[allow(unused_variables)] internal_blocks_senders: InternalBlocksSenders, - ) -> tokio::task::JoinHandle<()> { - tokio::spawn(async move {}) + internal_blocks_senders: InternalBlocksSenders, + mut internal_blocks_receiver: BoxStream<'static, (BlockNumber, SyncBlock)>, + ) -> tokio::task::JoinHandle> { + tokio::spawn(async move { + loop { + let (block_number, sync_block) = StreamExt::next(&mut internal_blocks_receiver) + .await + .expect("Internal blocks stream should never end"); + let InternalBlocksSenders { + header_sender, + state_diff_sender, + transaction_sender, + class_sender: _, + } = &mut internal_blocks_senders.clone(); + let block_header = SignedBlockHeader { + block_header: BlockHeader { + block_hash: BlockHash(StarkHash::from(block_number.0)), + block_header_without_hash: BlockHeaderWithoutHash { + block_number, + ..Default::default() + }, + state_diff_length: Some(sync_block.state_diff.len()), + n_transactions: sync_block.transaction_hashes.len(), + ..Default::default() + }, + signatures: vec![BlockSignature::default()], + }; + + header_sender.send((block_number, block_header)).await?; + let state_diff = sync_block.state_diff; + state_diff_sender.send((block_number, (state_diff, block_number))).await?; + let num_transactions = sync_block.transaction_hashes.len(); + let mut rng = get_rng(); + let block_body = BlockBody { + transaction_hashes: sync_block.transaction_hashes, + transaction_outputs: vec![ + TransactionOutput::get_test_instance(&mut rng); + num_transactions + ], + transactions: vec![Transaction::get_test_instance(&mut rng); num_transactions], + }; + transaction_sender.send((block_number, (block_body, block_number))).await?; + } + }) } } @@ -279,7 +337,7 @@ struct InternalBlocksReceivers { Receiver<(BlockNumber, (DeclaredClasses, DeprecatedDeclaredClasses, BlockNumber))>, } -#[allow(dead_code)] +#[derive(Clone)] struct InternalBlocksSenders { header_sender: Sender<(BlockNumber, SignedBlockHeader)>, state_diff_sender: Sender<(BlockNumber, (ThinStateDiff, BlockNumber))>,