diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 427866ba10e..b6de128dbbd 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1155,6 +1155,25 @@ impl BeaconChain { .map_or_else(|| self.get_blobs(block_root), Ok) } + pub fn get_data_column_checking_all_caches( + &self, + block_root: Hash256, + index: ColumnIndex, + ) -> Result>>, Error> { + if let Some(column) = self + .data_availability_checker + .get_data_column(&DataColumnIdentifier { block_root, index })? + { + return Ok(Some(column)); + } + + if let Some(columns) = self.early_attester_cache.get_data_columns(block_root) { + return Ok(columns.iter().find(|c| c.index == index).cloned()); + } + + self.get_data_column(&block_root, &index) + } + /// Returns the block at the given root, if any. /// /// ## Errors @@ -1230,6 +1249,18 @@ impl BeaconChain { } } + /// Returns the data columns at the given root, if any. + /// + /// ## Errors + /// May return a database error. + pub fn get_data_column( + &self, + block_root: &Hash256, + column_index: &ColumnIndex, + ) -> Result>>, Error> { + Ok(self.store.get_data_column(block_root, column_index)?) 
+ } + pub fn get_blinded_block( &self, block_root: &Hash256, diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index bf9f94b9864..1bfe377ad05 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -15,8 +15,8 @@ use std::time::Duration; use task_executor::TaskExecutor; use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList}; use types::{ - BlobSidecarList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, Hash256, SignedBeaconBlock, - Slot, + BlobSidecarList, ChainSpec, DataColumnIdentifier, DataColumnSidecar, DataColumnSidecarList, + Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot, }; mod error; @@ -173,6 +173,14 @@ impl DataAvailabilityChecker { self.availability_cache.peek_blob(blob_id) } + /// Get a data column from the availability cache. + pub fn get_data_column( + &self, + data_column_id: &DataColumnIdentifier, + ) -> Result>>, AvailabilityCheckError> { + self.availability_cache.peek_data_column(data_column_id) + } + /// Put a list of blobs received via RPC into the availability cache. This performs KZG /// verification on the blobs in the list. 
pub fn put_rpc_blobs( diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 7108f7153c7..50fae091196 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -13,7 +13,10 @@ use ssz_types::{FixedVector, VariableList}; use std::num::NonZeroUsize; use std::sync::Arc; use types::blob_sidecar::BlobIdentifier; -use types::{BlobSidecar, ChainSpec, ColumnIndex, Epoch, EthSpec, Hash256, SignedBeaconBlock}; +use types::{ + BlobSidecar, ChainSpec, ColumnIndex, DataColumnIdentifier, DataColumnSidecar, Epoch, EthSpec, + Hash256, SignedBeaconBlock, +}; /// This represents the components of a partially available block /// @@ -389,6 +392,22 @@ impl DataAvailabilityCheckerInner { } } + /// Fetch a data column from the cache without affecting the LRU ordering + pub fn peek_data_column( + &self, + data_column_id: &DataColumnIdentifier, + ) -> Result>>, AvailabilityCheckError> { + if let Some(pending_components) = self.critical.read().peek(&data_column_id.block_root) { + Ok(pending_components + .verified_data_columns + .iter() + .find(|data_column| data_column.as_data_column().index == data_column_id.index) + .map(|data_column| data_column.clone_arc())) + } else { + Ok(None) + } + } + pub fn peek_pending_components>) -> R>( &self, block_root: &Hash256, diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index 6a4efd605df..279af20909b 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -254,6 +254,9 @@ impl KzgVerifiedCustodyDataColumn { pub fn as_data_column(&self) -> &DataColumnSidecar { &self.data } + pub fn clone_arc(&self) -> Arc> { + self.data.clone() + } } /// Complete kzg 
verification for a `DataColumnSidecar`. diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 8849a5433d4..7c7dca02f50 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -6,6 +6,7 @@ use serde::Serialize; use ssz::Encode; use ssz_derive::{Decode, Encode}; use ssz_types::{typenum::U256, VariableList}; +use std::collections::BTreeMap; use std::fmt::Display; use std::marker::PhantomData; use std::ops::Deref; @@ -426,6 +427,17 @@ impl DataColumnsByRootRequest { pub fn new_single(block_root: Hash256, index: ColumnIndex, spec: &ChainSpec) -> Self { Self::new(vec![DataColumnIdentifier { block_root, index }], spec) } + + pub fn group_by_ordered_block_root(&self) -> Vec<(Hash256, Vec)> { + let mut column_indexes_by_block = BTreeMap::>::new(); + for request_id in self.data_column_ids.as_slice() { + column_indexes_by_block + .entry(request_id.block_root) + .or_default() + .push(request_id.index); + } + column_indexes_by_block.into_iter().collect() + } } /* RPC Handling and Grouping */ diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 3f8cf14dcbe..0defe7ad879 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -320,14 +320,64 @@ impl NetworkBeaconProcessor { pub fn handle_data_columns_by_root_request( self: Arc, peer_id: PeerId, - _request_id: PeerRequestId, + request_id: PeerRequestId, request: DataColumnsByRootRequest, ) { - // TODO(das): implement handler - debug!(self.log, "Received DataColumnsByRoot Request"; - "peer_id" => %peer_id, - "count" => request.data_column_ids.len() + self.terminate_response_stream( + peer_id, + request_id, + self.handle_data_columns_by_root_request_inner(peer_id, request_id, request), + Response::DataColumnsByRoot, 
+ ); + } + + /// Handle a `DataColumnsByRoot` request from the peer. + pub fn handle_data_columns_by_root_request_inner( + &self, + peer_id: PeerId, + request_id: PeerRequestId, + request: DataColumnsByRootRequest, + ) -> Result<(), (RPCResponseErrorCode, &'static str)> { + let mut send_data_column_count = 0; + + for data_column_id in request.data_column_ids.as_slice() { + match self.chain.get_data_column_checking_all_caches( + data_column_id.block_root, + data_column_id.index, + ) { + Ok(Some(data_column)) => { + send_data_column_count += 1; + self.send_response( + peer_id, + Response::DataColumnsByRoot(Some(data_column)), + request_id, + ); + } + Ok(None) => {} // no-op + Err(e) => { + // TODO(das): lower log level when feature is stabilized + error!(self.log, "Error getting data column"; + "block_root" => ?data_column_id.block_root, + "peer" => %peer_id, + "error" => ?e + ); + return Err(( + RPCResponseErrorCode::ServerError, + "Error getting data column", + )); + } + } + } + + debug!( + self.log, + "Received DataColumnsByRoot Request"; + "peer" => %peer_id, + "request" => ?request.group_by_ordered_block_root(), + "returned" => send_data_column_count ); + + Ok(()) } /// Handle a `LightClientBootstrap` request from the peer. @@ -833,17 +883,196 @@ impl NetworkBeaconProcessor { /// Handle a `DataColumnsByRange` request from the peer. pub fn handle_data_columns_by_range_request( - self: Arc, + &self, peer_id: PeerId, - _request_id: PeerRequestId, + request_id: PeerRequestId, req: DataColumnsByRangeRequest, ) { - // TODO(das): implement handler + self.terminate_response_stream( + peer_id, + request_id, + self.handle_data_columns_by_range_request_inner(peer_id, request_id, req), + Response::DataColumnsByRange, + ); + } + + /// Handle a `DataColumnsByRange` request from the peer. 
+ pub fn handle_data_columns_by_range_request_inner( + &self, + peer_id: PeerId, + request_id: PeerRequestId, + req: DataColumnsByRangeRequest, + ) -> Result<(), (RPCResponseErrorCode, &'static str)> { debug!(self.log, "Received DataColumnsByRange Request"; "peer_id" => %peer_id, "count" => req.count, "start_slot" => req.start_slot, ); + + // Should not send more than max request data columns + if req.max_requested::() > self.chain.spec.max_request_data_column_sidecars { + return Err(( + RPCResponseErrorCode::InvalidRequest, + "Request exceeded `MAX_REQUEST_DATA_COLUMN_SIDECARS`", + )); + } + + let request_start_slot = Slot::from(req.start_slot); + + let data_availability_boundary_slot = match self.chain.data_availability_boundary() { + Some(boundary) => boundary.start_slot(T::EthSpec::slots_per_epoch()), + None => { + debug!(self.log, "Deneb fork is disabled"); + return Err(( + RPCResponseErrorCode::InvalidRequest, + "Deneb fork is disabled", + )); + } + }; + + let oldest_data_column_slot = self + .chain + .store + .get_data_column_info() + .oldest_data_column_slot + .unwrap_or(data_availability_boundary_slot); + + if request_start_slot < oldest_data_column_slot { + debug!( + self.log, + "Range request start slot is older than data availability boundary."; + "requested_slot" => request_start_slot, + "oldest_data_column_slot" => oldest_data_column_slot, + "data_availability_boundary" => data_availability_boundary_slot + ); + + return if data_availability_boundary_slot < oldest_data_column_slot { + Err(( + RPCResponseErrorCode::ResourceUnavailable, + "data columns pruned within boundary", + )) + } else { + Err(( + RPCResponseErrorCode::InvalidRequest, + "Req outside availability period", + )) + }; + } + + let forwards_block_root_iter = + match self.chain.forwards_iter_block_roots(request_start_slot) { + Ok(iter) => iter, + Err(BeaconChainError::HistoricalBlockError( + HistoricalBlockError::BlockOutOfRange { + slot, + oldest_block_slot, + }, + )) => { + debug!(self.log, "Range 
request failed during backfill"; + "requested_slot" => slot, + "oldest_known_slot" => oldest_block_slot + ); + return Err((RPCResponseErrorCode::ResourceUnavailable, "Backfilling")); + } + Err(e) => { + error!(self.log, "Unable to obtain root iter"; + "request" => ?req, + "peer" => %peer_id, + "error" => ?e + ); + return Err((RPCResponseErrorCode::ServerError, "Database error")); + } + }; + + // Use `WhenSlotSkipped::Prev` to get the most recent block root prior to + // `request_start_slot` in order to check whether the `request_start_slot` is a skip. + let mut last_block_root = req.start_slot.checked_sub(1).and_then(|prev_slot| { + self.chain + .block_root_at_slot(Slot::new(prev_slot), WhenSlotSkipped::Prev) + .ok() + .flatten() + }); + + // Pick out the required blocks, ignoring skip-slots. + let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { + iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot.saturating_add(req.count)) + // map skip slots to None + .map(|(root, _)| { + let result = if Some(root) == last_block_root { + None + } else { + Some(root) + }; + last_block_root = Some(root); + result + }) + .collect::>>() + }); + + let block_roots = match maybe_block_roots { + Ok(block_roots) => block_roots, + Err(e) => { + error!(self.log, "Error during iteration over blocks"; + "request" => ?req, + "peer" => %peer_id, + "error" => ?e + ); + return Err((RPCResponseErrorCode::ServerError, "Database error")); + } + }; + + // remove all skip slots + let block_roots = block_roots.into_iter().flatten(); + let mut data_columns_sent = 0; + + for root in block_roots { + for index in &req.columns { + match self.chain.get_data_column(&root, index) { + Ok(Some(data_column_sidecar)) => { + data_columns_sent += 1; + self.send_network_message(NetworkMessage::SendResponse { + peer_id, + response: Response::DataColumnsByRange(Some( + data_column_sidecar.clone(), + )), + id: request_id, + }); + } + Ok(None) => {} // no-op + Err(e) => { + error!( + 
self.log, + "Error fetching data columns block root"; + "request" => ?req, + "peer" => %peer_id, + "block_root" => ?root, + "error" => ?e + ); + return Err(( + RPCResponseErrorCode::ServerError, + "No data columns and failed fetching corresponding block", + )); + } + } + } + } + + let current_slot = self + .chain + .slot() + .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); + + debug!( + self.log, + "DataColumnsByRange Response processed"; + "peer" => %peer_id, + "start_slot" => req.start_slot, + "current_slot" => current_slot, + "requested" => req.count, + "returned" => data_columns_sent + ); + + Ok(()) } /// Helper function to ensure single item protocol always end with either a single chunk or an