Skip to content

Commit

Permalink
Add data columns by root sync request (#6274)
Browse files Browse the repository at this point in the history
* Add data columns by root sync request
  • Loading branch information
dapplion authored Aug 21, 2024
1 parent 56d1c8c commit 677f96a
Show file tree
Hide file tree
Showing 5 changed files with 287 additions and 32 deletions.
6 changes: 6 additions & 0 deletions beacon_node/lighthouse_network/src/service/api_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,9 @@ impl slog::Value for RequestId {
}
}
}

impl std::fmt::Display for DataColumnsByRootRequestId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
75 changes: 54 additions & 21 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ use beacon_chain::{
};
use futures::StreamExt;
use lighthouse_network::rpc::RPCError;
use lighthouse_network::service::api_types::{Id, SingleLookupReqId, SyncRequestId};
use lighthouse_network::service::api_types::{
DataColumnsByRootRequestId, Id, SingleLookupReqId, SyncRequestId,
};
use lighthouse_network::types::{NetworkGlobals, SyncState};
use lighthouse_network::SyncInfo;
use lighthouse_network::{PeerAction, PeerId};
Expand Down Expand Up @@ -345,9 +347,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
SyncRequestId::SingleBlob { id } => {
self.on_single_blob_response(id, peer_id, RpcEvent::RPCError(error))
}
SyncRequestId::DataColumnsByRoot { .. } => {
// TODO(das)
}
SyncRequestId::DataColumnsByRoot(req_id, requester) => self
.on_data_columns_by_root_response(
req_id,
requester,
peer_id,
RpcEvent::RPCError(error),
),
SyncRequestId::RangeBlockAndBlobs { id } => {
if let Some(sender_id) = self.network.range_request_failed(id) {
match sender_id {
Expand Down Expand Up @@ -860,15 +866,12 @@ impl<T: BeaconChainTypes> SyncManager<T> {
None => RpcEvent::StreamTermination,
},
),
SyncRequestId::SingleBlob { .. } => {
crit!(self.log, "Block received during blob request"; "peer_id" => %peer_id );
}
SyncRequestId::DataColumnsByRoot { .. } => {
// TODO(das)
}
SyncRequestId::RangeBlockAndBlobs { id } => {
self.range_block_and_blobs_response(id, peer_id, block.into())
}
_ => {
crit!(self.log, "bad request id for block"; "peer_id" => %peer_id );
}
}
}

Expand Down Expand Up @@ -897,9 +900,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
seen_timestamp: Duration,
) {
match request_id {
SyncRequestId::SingleBlock { .. } => {
crit!(self.log, "Single blob received during block request"; "peer_id" => %peer_id );
}
SyncRequestId::SingleBlob { id } => self.on_single_blob_response(
id,
peer_id,
Expand All @@ -908,23 +908,41 @@ impl<T: BeaconChainTypes> SyncManager<T> {
None => RpcEvent::StreamTermination,
},
),
SyncRequestId::DataColumnsByRoot { .. } => {
// TODO(das)
}
SyncRequestId::RangeBlockAndBlobs { id } => {
self.range_block_and_blobs_response(id, peer_id, blob.into())
}
_ => {
crit!(self.log, "bad request id for blob"; "peer_id" => %peer_id);
}
}
}

fn rpc_data_column_received(
&mut self,
_request_id: SyncRequestId,
_peer_id: PeerId,
_data_column: Option<Arc<DataColumnSidecar<T::EthSpec>>>,
_seen_timestamp: Duration,
request_id: SyncRequestId,
peer_id: PeerId,
data_column: Option<Arc<DataColumnSidecar<T::EthSpec>>>,
seen_timestamp: Duration,
) {
// TODO(das): implement handler
match request_id {
SyncRequestId::DataColumnsByRoot(req_id, requester) => {
self.on_data_columns_by_root_response(
req_id,
requester,
peer_id,
match data_column {
Some(data_column) => RpcEvent::Response(data_column, seen_timestamp),
None => RpcEvent::StreamTermination,
},
);
}
SyncRequestId::RangeBlockAndBlobs { id: _ } => {
// TODO(das): implement custody range sync
}
_ => {
crit!(self.log, "bad request id for data_column"; "peer_id" => %peer_id);
}
}
}

fn on_single_blob_response(
Expand All @@ -944,6 +962,21 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}

fn on_data_columns_by_root_response(
&mut self,
req_id: DataColumnsByRootRequestId,
_requester: SingleLookupReqId,
peer_id: PeerId,
rpc_event: RpcEvent<Arc<DataColumnSidecar<T::EthSpec>>>,
) {
if let Some(_resp) = self
.network
.on_data_columns_by_root_response(req_id, peer_id, rpc_event)
{
// TODO(das): pass data_columns_by_root result to consumer
}
}

/// Handles receiving a response for a range sync request that should have both blocks and
/// blobs.
fn range_block_and_blobs_response(
Expand Down
124 changes: 115 additions & 9 deletions beacon_node/network/src/sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,21 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineStat
use fnv::FnvHashMap;
use lighthouse_network::rpc::methods::BlobsByRangeRequest;
use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError};
use lighthouse_network::service::api_types::{AppRequestId, Id, SingleLookupReqId, SyncRequestId};
use lighthouse_network::service::api_types::{
AppRequestId, DataColumnsByRootRequestId, Id, SingleLookupReqId, SyncRequestId,
};
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request};
pub use requests::LookupVerifyError;
use requests::{ActiveDataColumnsByRootRequest, DataColumnsByRootSingleBlockRequest};
use slog::{debug, error, trace, warn};
use std::collections::hash_map::Entry;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{BlobSidecar, DataColumnSidecarList, EthSpec, Hash256, SignedBeaconBlock};
use types::{
BlobSidecar, DataColumnSidecar, DataColumnSidecarList, EthSpec, Hash256, SignedBeaconBlock,
};

mod requests;

Expand Down Expand Up @@ -96,10 +101,10 @@ impl From<LookupVerifyError> for RpcResponseError {
/// Sequential ID that uniquely identifies ReqResp outgoing requests
pub type ReqId = u32;

pub enum LookupRequestResult {
pub enum LookupRequestResult<I = ReqId> {
/// A request is sent. Sync MUST receive an event from the network in the future for either:
/// completed response or failed request
RequestSent(ReqId),
RequestSent(I),
/// No request is sent, and no further action is necessary to consider this request completed
NoRequestNeeded,
/// No request is sent, but the request is not completed. Sync MUST receive some future event
Expand All @@ -123,6 +128,10 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
/// A mapping of active BlobsByRoot requests, including both current slot and parent lookups.
blobs_by_root_requests: FnvHashMap<SingleLookupReqId, ActiveBlobsByRootRequest<T::EthSpec>>,

/// A mapping of active DataColumnsByRoot requests
data_columns_by_root_requests:
FnvHashMap<DataColumnsByRootRequestId, ActiveDataColumnsByRootRequest<T::EthSpec>>,

/// BlocksByRange requests paired with BlobsByRange
range_blocks_and_blobs_requests:
FnvHashMap<Id, (RangeRequestId, BlocksAndBlobsRequestInfo<T::EthSpec>)>,
Expand Down Expand Up @@ -171,6 +180,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request_id: 1,
blocks_by_root_requests: <_>::default(),
blobs_by_root_requests: <_>::default(),
data_columns_by_root_requests: <_>::default(),
range_blocks_and_blobs_requests: FnvHashMap::default(),
network_beacon_processor,
chain,
Expand Down Expand Up @@ -211,10 +221,21 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
None
}
});
let failed_data_column_by_root_ids =
self.data_columns_by_root_requests
.iter()
.filter_map(|(req_id, request)| {
if request.peer_id == *peer_id {
Some(SyncRequestId::DataColumnsByRoot(*req_id, request.requester))
} else {
None
}
});

failed_range_ids
.chain(failed_block_ids)
.chain(failed_blob_ids)
.chain(failed_data_column_by_root_ids)
.collect()
}

Expand Down Expand Up @@ -529,6 +550,43 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
Ok(LookupRequestResult::RequestSent(req_id))
}

/// Request to send a single `data_columns_by_root` request to the network.
pub fn data_column_lookup_request(
&mut self,
requester: SingleLookupReqId,
peer_id: PeerId,
request: DataColumnsByRootSingleBlockRequest,
) -> Result<LookupRequestResult<DataColumnsByRootRequestId>, &'static str> {
let req_id = DataColumnsByRootRequestId(self.next_id());
debug!(
self.log,
"Sending DataColumnsByRoot Request";
"method" => "DataColumnsByRoot",
"block_root" => ?request.block_root,
"indices" => ?request.indices,
"peer" => %peer_id,
"requester" => ?requester,
"req_id" => %req_id,
);

self.send_network_msg(NetworkMessage::SendRequest {
peer_id,
request: Request::DataColumnsByRoot(request.clone().into_request(&self.chain.spec)),
request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRoot(req_id, requester)),
})?;

self.data_columns_by_root_requests.insert(
req_id,
ActiveDataColumnsByRootRequest::new(request, peer_id, requester),
);

Ok(LookupRequestResult::RequestSent(req_id))
}

/// Request to fetch all needed custody columns of a specific block. This function may not send
/// any request to the network if no columns have to be fetched based on the import state of the
/// node. A custody request is a "super request" that may trigger 0 or more `data_columns_by_root`
/// requests.
pub fn custody_lookup_request(
&mut self,
lookup_id: SingleLookupId,
Expand Down Expand Up @@ -707,14 +765,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
&mut self,
request_id: SingleLookupReqId,
peer_id: PeerId,
block: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
rpc_event: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
) -> Option<RpcResponseResult<Arc<SignedBeaconBlock<T::EthSpec>>>> {
let Entry::Occupied(mut request) = self.blocks_by_root_requests.entry(request_id) else {
metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["blocks_by_root"]);
return None;
};

let resp = match block {
let resp = match rpc_event {
RpcEvent::Response(block, seen_timestamp) => {
match request.get_mut().add_response(block) {
Ok(block) => Ok((block, seen_timestamp)),
Expand Down Expand Up @@ -745,14 +803,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
&mut self,
request_id: SingleLookupReqId,
peer_id: PeerId,
blob: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
rpc_event: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
) -> Option<RpcResponseResult<FixedBlobSidecarList<T::EthSpec>>> {
let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else {
metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["blobs_by_root"]);
return None;
};

let resp = match blob {
let resp = match rpc_event {
RpcEvent::Response(blob, seen_timestamp) => {
let request = request.get_mut();
match request.add_response(blob) {
Expand Down Expand Up @@ -791,6 +849,54 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
}

#[allow(clippy::type_complexity)]
pub fn on_data_columns_by_root_response(
&mut self,
id: DataColumnsByRootRequestId,
peer_id: PeerId,
rpc_event: RpcEvent<Arc<DataColumnSidecar<T::EthSpec>>>,
) -> Option<RpcResponseResult<Vec<Arc<DataColumnSidecar<T::EthSpec>>>>> {
let Entry::Occupied(mut request) = self.data_columns_by_root_requests.entry(id) else {
return None;
};

let resp = match rpc_event {
RpcEvent::Response(data_column, seen_timestamp) => {
let request = request.get_mut();
match request.add_response(data_column) {
Ok(Some(data_columns)) => Ok((data_columns, seen_timestamp)),
Ok(None) => return None,
Err(e) => Err((e.into(), request.resolve())),
}
}
RpcEvent::StreamTermination => match request.remove().terminate() {
Ok(_) => return None,
// (err, false = not resolved) because terminate returns Ok() if resolved
Err(e) => Err((e.into(), false)),
},
RpcEvent::RPCError(e) => Err((e.into(), request.remove().resolve())),
};

match resp {
Ok(resp) => Some(Ok(resp)),
// Track if this request has already returned some value downstream. Ensure that
// downstream code only receives a single Result per request. If the serving peer does
// multiple penalizable actions per request, downscore and return None. This allows to
// catch if a peer is returning more columns than requested or if the excess blobs are
// invalid.
Err((e, resolved)) => {
if let RpcResponseError::VerifyError(e) = &e {
self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
}
if resolved {
None
} else {
Some(Err(e))
}
}
}
}

pub fn send_block_for_processing(
&self,
id: Id,
Expand Down Expand Up @@ -900,7 +1006,7 @@ fn to_fixed_blob_sidecar_list<E: EthSpec>(
let index = blob.index as usize;
*fixed_list
.get_mut(index)
.ok_or(LookupVerifyError::UnrequestedBlobIndex(index as u64))? = Some(blob)
.ok_or(LookupVerifyError::UnrequestedIndex(index as u64))? = Some(blob)
}
Ok(fixed_list)
}
10 changes: 8 additions & 2 deletions beacon_node/network/src/sync/network_context/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,19 @@ use types::{
blob_sidecar::BlobIdentifier, BlobSidecar, ChainSpec, EthSpec, Hash256, SignedBeaconBlock,
};

pub use data_columns_by_root::{
ActiveDataColumnsByRootRequest, DataColumnsByRootSingleBlockRequest,
};

mod data_columns_by_root;

#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
pub enum LookupVerifyError {
NoResponseReturned,
NotEnoughResponsesReturned { expected: usize, actual: usize },
TooManyResponses,
UnrequestedBlockRoot(Hash256),
UnrequestedBlobIndex(u64),
UnrequestedIndex(u64),
InvalidInclusionProof,
DuplicateData,
}
Expand Down Expand Up @@ -131,7 +137,7 @@ impl<E: EthSpec> ActiveBlobsByRootRequest<E> {
return Err(LookupVerifyError::InvalidInclusionProof);
}
if !self.request.indices.contains(&blob.index) {
return Err(LookupVerifyError::UnrequestedBlobIndex(blob.index));
return Err(LookupVerifyError::UnrequestedIndex(blob.index));
}
if self.blobs.iter().any(|b| b.index == blob.index) {
return Err(LookupVerifyError::DuplicateData);
Expand Down
Loading

0 comments on commit 677f96a

Please sign in to comment.