diff --git a/crates/starknet_state_sync/src/lib.rs b/crates/starknet_state_sync/src/lib.rs index 1f86107806..8c371ca7ef 100644 --- a/crates/starknet_state_sync/src/lib.rs +++ b/crates/starknet_state_sync/src/lib.rs @@ -2,6 +2,8 @@ pub mod config; pub mod runner; use async_trait::async_trait; +use futures::channel::mpsc::{channel, Sender}; +use futures::SinkExt; use papyrus_storage::body::BodyStorageReader; use papyrus_storage::state::StateStorageReader; use papyrus_storage::StorageReader; @@ -12,18 +14,23 @@ use starknet_state_sync_types::communication::{ StateSyncResponse, StateSyncResult, }; +use starknet_state_sync_types::errors::StateSyncError; use starknet_state_sync_types::state_sync_types::SyncBlock; use crate::config::StateSyncConfig; use crate::runner::StateSyncRunner; +const BUFFER_SIZE: usize = 100000; + pub fn create_state_sync_and_runner(config: StateSyncConfig) -> (StateSync, StateSyncRunner) { - let (state_sync_runner, storage_reader) = StateSyncRunner::new(config); - (StateSync { storage_reader }, state_sync_runner) + let (new_block_sender, new_block_receiver) = channel(BUFFER_SIZE); + let (state_sync_runner, storage_reader) = StateSyncRunner::new(config, new_block_receiver); + (StateSync { storage_reader, new_block_sender }, state_sync_runner) } pub struct StateSync { storage_reader: StorageReader, + new_block_sender: Sender<(BlockNumber, SyncBlock)>, } // TODO(shahak): Have StateSyncRunner call StateSync instead of the opposite once we stop supporting @@ -35,8 +42,13 @@ impl ComponentRequestHandler for StateSync StateSyncRequest::GetBlock(block_number) => { StateSyncResponse::GetBlock(self.get_block(block_number)) } - StateSyncRequest::AddNewBlock(_block_number, _sync_block) => { - todo!() + StateSyncRequest::AddNewBlock(block_number, sync_block) => { + StateSyncResponse::AddNewBlock( + self.new_block_sender + .send((block_number, sync_block)) + .await + .map_err(|_| StateSyncError::P2PSyncClientError), + ) } } } diff --git a/crates/starknet_state_sync/src/runner/mod.rs b/crates/starknet_state_sync/src/runner/mod.rs index 6e333adcff..7733d42b8a 100644 --- a/crates/starknet_state_sync/src/runner/mod.rs +++ b/crates/starknet_state_sync/src/runner/mod.rs @@ -2,15 +2,18 @@ mod test; use async_trait::async_trait; +use futures::channel::mpsc::Receiver; use futures::future::BoxFuture; -use futures::{FutureExt, StreamExt}; +use futures::FutureExt; use papyrus_network::network_manager::{self, NetworkError}; use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels, P2PSyncClientError}; use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels}; use papyrus_p2p_sync::{Protocol, BUFFER_SIZE}; use papyrus_storage::{open_storage, StorageReader}; +use starknet_api::block::BlockNumber; use starknet_sequencer_infra::component_definitions::ComponentStarter; use starknet_sequencer_infra::errors::ComponentError; +use starknet_state_sync_types::state_sync_types::SyncBlock; use crate::config::StateSyncConfig; @@ -37,7 +40,10 @@ impl ComponentStarter for StateSyncRunner { } impl StateSyncRunner { - pub fn new(config: StateSyncConfig) -> (Self, StorageReader) { + pub fn new( + config: StateSyncConfig, + new_block_receiver: Receiver<(BlockNumber, SyncBlock)>, + ) -> (Self, StorageReader) { let (storage_reader, storage_writer) = open_storage(config.storage_config).expect("StateSyncRunner failed opening storage"); @@ -65,7 +71,7 @@ impl StateSyncRunner { storage_reader.clone(), storage_writer, p2p_sync_client_channels, - futures::stream::pending().boxed(), + Box::pin(new_block_receiver), ); let header_server_receiver = network_manager diff --git a/crates/starknet_state_sync_types/src/errors.rs b/crates/starknet_state_sync_types/src/errors.rs index f5842256ac..f37d039aa9 100644 --- a/crates/starknet_state_sync_types/src/errors.rs +++ b/crates/starknet_state_sync_types/src/errors.rs @@ -10,6 +10,8 @@ pub enum StateSyncError { // We put the string of the error instead. #[error("Unexpected storage error: {0}")] StorageError(String), + #[error("Failed to send message to P2pSyncClient")] + P2PSyncClientError, } impl From for StateSyncError {