Skip to content

Commit

Permalink
Add stub for DataColumnsByRoot
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmygchen committed Feb 6, 2024
1 parent 4e7c0d1 commit b5242ea
Show file tree
Hide file tree
Showing 16 changed files with 320 additions and 14 deletions.
17 changes: 16 additions & 1 deletion beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ const MAX_BLOCKS_BY_ROOTS_QUEUE_LEN: usize = 1_024;
/// will be stored before we start dropping them.
const MAX_BLOBS_BY_ROOTS_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued `DataColumnsByRootRequest` objects received from the network RPC that
/// will be stored before we start dropping them.
const MAX_DATA_COLUMNS_BY_ROOTS_QUEUE_LEN: usize = 2_048;

/// Maximum number of `SignedBlsToExecutionChange` messages to queue before dropping them.
///
/// This value is set high to accommodate the large spike that is expected immediately after Capella
Expand Down Expand Up @@ -247,6 +251,7 @@ pub const BLOCKS_BY_RANGE_REQUEST: &str = "blocks_by_range_request";
pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request";
pub const BLOBS_BY_RANGE_REQUEST: &str = "blobs_by_range_request";
pub const BLOBS_BY_ROOTS_REQUEST: &str = "blobs_by_roots_request";
pub const DATA_COLUMNS_BY_ROOTS_REQUEST: &str = "data_columns_by_roots_request";
pub const LIGHT_CLIENT_BOOTSTRAP_REQUEST: &str = "light_client_bootstrap";
pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
Expand Down Expand Up @@ -624,6 +629,7 @@ pub enum Work<E: EthSpec> {
BlocksByRootsRequest(BlockingFnWithManualSendOnIdle),
BlobsByRangeRequest(BlockingFn),
BlobsByRootsRequest(BlockingFn),
DataColumnsByRootsRequest(BlockingFn),
GossipBlsToExecutionChange(BlockingFn),
LightClientBootstrapRequest(BlockingFn),
ApiRequestP0(BlockingOrAsync),
Expand Down Expand Up @@ -665,6 +671,7 @@ impl<E: EthSpec> Work<E> {
Work::BlocksByRootsRequest(_) => BLOCKS_BY_ROOTS_REQUEST,
Work::BlobsByRangeRequest(_) => BLOBS_BY_RANGE_REQUEST,
Work::BlobsByRootsRequest(_) => BLOBS_BY_ROOTS_REQUEST,
Work::DataColumnsByRootsRequest(_) => DATA_COLUMNS_BY_ROOTS_REQUEST,
Work::LightClientBootstrapRequest(_) => LIGHT_CLIENT_BOOTSTRAP_REQUEST,
Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION,
Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE,
Expand Down Expand Up @@ -824,6 +831,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
let mut bbroots_queue = FifoQueue::new(MAX_BLOCKS_BY_ROOTS_QUEUE_LEN);
let mut blbroots_queue = FifoQueue::new(MAX_BLOBS_BY_ROOTS_QUEUE_LEN);
let mut blbrange_queue = FifoQueue::new(MAX_BLOBS_BY_RANGE_QUEUE_LEN);
let mut dcbroots_queue = FifoQueue::new(MAX_DATA_COLUMNS_BY_ROOTS_QUEUE_LEN);

let mut gossip_bls_to_execution_change_queue =
FifoQueue::new(MAX_BLS_TO_EXECUTION_CHANGE_QUEUE_LEN);
Expand Down Expand Up @@ -1123,6 +1131,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = blbroots_queue.pop() {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = dcbroots_queue.pop() {
self.spawn_worker(item, idle_tx);
// Check slashings after all other consensus messages so we prioritize
// following head.
//
Expand Down Expand Up @@ -1276,6 +1286,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::BlobsByRootsRequest { .. } => {
blbroots_queue.push(work, work_id, &self.log)
}
Work::DataColumnsByRootsRequest { .. } => {
dcbroots_queue.push(work, work_id, &self.log)
}
Work::UnknownLightClientOptimisticUpdate { .. } => {
unknown_light_client_update_queue.push(work, work_id, &self.log)
}
Expand Down Expand Up @@ -1477,7 +1490,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
| Work::GossipDataColumnSidecar(work) => task_spawner.spawn_async(async move {
work.await;
}),
Work::BlobsByRangeRequest(process_fn) | Work::BlobsByRootsRequest(process_fn) => {
Work::BlobsByRangeRequest(process_fn)
| Work::BlobsByRootsRequest(process_fn)
| Work::DataColumnsByRootsRequest(process_fn) => {
task_spawner.spawn_blocking(process_fn)
}
Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => {
Expand Down
10 changes: 8 additions & 2 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,10 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
RPCResponseErrorCode::Unknown => PeerAction::HighToleranceError,
RPCResponseErrorCode::ResourceUnavailable => {
// Don't ban on this because we want to retry with a block by root request.
if matches!(protocol, Protocol::BlobsByRoot) {
if matches!(
protocol,
Protocol::BlobsByRoot | Protocol::DataColumnsByRoot
) {
return;
}

Expand Down Expand Up @@ -559,8 +562,9 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
Protocol::BlocksByRange => PeerAction::MidToleranceError,
Protocol::BlocksByRoot => PeerAction::MidToleranceError,
Protocol::BlobsByRange => PeerAction::MidToleranceError,
Protocol::LightClientBootstrap => PeerAction::LowToleranceError,
Protocol::BlobsByRoot => PeerAction::MidToleranceError,
Protocol::DataColumnsByRoot => PeerAction::MidToleranceError,
Protocol::LightClientBootstrap => PeerAction::LowToleranceError,
Protocol::Goodbye => PeerAction::LowToleranceError,
Protocol::MetaData => PeerAction::LowToleranceError,
Protocol::Status => PeerAction::LowToleranceError,
Expand All @@ -579,6 +583,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
Protocol::BlocksByRoot => return,
Protocol::BlobsByRange => return,
Protocol::BlobsByRoot => return,
Protocol::DataColumnsByRoot => return,
Protocol::Goodbye => return,
Protocol::LightClientBootstrap => return,
Protocol::MetaData => PeerAction::Fatal,
Expand All @@ -597,6 +602,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
Protocol::BlocksByRoot => PeerAction::MidToleranceError,
Protocol::BlobsByRange => PeerAction::MidToleranceError,
Protocol::BlobsByRoot => PeerAction::MidToleranceError,
Protocol::DataColumnsByRoot => PeerAction::MidToleranceError,
Protocol::LightClientBootstrap => return,
Protocol::Goodbye => return,
Protocol::MetaData => return,
Expand Down
32 changes: 31 additions & 1 deletion beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ use std::io::{Read, Write};
use std::marker::PhantomData;
use std::sync::Arc;
use tokio_util::codec::{Decoder, Encoder};
use types::ChainSpec;
use types::{
BlobSidecar, EthSpec, ForkContext, ForkName, Hash256, LightClientBootstrap,
RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase,
SignedBeaconBlockCapella, SignedBeaconBlockDeneb, SignedBeaconBlockMerge,
};
use types::{ChainSpec, DataColumnSidecar};
use unsigned_varint::codec::Uvi;

const CONTEXT_BYTES_LEN: usize = 4;
Expand Down Expand Up @@ -74,6 +74,7 @@ impl<TSpec: EthSpec> Encoder<RPCCodedResponse<TSpec>> for SSZSnappyInboundCodec<
RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(),
RPCResponse::BlobsByRange(res) => res.as_ssz_bytes(),
RPCResponse::BlobsByRoot(res) => res.as_ssz_bytes(),
RPCResponse::DataColumnsByRoot(res) => res.as_ssz_bytes(),
RPCResponse::LightClientBootstrap(res) => res.as_ssz_bytes(),
RPCResponse::Pong(res) => res.data.as_ssz_bytes(),
RPCResponse::MetaData(res) =>
Expand Down Expand Up @@ -230,6 +231,7 @@ impl<TSpec: EthSpec> Encoder<OutboundRequest<TSpec>> for SSZSnappyOutboundCodec<
},
OutboundRequest::BlobsByRange(req) => req.as_ssz_bytes(),
OutboundRequest::BlobsByRoot(req) => req.blob_ids.as_ssz_bytes(),
OutboundRequest::DataColumnsByRoot(req) => req.data_column_ids.as_ssz_bytes(),
OutboundRequest::Ping(req) => req.as_ssz_bytes(),
OutboundRequest::MetaData(_) => return Ok(()), // no metadata to encode
};
Expand Down Expand Up @@ -498,6 +500,14 @@ fn handle_rpc_request<T: EthSpec>(
)?,
})))
}
SupportedProtocol::DataColumnsByRootV1 => Ok(Some(InboundRequest::DataColumnsByRoot(
DataColumnsByRootRequest {
data_column_ids: RuntimeVariableList::from_ssz_bytes(
decoded_buffer,
spec.max_request_data_column_sidecars as usize,
)?,
},
))),
SupportedProtocol::PingV1 => Ok(Some(InboundRequest::Ping(Ping {
data: u64::from_ssz_bytes(decoded_buffer)?,
}))),
Expand Down Expand Up @@ -584,6 +594,23 @@ fn handle_rpc_response<T: EthSpec>(
),
)),
},
SupportedProtocol::DataColumnsByRootV1 => match fork_name {
// TODO(das): update fork name
Some(ForkName::Deneb) => Ok(Some(RPCResponse::DataColumnsByRoot(Arc::new(
DataColumnSidecar::from_ssz_bytes(decoded_buffer)?,
)))),
Some(_) => Err(RPCError::ErrorResponse(
RPCResponseErrorCode::InvalidRequest,
"Invalid fork name for data columns by root".to_string(),
)),
None => Err(RPCError::ErrorResponse(
RPCResponseErrorCode::InvalidRequest,
format!(
"No context bytes provided for {:?} response",
versioned_protocol
),
)),
},
SupportedProtocol::PingV1 => Ok(Some(RPCResponse::Pong(Ping {
data: u64::from_ssz_bytes(decoded_buffer)?,
}))),
Expand Down Expand Up @@ -945,6 +972,9 @@ mod tests {
OutboundRequest::BlobsByRoot(bbroot) => {
assert_eq!(decoded, InboundRequest::BlobsByRoot(bbroot))
}
OutboundRequest::DataColumnsByRoot(dcbroot) => {
assert_eq!(decoded, InboundRequest::DataColumnsByRoot(dcbroot))
}
OutboundRequest::Ping(ping) => {
assert_eq!(decoded, InboundRequest::Ping(ping))
}
Expand Down
14 changes: 14 additions & 0 deletions beacon_node/lighthouse_network/src/rpc/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ pub struct RateLimiterConfig {
pub(super) blocks_by_root_quota: Quota,
pub(super) blobs_by_range_quota: Quota,
pub(super) blobs_by_root_quota: Quota,
pub(super) data_columns_by_root_quota: Quota,
pub(super) light_client_bootstrap_quota: Quota,
}

Expand All @@ -103,6 +104,8 @@ impl RateLimiterConfig {
pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10);
pub const DEFAULT_BLOBS_BY_RANGE_QUOTA: Quota = Quota::n_every(768, 10);
pub const DEFAULT_BLOBS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10);
// TODO(das): review quota
pub const DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA: Quota = Quota::n_every(256, 10);
pub const DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA: Quota = Quota::one_every(10);
}

Expand All @@ -117,6 +120,7 @@ impl Default for RateLimiterConfig {
blocks_by_root_quota: Self::DEFAULT_BLOCKS_BY_ROOT_QUOTA,
blobs_by_range_quota: Self::DEFAULT_BLOBS_BY_RANGE_QUOTA,
blobs_by_root_quota: Self::DEFAULT_BLOBS_BY_ROOT_QUOTA,
data_columns_by_root_quota: Self::DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA,
light_client_bootstrap_quota: Self::DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA,
}
}
Expand All @@ -143,6 +147,10 @@ impl Debug for RateLimiterConfig {
.field("blocks_by_root", fmt_q!(&self.blocks_by_root_quota))
.field("blobs_by_range", fmt_q!(&self.blobs_by_range_quota))
.field("blobs_by_root", fmt_q!(&self.blobs_by_root_quota))
.field(
"data_columns_by_root",
fmt_q!(&self.data_columns_by_root_quota),
)
.finish()
}
}
Expand All @@ -163,6 +171,7 @@ impl FromStr for RateLimiterConfig {
let mut blocks_by_root_quota = None;
let mut blobs_by_range_quota = None;
let mut blobs_by_root_quota = None;
let mut data_columns_by_root_quota = None;
let mut light_client_bootstrap_quota = None;

for proto_def in s.split(';') {
Expand All @@ -175,6 +184,9 @@ impl FromStr for RateLimiterConfig {
Protocol::BlocksByRoot => blocks_by_root_quota = blocks_by_root_quota.or(quota),
Protocol::BlobsByRange => blobs_by_range_quota = blobs_by_range_quota.or(quota),
Protocol::BlobsByRoot => blobs_by_root_quota = blobs_by_root_quota.or(quota),
Protocol::DataColumnsByRoot => {
data_columns_by_root_quota = data_columns_by_root_quota.or(quota)
}
Protocol::Ping => ping_quota = ping_quota.or(quota),
Protocol::MetaData => meta_data_quota = meta_data_quota.or(quota),
Protocol::LightClientBootstrap => {
Expand All @@ -194,6 +206,8 @@ impl FromStr for RateLimiterConfig {
blobs_by_range_quota: blobs_by_range_quota
.unwrap_or(Self::DEFAULT_BLOBS_BY_RANGE_QUOTA),
blobs_by_root_quota: blobs_by_root_quota.unwrap_or(Self::DEFAULT_BLOBS_BY_ROOT_QUOTA),
data_columns_by_root_quota: data_columns_by_root_quota
.unwrap_or(Self::DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA),
light_client_bootstrap_quota: light_client_bootstrap_quota
.unwrap_or(Self::DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA),
})
Expand Down
33 changes: 31 additions & 2 deletions beacon_node/lighthouse_network/src/rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ use std::sync::Arc;
use strum::IntoStaticStr;
use superstruct::superstruct;
use types::blob_sidecar::BlobIdentifier;
use types::data_column_sidecar::DataColumnIdentifier;
use types::{
blob_sidecar::BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, LightClientBootstrap,
RuntimeVariableList, SignedBeaconBlock, Slot,
blob_sidecar::BlobSidecar, ChainSpec, DataColumnSidecar, Epoch, EthSpec, Hash256,
LightClientBootstrap, RuntimeVariableList, SignedBeaconBlock, Slot,
};

/// Maximum length of error message.
Expand Down Expand Up @@ -366,6 +367,13 @@ impl BlobsByRootRequest {
}
}

/// Request a number of data columns from a peer.
#[derive(Clone, Debug, PartialEq)]
pub struct DataColumnsByRootRequest {
/// The list of beacon block roots and column indices being requested.
pub data_column_ids: RuntimeVariableList<DataColumnIdentifier>,
}

/* RPC Handling and Grouping */
// Collection of enums and structs used by the Codecs to encode/decode RPC messages

Expand All @@ -390,6 +398,9 @@ pub enum RPCResponse<T: EthSpec> {
/// A response to a get BLOBS_BY_ROOT request.
BlobsByRoot(Arc<BlobSidecar<T>>),

/// A response to a get DATA_COLUMN_SIDECARS_BY_ROOT request.
DataColumnsByRoot(Arc<DataColumnSidecar<T>>),

/// A PONG response to a PING request.
Pong(Ping),

Expand All @@ -411,6 +422,9 @@ pub enum ResponseTermination {

/// Blobs by root stream termination.
BlobsByRoot,

/// Data column sidecars by root stream termination.
DataColumnsByRoot,
}

/// The structured response containing a result/code indicating success or failure
Expand Down Expand Up @@ -482,6 +496,7 @@ impl<T: EthSpec> RPCCodedResponse<T> {
RPCResponse::BlocksByRoot(_) => true,
RPCResponse::BlobsByRange(_) => true,
RPCResponse::BlobsByRoot(_) => true,
RPCResponse::DataColumnsByRoot(_) => true,
RPCResponse::Pong(_) => false,
RPCResponse::MetaData(_) => false,
RPCResponse::LightClientBootstrap(_) => false,
Expand Down Expand Up @@ -520,6 +535,7 @@ impl<T: EthSpec> RPCResponse<T> {
RPCResponse::BlocksByRoot(_) => Protocol::BlocksByRoot,
RPCResponse::BlobsByRange(_) => Protocol::BlobsByRange,
RPCResponse::BlobsByRoot(_) => Protocol::BlobsByRoot,
RPCResponse::DataColumnsByRoot(_) => Protocol::DataColumnsByRoot,
RPCResponse::Pong(_) => Protocol::Ping,
RPCResponse::MetaData(_) => Protocol::MetaData,
RPCResponse::LightClientBootstrap(_) => Protocol::LightClientBootstrap,
Expand Down Expand Up @@ -563,6 +579,9 @@ impl<T: EthSpec> std::fmt::Display for RPCResponse<T> {
RPCResponse::BlobsByRoot(sidecar) => {
write!(f, "BlobsByRoot: Blob slot: {}", sidecar.slot())
}
RPCResponse::DataColumnsByRoot(sidecar) => {
write!(f, "DataColumnsByRoot: Data column slot: {}", sidecar.slot())
}
RPCResponse::Pong(ping) => write!(f, "Pong: {}", ping.data),
RPCResponse::MetaData(metadata) => write!(f, "Metadata: {}", metadata.seq_number()),
RPCResponse::LightClientBootstrap(bootstrap) => {
Expand Down Expand Up @@ -645,6 +664,16 @@ impl std::fmt::Display for BlobsByRangeRequest {
}
}

impl std::fmt::Display for DataColumnsByRootRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Request: DataColumnsByRoot: Number of Requested Data Column Ids: {}",
self.data_column_ids.len()
)
}
}

impl slog::KV for StatusMessage {
fn serialize(
&self,
Expand Down
1 change: 1 addition & 0 deletions beacon_node/lighthouse_network/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ where
ResponseTermination::BlocksByRoot => Protocol::BlocksByRoot,
ResponseTermination::BlobsByRange => Protocol::BlobsByRange,
ResponseTermination::BlobsByRoot => Protocol::BlobsByRoot,
ResponseTermination::DataColumnsByRoot => Protocol::DataColumnsByRoot,
},
),
};
Expand Down
Loading

0 comments on commit b5242ea

Please sign in to comment.