Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add data columns by root sync request #6274

Merged
merged 1 commit into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions beacon_node/lighthouse_network/src/service/api_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,9 @@ impl slog::Value for RequestId {
}
}
}

impl std::fmt::Display for DataColumnsByRootRequestId {
    /// Render the request id as its inner sequential id.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Delegate directly to the inner id's Display impl.
        std::fmt::Display::fmt(&self.0, f)
    }
}
75 changes: 54 additions & 21 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ use beacon_chain::{
};
use futures::StreamExt;
use lighthouse_network::rpc::RPCError;
use lighthouse_network::service::api_types::{Id, SingleLookupReqId, SyncRequestId};
use lighthouse_network::service::api_types::{
DataColumnsByRootRequestId, Id, SingleLookupReqId, SyncRequestId,
};
use lighthouse_network::types::{NetworkGlobals, SyncState};
use lighthouse_network::SyncInfo;
use lighthouse_network::{PeerAction, PeerId};
Expand Down Expand Up @@ -345,9 +347,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
SyncRequestId::SingleBlob { id } => {
self.on_single_blob_response(id, peer_id, RpcEvent::RPCError(error))
}
SyncRequestId::DataColumnsByRoot { .. } => {
// TODO(das)
}
SyncRequestId::DataColumnsByRoot(req_id, requester) => self
.on_data_columns_by_root_response(
req_id,
requester,
peer_id,
RpcEvent::RPCError(error),
),
SyncRequestId::RangeBlockAndBlobs { id } => {
if let Some(sender_id) = self.network.range_request_failed(id) {
match sender_id {
Expand Down Expand Up @@ -860,15 +866,12 @@ impl<T: BeaconChainTypes> SyncManager<T> {
None => RpcEvent::StreamTermination,
},
),
SyncRequestId::SingleBlob { .. } => {
crit!(self.log, "Block received during blob request"; "peer_id" => %peer_id );
}
SyncRequestId::DataColumnsByRoot { .. } => {
// TODO(das)
}
SyncRequestId::RangeBlockAndBlobs { id } => {
self.range_block_and_blobs_response(id, peer_id, block.into())
}
_ => {
crit!(self.log, "bad request id for block"; "peer_id" => %peer_id );
}
}
}

Expand Down Expand Up @@ -897,9 +900,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
seen_timestamp: Duration,
) {
match request_id {
SyncRequestId::SingleBlock { .. } => {
crit!(self.log, "Single blob received during block request"; "peer_id" => %peer_id );
}
SyncRequestId::SingleBlob { id } => self.on_single_blob_response(
id,
peer_id,
Expand All @@ -908,23 +908,41 @@ impl<T: BeaconChainTypes> SyncManager<T> {
None => RpcEvent::StreamTermination,
},
),
SyncRequestId::DataColumnsByRoot { .. } => {
// TODO(das)
}
SyncRequestId::RangeBlockAndBlobs { id } => {
self.range_block_and_blobs_response(id, peer_id, blob.into())
}
_ => {
crit!(self.log, "bad request id for blob"; "peer_id" => %peer_id);
}
}
}

/// Handle a `data_columns_by_root` network event (chunk, stream end) for sync.
fn rpc_data_column_received(
    &mut self,
    request_id: SyncRequestId,
    peer_id: PeerId,
    data_column: Option<Arc<DataColumnSidecar<T::EthSpec>>>,
    seen_timestamp: Duration,
) {
    match request_id {
        SyncRequestId::DataColumnsByRoot(req_id, requester) => {
            // A `Some` payload is a response chunk; `None` marks the end of
            // the response stream.
            let rpc_event = match data_column {
                Some(column) => RpcEvent::Response(column, seen_timestamp),
                None => RpcEvent::StreamTermination,
            };
            self.on_data_columns_by_root_response(req_id, requester, peer_id, rpc_event);
        }
        SyncRequestId::RangeBlockAndBlobs { id: _ } => {
            // TODO(das): implement custody range sync
        }
        _ => {
            crit!(self.log, "bad request id for data_column"; "peer_id" => %peer_id);
        }
    }
}

fn on_single_blob_response(
Expand All @@ -944,6 +962,21 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}

/// Forward a `data_columns_by_root` event to the network context and, once it
/// yields a completed result, hand it to the requesting lookup (pending TODO).
fn on_data_columns_by_root_response(
    &mut self,
    req_id: DataColumnsByRootRequestId,
    _requester: SingleLookupReqId,
    peer_id: PeerId,
    rpc_event: RpcEvent<Arc<DataColumnSidecar<T::EthSpec>>>,
) {
    // The network context tracks the active request and aggregates chunks;
    // it returns `Some` only when the request concludes with a result.
    let resp = self
        .network
        .on_data_columns_by_root_response(req_id, peer_id, rpc_event);
    if resp.is_some() {
        // TODO(das): pass data_columns_by_root result to consumer
    }
}

/// Handles receiving a response for a range sync request that should have both blocks and
/// blobs.
fn range_block_and_blobs_response(
Expand Down
124 changes: 115 additions & 9 deletions beacon_node/network/src/sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,21 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineStat
use fnv::FnvHashMap;
use lighthouse_network::rpc::methods::BlobsByRangeRequest;
use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError};
use lighthouse_network::service::api_types::{AppRequestId, Id, SingleLookupReqId, SyncRequestId};
use lighthouse_network::service::api_types::{
AppRequestId, DataColumnsByRootRequestId, Id, SingleLookupReqId, SyncRequestId,
};
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request};
pub use requests::LookupVerifyError;
use requests::{ActiveDataColumnsByRootRequest, DataColumnsByRootSingleBlockRequest};
use slog::{debug, error, trace, warn};
use std::collections::hash_map::Entry;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{BlobSidecar, DataColumnSidecarList, EthSpec, Hash256, SignedBeaconBlock};
use types::{
BlobSidecar, DataColumnSidecar, DataColumnSidecarList, EthSpec, Hash256, SignedBeaconBlock,
};

mod requests;

Expand Down Expand Up @@ -96,10 +101,10 @@ impl From<LookupVerifyError> for RpcResponseError {
/// Sequential ID that uniquely identifies ReqResp outgoing requests
pub type ReqId = u32;

pub enum LookupRequestResult {
pub enum LookupRequestResult<I = ReqId> {
/// A request is sent. Sync MUST receive an event from the network in the future for either:
/// completed response or failed request
RequestSent(ReqId),
RequestSent(I),
/// No request is sent, and no further action is necessary to consider this request completed
NoRequestNeeded,
/// No request is sent, but the request is not completed. Sync MUST receive some future event
Expand All @@ -123,6 +128,10 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
/// A mapping of active BlobsByRoot requests, including both current slot and parent lookups.
blobs_by_root_requests: FnvHashMap<SingleLookupReqId, ActiveBlobsByRootRequest<T::EthSpec>>,

/// A mapping of active DataColumnsByRoot requests
data_columns_by_root_requests:
FnvHashMap<DataColumnsByRootRequestId, ActiveDataColumnsByRootRequest<T::EthSpec>>,

/// BlocksByRange requests paired with BlobsByRange
range_blocks_and_blobs_requests:
FnvHashMap<Id, (RangeRequestId, BlocksAndBlobsRequestInfo<T::EthSpec>)>,
Expand Down Expand Up @@ -171,6 +180,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request_id: 1,
blocks_by_root_requests: <_>::default(),
blobs_by_root_requests: <_>::default(),
data_columns_by_root_requests: <_>::default(),
range_blocks_and_blobs_requests: FnvHashMap::default(),
network_beacon_processor,
chain,
Expand Down Expand Up @@ -211,10 +221,21 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
None
}
});
let failed_data_column_by_root_ids =
self.data_columns_by_root_requests
.iter()
.filter_map(|(req_id, request)| {
if request.peer_id == *peer_id {
Some(SyncRequestId::DataColumnsByRoot(*req_id, request.requester))
} else {
None
}
});

failed_range_ids
.chain(failed_block_ids)
.chain(failed_blob_ids)
.chain(failed_data_column_by_root_ids)
.collect()
}

Expand Down Expand Up @@ -529,6 +550,43 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
Ok(LookupRequestResult::RequestSent(req_id))
}

/// Request to send a single `data_columns_by_root` request to the network.
pub fn data_column_lookup_request(
    &mut self,
    requester: SingleLookupReqId,
    peer_id: PeerId,
    request: DataColumnsByRootSingleBlockRequest,
) -> Result<LookupRequestResult<DataColumnsByRootRequestId>, &'static str> {
    let req_id = DataColumnsByRootRequestId(self.next_id());
    debug!(
        self.log,
        "Sending DataColumnsByRoot Request";
        "method" => "DataColumnsByRoot",
        "block_root" => ?request.block_root,
        "indices" => ?request.indices,
        "peer" => %peer_id,
        "requester" => ?requester,
        "req_id" => %req_id,
    );

    // The wire request is built from a clone because the original `request`
    // is retained below to validate the peer's response chunks.
    let wire_request = request.clone().into_request(&self.chain.spec);
    self.send_network_msg(NetworkMessage::SendRequest {
        peer_id,
        request: Request::DataColumnsByRoot(wire_request),
        request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRoot(req_id, requester)),
    })?;

    // Only register the request once the message was accepted for sending.
    let active_request = ActiveDataColumnsByRootRequest::new(request, peer_id, requester);
    self.data_columns_by_root_requests
        .insert(req_id, active_request);

    Ok(LookupRequestResult::RequestSent(req_id))
}

/// Request to fetch all needed custody columns of a specific block. This function may not send
/// any request to the network if no columns have to be fetched based on the import state of the
/// node. A custody request is a "super request" that may trigger 0 or more `data_columns_by_root`
/// requests.
pub fn custody_lookup_request(
&mut self,
lookup_id: SingleLookupId,
Expand Down Expand Up @@ -707,14 +765,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
&mut self,
request_id: SingleLookupReqId,
peer_id: PeerId,
block: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
rpc_event: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
) -> Option<RpcResponseResult<Arc<SignedBeaconBlock<T::EthSpec>>>> {
let Entry::Occupied(mut request) = self.blocks_by_root_requests.entry(request_id) else {
metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["blocks_by_root"]);
return None;
};

let resp = match block {
let resp = match rpc_event {
RpcEvent::Response(block, seen_timestamp) => {
match request.get_mut().add_response(block) {
Ok(block) => Ok((block, seen_timestamp)),
Expand Down Expand Up @@ -745,14 +803,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
&mut self,
request_id: SingleLookupReqId,
peer_id: PeerId,
blob: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
rpc_event: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
) -> Option<RpcResponseResult<FixedBlobSidecarList<T::EthSpec>>> {
let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else {
metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["blobs_by_root"]);
return None;
};

let resp = match blob {
let resp = match rpc_event {
RpcEvent::Response(blob, seen_timestamp) => {
let request = request.get_mut();
match request.add_response(blob) {
Expand Down Expand Up @@ -791,6 +849,54 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
}

/// Process a network event for an active `data_columns_by_root` request.
///
/// Returns `Some(Ok(...))` with the aggregated columns once the request
/// completes, `Some(Err(...))` if it failed and no result has been handed
/// downstream yet, and `None` otherwise (unknown request id, more chunks
/// still expected, or the result was already delivered).
#[allow(clippy::type_complexity)]
pub fn on_data_columns_by_root_response(
    &mut self,
    id: DataColumnsByRootRequestId,
    peer_id: PeerId,
    rpc_event: RpcEvent<Arc<DataColumnSidecar<T::EthSpec>>>,
) -> Option<RpcResponseResult<Vec<Arc<DataColumnSidecar<T::EthSpec>>>>> {
    // Unknown id: the request may already have been dropped (e.g. on peer
    // disconnect); nothing to do.
    let Entry::Occupied(mut request) = self.data_columns_by_root_requests.entry(id) else {
        return None;
    };

    let resp = match rpc_event {
        RpcEvent::Response(data_column, seen_timestamp) => {
            let request = request.get_mut();
            match request.add_response(data_column) {
                // All expected columns received: resolve with the full set.
                Ok(Some(data_columns)) => Ok((data_columns, seen_timestamp)),
                // Valid chunk, but more are expected: keep waiting.
                Ok(None) => return None,
                // Invalid chunk: fail, noting whether a result was already
                // returned downstream (`resolve`).
                Err(e) => Err((e.into(), request.resolve())),
            }
        }
        RpcEvent::StreamTermination => match request.remove().terminate() {
            Ok(_) => return None,
            // (err, false = not resolved) because terminate returns Ok() if resolved
            Err(e) => Err((e.into(), false)),
        },
        RpcEvent::RPCError(e) => Err((e.into(), request.remove().resolve())),
    };

    match resp {
        Ok(resp) => Some(Ok(resp)),
        // Track if this request has already returned some value downstream. Ensure that
        // downstream code only receives a single Result per request. If the serving peer does
        // multiple penalizable actions per request, downscore and return None. This allows to
        // catch if a peer is returning more columns than requested or if the excess columns
        // are invalid.
        Err((e, resolved)) => {
            if let RpcResponseError::VerifyError(e) = &e {
                self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
            }
            if resolved {
                None
            } else {
                Some(Err(e))
            }
        }
    }
}

pub fn send_block_for_processing(
&self,
id: Id,
Expand Down Expand Up @@ -900,7 +1006,7 @@ fn to_fixed_blob_sidecar_list<E: EthSpec>(
let index = blob.index as usize;
*fixed_list
.get_mut(index)
.ok_or(LookupVerifyError::UnrequestedBlobIndex(index as u64))? = Some(blob)
.ok_or(LookupVerifyError::UnrequestedIndex(index as u64))? = Some(blob)
}
Ok(fixed_list)
}
10 changes: 8 additions & 2 deletions beacon_node/network/src/sync/network_context/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,19 @@ use types::{
blob_sidecar::BlobIdentifier, BlobSidecar, ChainSpec, EthSpec, Hash256, SignedBeaconBlock,
};

pub use data_columns_by_root::{
ActiveDataColumnsByRootRequest, DataColumnsByRootSingleBlockRequest,
};

mod data_columns_by_root;

/// Ways a peer's response to a by-root lookup request can fail verification.
#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
pub enum LookupVerifyError {
    NoResponseReturned,
    NotEnoughResponsesReturned { expected: usize, actual: usize },
    TooManyResponses,
    /// Peer sent data for a block root that was not requested.
    UnrequestedBlockRoot(Hash256),
    /// Peer sent a blob/column index outside the requested indices.
    UnrequestedIndex(u64),
    InvalidInclusionProof,
    /// Peer sent the same index more than once.
    DuplicateData,
}
Expand Down Expand Up @@ -131,7 +137,7 @@ impl<E: EthSpec> ActiveBlobsByRootRequest<E> {
return Err(LookupVerifyError::InvalidInclusionProof);
}
if !self.request.indices.contains(&blob.index) {
return Err(LookupVerifyError::UnrequestedBlobIndex(blob.index));
return Err(LookupVerifyError::UnrequestedIndex(blob.index));
}
if self.blobs.iter().any(|b| b.index == blob.index) {
return Err(LookupVerifyError::DuplicateData);
Expand Down
Loading
Loading