Skip to content

Commit

Permalink
feat(sync): implement DataStreamBuilder for classes
Browse files Browse the repository at this point in the history
  • Loading branch information
noamsp-starkware committed Nov 11, 2024
1 parent 1b9e37f commit 248a016
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 28 deletions.
17 changes: 11 additions & 6 deletions config/papyrus/default_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -319,21 +319,26 @@
"privacy": "Public",
"value": 100000
},
"p2p_sync.num_block_classes_per_query": {
"description": "The maximum amount of block's classes to ask from peers in each iteration.",
"privacy": "Public",
"value": 100
},
"p2p_sync.num_block_state_diffs_per_query": {
"description": "The maximum amount of block's state diffs to ask from peers in each iteration.",
"privacy": "Public",
"value": 100
},
"p2p_sync.num_block_transactions_per_query": {
"description": "The maximum amount of blocks to ask their transactions from peers in each iteration.",
"privacy": "Public",
"value": 100
},
"p2p_sync.num_headers_per_query": {
"description": "The maximum amount of headers to ask from peers in each iteration.",
"privacy": "Public",
"value": 10000
},
"p2p_sync.num_transactions_per_query": {
"description": "The maximum amount of blocks to ask their transactions from peers in each iteration.",
"privacy": "Public",
"value": 100
},
"p2p_sync.stop_sync_at_block_number": {
"description": "Stops the sync at given block number and closes the node cleanly. Used to run profiling on the node.",
"privacy": "Public",
Expand Down Expand Up @@ -504,4 +509,4 @@
"privacy": "Public",
"value": true
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -383,27 +383,34 @@ expression: dumped_default_config
},
"privacy": "Public"
},
"p2p_sync.num_block_state_diffs_per_query": {
"description": "The maximum amount of block's state diffs to ask from peers in each iteration.",
"p2p_sync.num_block_classes_per_query": {
"description": "The maximum amount of block's classes to ask from peers in each iteration.",
"value": {
"$serde_json::private::Number": "100"
},
"privacy": "Public"
},
"p2p_sync.num_headers_per_query": {
"description": "The maximum amount of headers to ask from peers in each iteration.",
"p2p_sync.num_block_state_diffs_per_query": {
"description": "The maximum amount of block's state diffs to ask from peers in each iteration.",
"value": {
"$serde_json::private::Number": "10000"
"$serde_json::private::Number": "100"
},
"privacy": "Public"
},
"p2p_sync.num_transactions_per_query": {
"p2p_sync.num_block_transactions_per_query": {
"description": "The maximum amount of blocks to ask their transactions from peers in each iteration.",
"value": {
"$serde_json::private::Number": "100"
},
"privacy": "Public"
},
"p2p_sync.num_headers_per_query": {
"description": "The maximum amount of headers to ask from peers in each iteration.",
"value": {
"$serde_json::private::Number": "10000"
},
"privacy": "Public"
},
"p2p_sync.stop_sync_at_block_number": {
"description": "Stops the sync at given block number and closes the node cleanly. Used to run profiling on the node.",
"value": {
Expand Down
132 changes: 132 additions & 0 deletions crates/papyrus_p2p_sync/src/client/class.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use std::collections::HashSet;

use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt};
use papyrus_common::pending_classes::ApiContractClass;
use papyrus_network::network_manager::ClientResponsesManager;
use papyrus_protobuf::sync::DataOrFin;
use papyrus_storage::class::{ClassStorageReader, ClassStorageWriter};
use papyrus_storage::state::StateStorageReader;
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use starknet_api::block::BlockNumber;
use starknet_api::core::ClassHash;
use starknet_api::state::{DeclaredClasses, DeprecatedDeclaredClasses};

use super::stream_builder::{
BadPeerError,
BlockData,
BlockNumberLimit,
DataStreamBuilder,
ParseDataError,
};
use super::{P2PSyncClientError, NETWORK_DATA_TIMEOUT};

impl BlockData for (DeclaredClasses, DeprecatedDeclaredClasses, BlockNumber) {
    /// Appends the block's declared (Cairo 1) and deprecated (Cairo 0) classes to
    /// storage in a single read-write transaction and commits it.
    fn write_to_storage(
        self: Box<Self>,
        storage_writer: &mut StorageWriter,
    ) -> Result<(), StorageError> {
        let (declared, deprecated, block_number) = (&self.0, &self.1, self.2);
        // `append_classes` takes slices of (hash, &class) pairs, so borrow the
        // class bodies while copying out the hashes.
        let declared_pairs: Vec<_> =
            declared.iter().map(|(hash, class)| (*hash, class)).collect();
        let deprecated_pairs: Vec<_> =
            deprecated.iter().map(|(hash, class)| (*hash, class)).collect();
        storage_writer
            .begin_rw_txn()?
            .append_classes(block_number, &declared_pairs, &deprecated_pairs)?
            .commit()
    }
}

pub(crate) struct ClassStreamBuilder;

impl DataStreamBuilder<(ApiContractClass, ClassHash)> for ClassStreamBuilder {
    type Output = (DeclaredClasses, DeprecatedDeclaredClasses, BlockNumber);

    const TYPE_DESCRIPTION: &'static str = "classes";
    // Classes can only be validated against a state diff that is already in storage,
    // so the stream must not run ahead of the state diff marker.
    const BLOCK_NUMBER_LIMIT: BlockNumberLimit = BlockNumberLimit::StateDiffMarker;

    /// Collects all classes of `block_number` from peer responses.
    ///
    /// Reads the block's state diff from storage to learn which class hashes (and how
    /// many classes) to expect, then consumes responses until that count is reached.
    ///
    /// Returns `Ok(None)` if the peer reports Fin before sending any class, and a
    /// `BadPeerError` if the peer sends too few classes, a class whose hash is not in
    /// the state diff, or the same class twice.
    fn parse_data_for_block<'a>(
        classes_response_manager: &'a mut ClientResponsesManager<
            DataOrFin<(ApiContractClass, ClassHash)>,
        >,
        block_number: BlockNumber,
        storage_reader: &'a StorageReader,
    ) -> BoxFuture<'a, Result<Option<Self::Output>, ParseDataError>> {
        async move {
            // Determine the expected classes from the state diff already in storage.
            let (target_class_len, declared_classes, deprecated_declared_classes) = {
                let state_diff = storage_reader
                    .begin_ro_txn()?
                    .get_state_diff(block_number)?
                    // Safe: BLOCK_NUMBER_LIMIT guarantees block_number < state diff
                    // marker, so the state diff must exist.
                    .expect("A state diff with number lower than the state diff marker is missing");
                (
                    state_diff.declared_classes.len()
                        + state_diff.deprecated_declared_classes.len(),
                    state_diff.declared_classes,
                    state_diff.deprecated_declared_classes.iter().cloned().collect::<HashSet<_>>(),
                )
            };
            let (
                mut current_class_len,
                mut declared_classes_result,
                mut deprecated_declared_classes_result,
            ) = (0, DeclaredClasses::new(), DeprecatedDeclaredClasses::new());

            while current_class_len < target_class_len {
                let maybe_contract_class =
                    tokio::time::timeout(NETWORK_DATA_TIMEOUT, classes_response_manager.next())
                        .await?
                        .ok_or(P2PSyncClientError::ReceiverChannelTerminated {
                            type_description: Self::TYPE_DESCRIPTION,
                        })?;
                let Some((api_contract_class, class_hash)) = maybe_contract_class?.0 else {
                    // Peer sent Fin. Fin before any class means "no data for this
                    // block"; Fin mid-block means the peer shortchanged us.
                    if current_class_len == 0 {
                        return Ok(None);
                    } else {
                        return Err(ParseDataError::BadPeer(BadPeerError::NotEnoughClasses {
                            expected: target_class_len,
                            actual: current_class_len,
                            block_number: block_number.0,
                        }));
                    }
                };

                // Check membership against the state diff and record the class,
                // detecting duplicates via the insert's previous value.
                let (is_declared, duplicate_class) = match api_contract_class {
                    ApiContractClass::ContractClass(contract_class) => (
                        // `contains_key` instead of `get(..).is_some()`
                        // (clippy::unnecessary_get_then_check).
                        declared_classes.contains_key(&class_hash),
                        declared_classes_result.insert(class_hash, contract_class).is_some(),
                    ),
                    ApiContractClass::DeprecatedContractClass(deprecated_contract_class) => (
                        deprecated_declared_classes.contains(&class_hash),
                        deprecated_declared_classes_result
                            .insert(class_hash, deprecated_contract_class)
                            .is_some(),
                    ),
                };

                if !is_declared {
                    return Err(ParseDataError::BadPeer(BadPeerError::ClassNotInStateDiff {
                        class_hash,
                    }));
                }

                if duplicate_class {
                    return Err(ParseDataError::BadPeer(BadPeerError::DuplicateClass {
                        class_hash,
                    }));
                }

                current_class_len += 1;
            }
            Ok(Some((declared_classes_result, deprecated_declared_classes_result, block_number)))
        }
        .boxed()
    }

    /// The class sync resumes from the class marker: the first block whose classes
    /// are not yet in storage.
    fn get_start_block_number(storage_reader: &StorageReader) -> Result<BlockNumber, StorageError> {
        storage_reader.begin_ro_txn()?.get_class_marker()
    }
}
30 changes: 24 additions & 6 deletions crates/papyrus_p2p_sync/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod class;
mod header;
#[cfg(test)]
mod header_test;
Expand All @@ -12,6 +13,7 @@ mod transaction;
use std::collections::BTreeMap;
use std::time::Duration;

use class::ClassStreamBuilder;
use futures::channel::mpsc::SendError;
use futures::Stream;
use header::HeaderStreamBuilder;
Expand Down Expand Up @@ -48,7 +50,8 @@ const NETWORK_DATA_TIMEOUT: Duration = Duration::from_secs(300);
pub struct P2PSyncClientConfig {
pub num_headers_per_query: u64,
pub num_block_state_diffs_per_query: u64,
pub num_transactions_per_query: u64,
pub num_block_transactions_per_query: u64,
pub num_block_classes_per_query: u64,
#[serde(deserialize_with = "deserialize_seconds_to_duration")]
pub wait_period_for_new_data: Duration,
pub buffer_size: usize,
Expand All @@ -71,12 +74,18 @@ impl SerializeConfig for P2PSyncClientConfig {
ParamPrivacyInput::Public,
),
ser_param(
"num_transactions_per_query",
&self.num_transactions_per_query,
"num_block_transactions_per_query",
&self.num_block_transactions_per_query,
"The maximum amount of blocks to ask their transactions from peers in each \
iteration.",
ParamPrivacyInput::Public,
),
ser_param(
"num_block_classes_per_query",
&self.num_block_classes_per_query,
"The maximum amount of block's classes to ask from peers in each iteration.",
ParamPrivacyInput::Public,
),
ser_param(
"wait_period_for_new_data",
&self.wait_period_for_new_data.as_secs(),
Expand Down Expand Up @@ -110,7 +119,8 @@ impl Default for P2PSyncClientConfig {
// State diffs are split into multiple messages, so big queries can lead to a lot of
// messages in the network buffers.
num_block_state_diffs_per_query: 100,
num_transactions_per_query: 100,
num_block_transactions_per_query: 100,
num_block_classes_per_query: 100,
wait_period_for_new_data: Duration::from_secs(5),
// TODO(eitan): split this by protocol
buffer_size: 100000,
Expand Down Expand Up @@ -187,11 +197,19 @@ impl P2PSyncClientChannels {
self.transaction_sender,
storage_reader.clone(),
config.wait_period_for_new_data,
config.num_transactions_per_query,
config.num_block_transactions_per_query,
config.stop_sync_at_block_number,
);

let class_stream = ClassStreamBuilder::create_stream(
self.class_sender,
storage_reader.clone(),
config.wait_period_for_new_data,
config.num_block_classes_per_query,
config.stop_sync_at_block_number,
);

header_stream.merge(state_diff_stream).merge(transaction_stream)
header_stream.merge(state_diff_stream).merge(transaction_stream).merge(class_stream)
}
}

Expand Down
30 changes: 21 additions & 9 deletions crates/papyrus_p2p_sync/src/client/stream_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ use papyrus_network::network_manager::{ClientResponsesManager, SqmrClientSender}
use papyrus_protobuf::converters::ProtobufConversionError;
use papyrus_protobuf::sync::{BlockHashOrNumber, DataOrFin, Direction, Query};
use papyrus_storage::header::HeaderStorageReader;
use papyrus_storage::state::StateStorageReader;
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use starknet_api::block::{BlockNumber, BlockSignature};
use starknet_api::core::ClassHash;
use tracing::{debug, info, warn};

use super::{P2PSyncClientError, STEP};
Expand All @@ -28,7 +30,7 @@ pub(crate) trait BlockData: Send {
pub(crate) enum BlockNumberLimit {
Unlimited,
HeaderMarker,
// TODO(shahak): Add variant for state diff marker once we support classes sync.
StateDiffMarker,
}

pub(crate) trait DataStreamBuilder<InputFromNetwork>
Expand Down Expand Up @@ -66,19 +68,20 @@ where
'send_query_and_parse_responses: loop {
let limit = match Self::BLOCK_NUMBER_LIMIT {
BlockNumberLimit::Unlimited => num_blocks_per_query,
BlockNumberLimit::HeaderMarker => {
let last_block_number = storage_reader.begin_ro_txn()?.get_header_marker()?;
let limit = min(
last_block_number.0 - current_block_number.0,
num_blocks_per_query,
);
BlockNumberLimit::HeaderMarker | BlockNumberLimit::StateDiffMarker => {
let (last_block_number, description) = match Self::BLOCK_NUMBER_LIMIT {
BlockNumberLimit::HeaderMarker => (storage_reader.begin_ro_txn()?.get_header_marker()?, "header"),
BlockNumberLimit::StateDiffMarker => (storage_reader.begin_ro_txn()?.get_state_marker()?, "state diff"),
_ => unreachable!(),
};
let limit = min(last_block_number.0 - current_block_number.0, num_blocks_per_query);
if limit == 0 {
debug!("{:?} sync is waiting for a new header", Self::TYPE_DESCRIPTION);
debug!("{:?} sync is waiting for a new {}", Self::TYPE_DESCRIPTION, description);
tokio::time::sleep(wait_period_for_new_data).await;
continue;
}
limit
}
},
};
let end_block_number = current_block_number.0 + limit;
debug!(
Expand Down Expand Up @@ -180,6 +183,15 @@ pub(crate) enum BadPeerError {
EmptyStateDiffPart,
#[error(transparent)]
ProtobufConversionError(#[from] ProtobufConversionError),
#[error(
"Expected to receive {expected} classes for {block_number} from the network. Got {actual} \
classes instead"
)]
NotEnoughClasses { expected: usize, actual: usize, block_number: u64 },
#[error("The class with hash {class_hash} was not found in the state diff.")]
ClassNotInStateDiff { class_hash: ClassHash },
#[error("Received two classes with the same hash: {class_hash}.")]
DuplicateClass { class_hash: ClassHash },
}

#[derive(thiserror::Error, Debug)]
Expand Down
4 changes: 3 additions & 1 deletion crates/papyrus_p2p_sync/src/client/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use super::{P2PSyncClient, P2PSyncClientChannels, P2PSyncClientConfig};
pub const BUFFER_SIZE: usize = 1000;
pub const HEADER_QUERY_LENGTH: u64 = 5;
pub const STATE_DIFF_QUERY_LENGTH: u64 = 3;
pub const CLASS_DIFF_QUERY_LENGTH: u64 = 3;
pub const TRANSACTION_QUERY_LENGTH: u64 = 3;
pub const SLEEP_DURATION_TO_LET_SYNC_ADVANCE: Duration = Duration::from_millis(10);
pub const WAIT_PERIOD_FOR_NEW_DATA: Duration = Duration::from_secs(1);
Expand All @@ -40,7 +41,8 @@ lazy_static! {
static ref TEST_CONFIG: P2PSyncClientConfig = P2PSyncClientConfig {
num_headers_per_query: HEADER_QUERY_LENGTH,
num_block_state_diffs_per_query: STATE_DIFF_QUERY_LENGTH,
num_transactions_per_query: TRANSACTION_QUERY_LENGTH,
num_block_transactions_per_query: TRANSACTION_QUERY_LENGTH,
num_block_classes_per_query: CLASS_DIFF_QUERY_LENGTH,
wait_period_for_new_data: WAIT_PERIOD_FOR_NEW_DATA,
buffer_size: BUFFER_SIZE,
stop_sync_at_block_number: None,
Expand Down

0 comments on commit 248a016

Please sign in to comment.