Skip to content

Commit

Permalink
chore(papyrus_p2p_sync): add block data receiver to p2psyncclient
Browse files Browse the repository at this point in the history
  • Loading branch information
eitanm-starkware committed Dec 10, 2024
1 parent f92d97a commit fd7a70e
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/papyrus_node/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::process::exit;
use std::sync::Arc;
use std::time::Duration;

use futures::StreamExt;
use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig;
use papyrus_common::metrics::COLLECT_PROFILING_METRICS;
use papyrus_common::pending_classes::PendingClasses;
Expand Down Expand Up @@ -304,6 +305,7 @@ async fn spawn_sync_client(
storage_reader,
storage_writer,
p2p_sync_client_channels,
futures::stream::pending().boxed(),
);
tokio::spawn(async move { Ok(p2p_sync.run().await?) })
}
Expand Down
1 change: 1 addition & 0 deletions crates/papyrus_p2p_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ papyrus_storage.workspace = true
rand.workspace = true
serde.workspace = true
starknet_api.workspace = true
starknet_state_sync_types.workspace = true
starknet-types-core.workspace = true
thiserror.workspace = true
tokio.workspace = true
Expand Down
8 changes: 7 additions & 1 deletion crates/papyrus_p2p_sync/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::time::Duration;

use class::ClassStreamBuilder;
use futures::channel::mpsc::SendError;
use futures::stream::BoxStream;
use futures::Stream;
use header::HeaderStreamBuilder;
use papyrus_common::pending_classes::ApiContractClass;
Expand All @@ -40,11 +41,13 @@ use serde::{Deserialize, Serialize};
use starknet_api::block::BlockNumber;
use starknet_api::core::ClassHash;
use starknet_api::transaction::FullTransaction;
use starknet_state_sync_types::state_sync_types::SyncBlock;
use state_diff::StateDiffStreamBuilder;
use stream_builder::{DataStreamBuilder, DataStreamResult};
use tokio_stream::StreamExt;
use tracing::instrument;
use transaction::TransactionStreamFactory;

const STEP: u64 = 1;
const ALLOWED_SIGNATURES_LENGTH: usize = 1;

Expand Down Expand Up @@ -226,6 +229,8 @@ pub struct P2PSyncClient {
storage_reader: StorageReader,
storage_writer: StorageWriter,
p2p_sync_channels: P2PSyncClientChannels,
#[allow(dead_code)]
internal_blocks_receiver: BoxStream<'static, (SyncBlock, BlockNumber)>,
}

impl P2PSyncClient {
Expand All @@ -234,8 +239,9 @@ impl P2PSyncClient {
storage_reader: StorageReader,
storage_writer: StorageWriter,
p2p_sync_channels: P2PSyncClientChannels,
internal_blocks_receiver: BoxStream<'static, (SyncBlock, BlockNumber)>,
) -> Self {
Self { config, storage_reader, storage_writer, p2p_sync_channels }
Self { config, storage_reader, storage_writer, p2p_sync_channels, internal_blocks_receiver }
}

#[instrument(skip(self), level = "debug", err)]
Expand Down
2 changes: 2 additions & 0 deletions crates/papyrus_p2p_sync/src/client/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ pub fn setup() -> TestArgs {
storage_reader.clone(),
storage_writer,
p2p_sync_channels,
futures::stream::pending().boxed(),
);
TestArgs {
p2p_sync,
Expand Down Expand Up @@ -194,6 +195,7 @@ pub async fn run_test(max_query_lengths: HashMap<DataType, u64>, actions: Vec<Ac
storage_reader.clone(),
storage_writer,
p2p_sync_channels,
futures::stream::pending().boxed(),
);

let mut headers_current_query_responses_manager = None;
Expand Down
3 changes: 2 additions & 1 deletion crates/starknet_state_sync/src/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod test;

use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::FutureExt;
use futures::{FutureExt, StreamExt};
use papyrus_network::network_manager::{self, NetworkError};
use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels, P2PSyncClientError};
use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels};
Expand Down Expand Up @@ -65,6 +65,7 @@ impl StateSyncRunner {
storage_reader.clone(),
storage_writer,
p2p_sync_client_channels,
futures::stream::pending().boxed(),
);

let header_server_receiver = network_manager
Expand Down

0 comments on commit fd7a70e

Please sign in to comment.