Skip to content

Commit

Permalink
feat(papyrus_p2p_sync): impl p2psyncclient internal block forwarding
Browse files Browse the repository at this point in the history
  • Loading branch information
eitanm-starkware committed Dec 23, 2024
1 parent c9b43db commit 172ced2
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 44 deletions.
4 changes: 2 additions & 2 deletions crates/papyrus_p2p_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ papyrus_network.workspace = true
papyrus_proc_macros.workspace = true
papyrus_protobuf.workspace = true
papyrus_storage.workspace = true
papyrus_test_utils.workspace = true
rand.workspace = true
rand_chacha.workspace = true
serde.workspace = true
starknet_api.workspace = true
starknet_state_sync_types.workspace = true
Expand All @@ -35,8 +37,6 @@ lazy_static.workspace = true
papyrus_network = { workspace = true, features = ["testing"] }
papyrus_protobuf = { workspace = true, features = ["testing"] }
papyrus_storage = { workspace = true, features = ["testing"] }
papyrus_test_utils.workspace = true
rand_chacha.workspace = true
static_assertions.workspace = true
tokio = { workspace = true, features = ["test-util"] }

Expand Down
9 changes: 9 additions & 0 deletions crates/papyrus_p2p_sync/src/client/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use starknet_api::block::BlockNumber;
use starknet_api::core::ClassHash;
use starknet_api::state::{DeclaredClasses, DeprecatedDeclaredClasses};
use starknet_state_sync_types::state_sync_types::SyncBlock;

use super::stream_builder::{
BadPeerError,
Expand Down Expand Up @@ -129,4 +130,12 @@ impl DataStreamBuilder<(ApiContractClass, ClassHash)> for ClassStreamBuilder {
fn get_start_block_number(storage_reader: &StorageReader) -> Result<BlockNumber, StorageError> {
storage_reader.begin_ro_txn()?.get_class_marker()
}

// TODO(Eitan): Implement this function once we have a class manager component.
fn convert_sync_block_to_block_data(
_block_number: BlockNumber,
_sync_block: SyncBlock,
) -> Option<(DeclaredClasses, DeprecatedDeclaredClasses, BlockNumber)> {
None
}
}
31 changes: 30 additions & 1 deletion crates/papyrus_p2p_sync/src/client/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,15 @@ use papyrus_network::network_manager::ClientResponsesManager;
use papyrus_protobuf::sync::{DataOrFin, SignedBlockHeader};
use papyrus_storage::header::{HeaderStorageReader, HeaderStorageWriter};
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use starknet_api::block::BlockNumber;
use starknet_api::block::{
BlockHash,
BlockHeader,
BlockHeaderWithoutHash,
BlockNumber,
BlockSignature,
};
use starknet_api::hash::StarkHash;
use starknet_state_sync_types::state_sync_types::SyncBlock;
use tracing::debug;

use super::stream_builder::{
Expand Down Expand Up @@ -112,4 +120,25 @@ impl DataStreamBuilder<SignedBlockHeader> for HeaderStreamBuilder {
fn get_start_block_number(storage_reader: &StorageReader) -> Result<BlockNumber, StorageError> {
storage_reader.begin_ro_txn()?.get_header_marker()
}

// TODO(Eitan): Use block info once it's included in the sync block.
// TODO(Eitan): Fill this with real header once SyncBlock has it.
fn convert_sync_block_to_block_data(
block_number: BlockNumber,
sync_block: SyncBlock,
) -> Option<SignedBlockHeader> {
Some(SignedBlockHeader {
block_header: BlockHeader {
block_hash: BlockHash(StarkHash::from(block_number.0)),
block_header_without_hash: BlockHeaderWithoutHash {
block_number,
..Default::default()
},
state_diff_length: Some(sync_block.state_diff.len()),
n_transactions: sync_block.transaction_hashes.len(),
..Default::default()
},
signatures: vec![BlockSignature::default()],
})
}
}
99 changes: 62 additions & 37 deletions crates/papyrus_p2p_sync/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::time::Duration;
use class::ClassStreamBuilder;
use futures::channel::mpsc::{Receiver, SendError, Sender};
use futures::stream::BoxStream;
use futures::Stream;
use futures::{SinkExt as _, Stream};
use header::HeaderStreamBuilder;
use papyrus_common::pending_classes::ApiContractClass;
use papyrus_config::converters::deserialize_milliseconds_to_duration;
Expand All @@ -38,9 +38,8 @@ use papyrus_protobuf::sync::{
};
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use serde::{Deserialize, Serialize};
use starknet_api::block::{BlockBody, BlockNumber};
use starknet_api::block::BlockNumber;
use starknet_api::core::ClassHash;
use starknet_api::state::{DeclaredClasses, DeprecatedDeclaredClasses, ThinStateDiff};
use starknet_api::transaction::FullTransaction;
use starknet_state_sync_types::state_sync_types::SyncBlock;
use state_diff::StateDiffStreamBuilder;
Expand Down Expand Up @@ -173,28 +172,28 @@ impl P2PSyncClientChannels {
self,
storage_reader: StorageReader,
config: P2PSyncClientConfig,
_internal_blocks_receivers: InternalBlocksReceivers,
internal_blocks_receivers: InternalBlocksReceivers,
) -> impl Stream<Item = DataStreamResult> + Send + 'static {
let header_stream = HeaderStreamBuilder::create_stream(
self.header_sender,
storage_reader.clone(),
None,
Some(internal_blocks_receivers.header_receiver),
config.wait_period_for_new_data,
config.num_headers_per_query,
);

let state_diff_stream = StateDiffStreamBuilder::create_stream(
self.state_diff_sender,
storage_reader.clone(),
None,
Some(internal_blocks_receivers.state_diff_receiver),
config.wait_period_for_new_data,
config.num_block_state_diffs_per_query,
);

let transaction_stream = TransactionStreamFactory::create_stream(
self.transaction_sender,
storage_reader.clone(),
None,
Some(internal_blocks_receivers.transaction_receiver),
config.wait_period_for_new_data,
config.num_block_transactions_per_query,
);
Expand All @@ -216,7 +215,6 @@ pub struct P2PSyncClient {
storage_reader: StorageReader,
storage_writer: StorageWriter,
p2p_sync_channels: P2PSyncClientChannels,
#[allow(dead_code)]
internal_blocks_receiver: BoxStream<'static, (BlockNumber, SyncBlock)>,
}

Expand All @@ -232,50 +230,77 @@ impl P2PSyncClient {
}

#[instrument(skip(self), level = "debug", err)]
pub async fn run(mut self) -> Result<(), P2PSyncClientError> {
pub async fn run(self) -> Result<(), P2PSyncClientError> {
info!("Starting P2P sync client");

let internal_blocks_channels = InternalBlocksChannels::new();
self.create_internal_blocks_sender_task(internal_blocks_channels.senders);
let mut data_stream = self.p2p_sync_channels.create_stream(
self.storage_reader.clone(),
self.config,
internal_blocks_channels.receivers,
);
let InternalBlocksChannels {
receivers: internal_blocks_receivers,
senders: mut internal_blocks_senders,
} = InternalBlocksChannels::new();
let P2PSyncClient {
config,
storage_reader,
mut storage_writer,
p2p_sync_channels,
mut internal_blocks_receiver,
} = self;
let mut data_stream =
p2p_sync_channels.create_stream(storage_reader, config, internal_blocks_receivers);

loop {
tokio::select! {
maybe_internal_block = internal_blocks_receiver.next() => {
let (block_number, sync_block) = maybe_internal_block.expect("Internal blocks stream should never end");
internal_blocks_senders.send(block_number, sync_block).await?;
}
data = data_stream.next() => {
let data = data.expect("Sync data stream should never end")?;
data.write_to_storage(&mut storage_writer)?;
}
}
let data = data_stream.next().await.expect("Sync data stream should never end")?;
data.write_to_storage(&mut self.storage_writer)?;
data.write_to_storage(&mut storage_writer)?;
}
}

fn create_internal_blocks_sender_task(
&self,
#[allow(unused_variables)] internal_blocks_senders: InternalBlocksSenders,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {})
}
}

#[allow(dead_code)]
pub(crate) struct InternalBlocksReceivers {
header_receiver: Receiver<(BlockNumber, SignedBlockHeader)>,
state_diff_receiver: Receiver<(BlockNumber, (ThinStateDiff, BlockNumber))>,
transaction_receiver: Receiver<(BlockNumber, (BlockBody, BlockNumber))>,
header_receiver: Receiver<(BlockNumber, SyncBlock)>,
state_diff_receiver: Receiver<(BlockNumber, SyncBlock)>,
transaction_receiver: Receiver<(BlockNumber, SyncBlock)>,
#[allow(dead_code)]
class_receiver:
Receiver<(BlockNumber, (DeclaredClasses, DeprecatedDeclaredClasses, BlockNumber))>,
class_receiver: Receiver<(BlockNumber, SyncBlock)>,
}

#[allow(dead_code)]
struct InternalBlocksSenders {
header_sender: Sender<(BlockNumber, SignedBlockHeader)>,
state_diff_sender: Sender<(BlockNumber, (ThinStateDiff, BlockNumber))>,
transaction_sender: Sender<(BlockNumber, (BlockBody, BlockNumber))>,
pub struct InternalBlocksSenders {
header_sender: Sender<(BlockNumber, SyncBlock)>,
state_diff_sender: Sender<(BlockNumber, SyncBlock)>,
transaction_sender: Sender<(BlockNumber, SyncBlock)>,
#[allow(dead_code)]
class_sender: Sender<(BlockNumber, (DeclaredClasses, DeprecatedDeclaredClasses, BlockNumber))>,
class_sender: Sender<(BlockNumber, SyncBlock)>,
}
impl InternalBlocksSenders {
pub async fn send(
&mut self,
block_number: BlockNumber,
sync_block: SyncBlock,
) -> Result<(), SendError> {
let header_send = self.header_sender.send((block_number, sync_block.clone()));
let state_diff_send = self.state_diff_sender.send((block_number, sync_block.clone()));
let transaction_send = self.transaction_sender.send((block_number, sync_block.clone()));
let class_send = self.class_sender.send((block_number, sync_block));
let res =
futures::future::join4(header_send, state_diff_send, transaction_send, class_send)
.await;
match res {
(Ok(()), Ok(()), Ok(()), Ok(())) => Ok(()),
(Err(e), _, _, _) => Err(e),
(_, Err(e), _, _) => Err(e),
(_, _, Err(e), _) => Err(e),
(_, _, _, Err(e)) => Err(e),
}
}
}

struct InternalBlocksChannels {
receivers: InternalBlocksReceivers,
senders: InternalBlocksSenders,
Expand Down
8 changes: 8 additions & 0 deletions crates/papyrus_p2p_sync/src/client/state_diff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use papyrus_storage::state::{StateStorageReader, StateStorageWriter};
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use starknet_api::block::BlockNumber;
use starknet_api::state::ThinStateDiff;
use starknet_state_sync_types::state_sync_types::SyncBlock;

use super::stream_builder::BadPeerError;
use crate::client::stream_builder::{
Expand Down Expand Up @@ -110,6 +111,13 @@ impl DataStreamBuilder<StateDiffChunk> for StateDiffStreamBuilder {
fn get_start_block_number(storage_reader: &StorageReader) -> Result<BlockNumber, StorageError> {
storage_reader.begin_ro_txn()?.get_state_marker()
}

fn convert_sync_block_to_block_data(
block_number: BlockNumber,
sync_block: SyncBlock,
) -> Option<(ThinStateDiff, BlockNumber)> {
Some((sync_block.state_diff, block_number))
}
}

// For performance reasons, this function does not check if a deprecated class was declared twice.
Expand Down
21 changes: 18 additions & 3 deletions crates/papyrus_p2p_sync/src/client/stream_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use papyrus_storage::state::StateStorageReader;
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use starknet_api::block::{BlockNumber, BlockSignature};
use starknet_api::core::ClassHash;
use starknet_state_sync_types::state_sync_types::SyncBlock;
use tracing::{debug, info, warn};

use super::{P2PSyncClientError, STEP};
Expand Down Expand Up @@ -54,21 +55,35 @@ where

fn get_start_block_number(storage_reader: &StorageReader) -> Result<BlockNumber, StorageError>;

// TODO(Eitan): Remove option on return once we have a class manager component.
// Returning None happens when internal blocks are disabled for this stream.
fn convert_sync_block_to_block_data(
block_number: BlockNumber,
sync_block: SyncBlock,
) -> Option<Self::Output>;

fn get_internal_block_at(
internal_blocks_received: &mut HashMap<BlockNumber, Self::Output>,
internal_block_receiver: &mut Option<Receiver<(BlockNumber, Self::Output)>>,
internal_block_receiver: &mut Option<Receiver<(BlockNumber, SyncBlock)>>,
current_block_number: BlockNumber,
) -> Option<Self::Output> {
if let Some(block) = internal_blocks_received.remove(&current_block_number) {
return Some(block);
}
let Some(internal_block_receiver) = internal_block_receiver else { return None };
while let Some((block_number, block_data)) = internal_block_receiver
while let Some((block_number, sync_block)) = internal_block_receiver
.next()
.now_or_never()
.map(|now_or_never_res| now_or_never_res.expect("Internal block receiver closed"))
{
if block_number >= current_block_number {
let block_data =
match Self::convert_sync_block_to_block_data(block_number, sync_block) {
Some(block_data) => block_data,
// If None is received then we don't use internal blocks for this stream
// TODO(Eitan): Remove this once we have a class manager component.
None => return None,
};
if block_number == current_block_number {
return Some(block_data);
}
Expand All @@ -81,7 +96,7 @@ where
fn create_stream<TQuery>(
mut sqmr_sender: SqmrClientSender<TQuery, DataOrFin<InputFromNetwork>>,
storage_reader: StorageReader,
mut internal_block_receiver: Option<Receiver<(BlockNumber, Self::Output)>>,
mut internal_block_receiver: Option<Receiver<(BlockNumber, SyncBlock)>>,
wait_period_for_new_data: Duration,
num_blocks_per_query: u64,
) -> BoxStream<'static, DataStreamResult>
Expand Down
25 changes: 24 additions & 1 deletion crates/papyrus_p2p_sync/src/client/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ use papyrus_protobuf::sync::DataOrFin;
use papyrus_storage::body::{BodyStorageReader, BodyStorageWriter};
use papyrus_storage::header::HeaderStorageReader;
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use papyrus_test_utils::{get_rng, GetTestInstance};
use starknet_api::block::{BlockBody, BlockNumber};
use starknet_api::transaction::FullTransaction;
use starknet_api::transaction::{FullTransaction, Transaction, TransactionOutput};
use starknet_state_sync_types::state_sync_types::SyncBlock;

use super::stream_builder::{
BadPeerError,
Expand Down Expand Up @@ -84,4 +86,25 @@ impl DataStreamBuilder<FullTransaction> for TransactionStreamFactory {
fn get_start_block_number(storage_reader: &StorageReader) -> Result<BlockNumber, StorageError> {
storage_reader.begin_ro_txn()?.get_body_marker()
}

// TODO(Eitan): Fill this with real transactions once SyncBlock has it.
fn convert_sync_block_to_block_data(
block_number: BlockNumber,
sync_block: SyncBlock,
) -> Option<(BlockBody, BlockNumber)> {
let num_transactions = sync_block.transaction_hashes.len();
let mut rng = get_rng();
let block_body = BlockBody {
transaction_hashes: sync_block.transaction_hashes,
transaction_outputs: std::iter::repeat_with(|| {
TransactionOutput::get_test_instance(&mut rng)
})
.take(num_transactions)
.collect::<Vec<_>>(),
transactions: std::iter::repeat_with(|| Transaction::get_test_instance(&mut rng))
.take(num_transactions)
.collect::<Vec<_>>(),
};
Some((block_body, block_number))
}
}

0 comments on commit 172ced2

Please sign in to comment.