Skip to content

Commit

Permalink
feat(papyrus_p2p_sync): impl p2psyncclient internal block forwarding
Browse files Browse the repository at this point in the history
  • Loading branch information
eitanm-starkware committed Dec 15, 2024
1 parent 5b59b7d commit b4e3e7f
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 17 deletions.
4 changes: 2 additions & 2 deletions crates/papyrus_p2p_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ papyrus_network.workspace = true
papyrus_proc_macros.workspace = true
papyrus_protobuf.workspace = true
papyrus_storage.workspace = true
papyrus_test_utils.workspace = true
rand.workspace = true
rand_chacha.workspace = true
serde.workspace = true
starknet_api.workspace = true
starknet_state_sync_types.workspace = true
Expand All @@ -35,8 +37,6 @@ lazy_static.workspace = true
papyrus_network = { workspace = true, features = ["testing"] }
papyrus_protobuf = { workspace = true, features = ["testing"] }
papyrus_storage = { workspace = true, features = ["testing"] }
papyrus_test_utils.workspace = true
rand_chacha.workspace = true
static_assertions.workspace = true
tokio = { workspace = true, features = ["test-util"] }

Expand Down
88 changes: 73 additions & 15 deletions crates/papyrus_p2p_sync/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::time::Duration;
use class::ClassStreamBuilder;
use futures::channel::mpsc::{Receiver, SendError, Sender};
use futures::stream::BoxStream;
use futures::Stream;
use futures::{SinkExt as _, Stream};
use header::HeaderStreamBuilder;
use papyrus_common::pending_classes::ApiContractClass;
use papyrus_config::converters::deserialize_milliseconds_to_duration;
Expand All @@ -37,11 +37,20 @@ use papyrus_protobuf::sync::{
TransactionQuery,
};
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use papyrus_test_utils::{get_rng, GetTestInstance};
use serde::{Deserialize, Serialize};
use starknet_api::block::{BlockBody, BlockNumber};
use starknet_api::block::{
BlockBody,
BlockHash,
BlockHeader,
BlockHeaderWithoutHash,
BlockNumber,
BlockSignature,
};
use starknet_api::core::ClassHash;
use starknet_api::hash::StarkHash;
use starknet_api::state::{DeclaredClasses, DeprecatedDeclaredClasses, ThinStateDiff};
use starknet_api::transaction::FullTransaction;
use starknet_api::transaction::{FullTransaction, Transaction, TransactionOutput};
use starknet_state_sync_types::state_sync_types::SyncBlock;
use state_diff::StateDiffStreamBuilder;
use stream_builder::{DataStreamBuilder, DataStreamResult};
Expand Down Expand Up @@ -231,7 +240,6 @@ pub struct P2PSyncClient {
storage_reader: StorageReader,
storage_writer: StorageWriter,
p2p_sync_channels: P2PSyncClientChannels,
#[allow(dead_code)]
internal_blocks_receiver: BoxStream<'static, (BlockNumber, SyncBlock)>,
}

Expand All @@ -247,26 +255,76 @@ impl P2PSyncClient {
}

#[instrument(skip(self), level = "debug", err)]
pub async fn run(mut self) -> Result<(), P2PSyncClientError> {
pub async fn run(self) -> Result<(), P2PSyncClientError> {
let internal_blocks_channels = InternalBlocksChannels::new();
self.create_internal_blocks_sender_task(internal_blocks_channels.senders);
let mut data_stream = self.p2p_sync_channels.create_stream(
self.storage_reader.clone(),
self.config,
let P2PSyncClient {
config,
storage_reader,
mut storage_writer,
p2p_sync_channels,
internal_blocks_receiver,
} = self;
Self::create_internal_blocks_sender_task(
internal_blocks_channels.senders,
internal_blocks_receiver,
);
let mut data_stream = p2p_sync_channels.create_stream(
storage_reader,
config,
internal_blocks_channels.receivers,
);

loop {
let data = data_stream.next().await.expect("Sync data stream should never end")?;
data.write_to_storage(&mut self.storage_writer)?;
data.write_to_storage(&mut storage_writer)?;
}
}

fn create_internal_blocks_sender_task(
&self,
#[allow(unused_variables)] internal_blocks_senders: InternalBlocksSenders,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {})
internal_blocks_senders: InternalBlocksSenders,
mut internal_blocks_receiver: BoxStream<'static, (BlockNumber, SyncBlock)>,
) -> tokio::task::JoinHandle<Result<(), SendError>> {
tokio::spawn(async move {
loop {
let (block_number, sync_block) = StreamExt::next(&mut internal_blocks_receiver)
.await
.expect("Internal blocks stream should never end");
let InternalBlocksSenders {
header_sender,
state_diff_sender,
transaction_sender,
class_sender: _,
} = &mut internal_blocks_senders.clone();
let block_header = SignedBlockHeader {
block_header: BlockHeader {
block_hash: BlockHash(StarkHash::from(block_number.0)),
block_header_without_hash: BlockHeaderWithoutHash {
block_number,
..Default::default()
},
state_diff_length: Some(sync_block.state_diff.len()),
n_transactions: sync_block.transaction_hashes.len(),
..Default::default()
},
signatures: vec![BlockSignature::default()],
};

header_sender.send((block_number, block_header)).await?;
let state_diff = sync_block.state_diff;
state_diff_sender.send((block_number, (state_diff, block_number))).await?;
let num_transactions = sync_block.transaction_hashes.len();
let mut rng = get_rng();
let block_body = BlockBody {
transaction_hashes: sync_block.transaction_hashes,
transaction_outputs: vec![
TransactionOutput::get_test_instance(&mut rng);
num_transactions
],
transactions: vec![Transaction::get_test_instance(&mut rng); num_transactions],
};
transaction_sender.send((block_number, (block_body, block_number))).await?;
}
})
}
}

Expand All @@ -279,7 +337,7 @@ struct InternalBlocksReceivers {
Receiver<(BlockNumber, (DeclaredClasses, DeprecatedDeclaredClasses, BlockNumber))>,
}

#[allow(dead_code)]
#[derive(Clone)]
struct InternalBlocksSenders {
header_sender: Sender<(BlockNumber, SignedBlockHeader)>,
state_diff_sender: Sender<(BlockNumber, (ThinStateDiff, BlockNumber))>,
Expand Down

0 comments on commit b4e3e7f

Please sign in to comment.