diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 0a6a4666d13..229726d1b14 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -219,11 +219,13 @@ impl TryInto for AvailabilityProcessingStatus { /// The result of a chain segment processing. pub enum ChainSegmentResult { /// Processing this chain segment finished successfully. - Successful { imported_blocks: usize }, + Successful { + imported_blocks: Vec<(Hash256, Slot)>, + }, /// There was an error processing this chain segment. Before the error, some blocks could /// have been imported. Failed { - imported_blocks: usize, + imported_blocks: Vec<(Hash256, Slot)>, error: BlockError, }, } @@ -2709,7 +2711,7 @@ impl BeaconChain { chain_segment: Vec>, ) -> Result>, ChainSegmentResult> { // This function will never import any blocks. - let imported_blocks = 0; + let imported_blocks = vec![]; let mut filtered_chain_segment = Vec::with_capacity(chain_segment.len()); // Produce a list of the parent root and slot of the child of each block. @@ -2815,7 +2817,7 @@ impl BeaconChain { chain_segment: Vec>, notify_execution_layer: NotifyExecutionLayer, ) -> ChainSegmentResult { - let mut imported_blocks = 0; + let mut imported_blocks = vec![]; // Filter uninteresting blocks from the chain segment in a blocking task. let chain = self.clone(); @@ -2875,6 +2877,8 @@ impl BeaconChain { // Import the blocks into the chain. for signature_verified_block in signature_verified_blocks { + let block_slot = signature_verified_block.slot(); + match self .process_block( signature_verified_block.block_root(), @@ -2886,9 +2890,9 @@ impl BeaconChain { { Ok(status) => { match status { - AvailabilityProcessingStatus::Imported(_) => { + AvailabilityProcessingStatus::Imported(block_root) => { // The block was imported successfully. 
- imported_blocks += 1; + imported_blocks.push((block_root, block_slot)); } AvailabilityProcessingStatus::MissingComponents(slot, block_root) => { warn!(self.log, "Blobs missing in response to range request"; @@ -6823,6 +6827,16 @@ impl BeaconChain { self.data_availability_checker.data_availability_boundary() } + /// Returns true if we should issue a sampling request for this block + /// TODO(das): check if the block is still within the da_window + pub fn should_sample_slot(&self, slot: Slot) -> bool { + self.spec + .eip7594_fork_epoch + .map_or(false, |eip7594_fork_epoch| { + slot.epoch(T::EthSpec::slots_per_epoch()) >= eip7594_fork_epoch + }) + } + pub fn logger(&self) -> &Logger { &self.log } diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 0a49e107042..009609621ef 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1225,6 +1225,10 @@ impl SignatureVerifiedBlock { pub fn block_root(&self) -> Hash256 { self.block_root } + + pub fn slot(&self) -> Slot { + self.block.slot() + } } impl IntoExecutionPendingBlock for SignatureVerifiedBlock { diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index c3e77ae225e..76dbd483b57 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -1,9 +1,11 @@ +use crate::discovery::enr::PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY; use crate::discovery::CombinedKey; use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, Gossipsub, PeerId}; use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo}; use rand::seq::SliceRandom; use score::{PeerAction, ReportSource, Score, ScoreState}; use slog::{crit, debug, error, trace, warn}; +use ssz::Encode; use std::net::IpAddr; use std::time::Instant; use std::{cmp::Ordering, fmt::Display}; @@ -673,9 +675,23 @@ impl PeerDB { } /// Updates the connection state. MUST ONLY BE USED IN TESTS. - pub fn __add_connected_peer_testing_only(&mut self, peer_id: &PeerId) -> Option { + pub fn __add_connected_peer_testing_only( + &mut self, + peer_id: &PeerId, + supernode: bool, + ) -> Option { let enr_key = CombinedKey::generate_secp256k1(); - let enr = Enr::builder().build(&enr_key).unwrap(); + let mut enr = Enr::builder().build(&enr_key).unwrap(); + + if supernode { + enr.insert( + PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY, + &(E::data_column_subnet_count() as u64).as_ssz_bytes(), + &enr_key, + ) + .expect("u64 can be encoded"); + } + self.update_connection_state( peer_id, NewConnectionState::Connected { diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index 5dd74f25726..de4ef7785fd 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -1,4 +1,5 @@ //! A collection of variables that are accessible outside of the network thread itself. 
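Both the `__add_connected_peer_testing_only` change above and the `test_mutate_custody_subnet_count` helper below store the node's custody subnet count in the ENR under `PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY` as an SSZ-encoded `u64`. A minimal sketch of that encoding, using the same `ssz` traits the diff imports (the count of 64 is illustrative, not necessarily the spec value):

// The ENR value is just a fixed-size SSZ u64, i.e. 8 little-endian bytes.
use ssz::{Decode, Encode};

fn custody_subnet_count_enr_value_roundtrip() {
    let count: u64 = 64; // e.g. E::data_column_subnet_count() for a supernode (illustrative)
    let bytes = count.as_ssz_bytes();
    assert_eq!(bytes.len(), 8); // u64 is a fixed-size SSZ type
    assert_eq!(u64::from_ssz_bytes(&bytes).expect("valid SSZ u64"), count);
}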
+use crate::discovery::enr::PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY;
 use crate::peer_manager::peerdb::PeerDB;
 use crate::rpc::{MetaData, MetaDataV2};
 use crate::types::{BackFillState, SyncState};
@@ -6,6 +7,7 @@ use crate::EnrExt;
 use crate::{Client, Eth2Enr};
 use crate::{Enr, GossipTopic, Multiaddr, PeerId};
 use parking_lot::RwLock;
+use ssz::Encode;
 use std::collections::HashSet;
 use types::data_column_sidecar::ColumnIndex;
 use types::{DataColumnSubnetId, Epoch, EthSpec};
@@ -141,6 +143,22 @@
            log,
        )
    }
+
+    /// TESTING ONLY. Set a custody_subnet_count value
+    pub fn test_mutate_custody_subnet_count(&mut self, value: u64) {
+        use crate::CombinedKeyExt;
+        // For test: use a random key. WARNING: changes ENR NodeID
+        let keypair = libp2p::identity::secp256k1::Keypair::generate();
+        let enr_key = discv5::enr::CombinedKey::from_secp256k1(&keypair);
+        self.local_enr
+            .write()
+            .insert(
+                PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY,
+                &value.as_ssz_bytes(),
+                &enr_key,
+            )
+            .expect("u64 can be serialized");
+    }
 }
 #[cfg(test)]
diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs
index 9259e6826b1..da14b1b5e31 100644
--- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs
+++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs
@@ -1278,20 +1278,16 @@ impl NetworkBeaconProcessor {
         let block = verified_block.block.block_cloned();
         let block_root = verified_block.block_root;
+        // TODO(das) Might be too early to issue a request here. We haven't checked that the block
+        // actually includes blob transactions and thus has data. A peer could send a block with
+        // garbage commitments, and make us trigger sampling for a block that does not have data.
         if block.num_expected_blobs() > 0 {
             // Trigger sampling for block not yet execution valid. At this point column custodials are
             // unlikely to have received their columns. Triggering sampling so early is only viable with
             // either:
             // - Sync delaying sampling until some latter window
             // - Re-processing early sampling requests: https://github.com/sigp/lighthouse/pull/5569
-            if self
-                .chain
-                .spec
-                .eip7594_fork_epoch
-                .map_or(false, |eip7594_fork_epoch| {
-                    block.epoch() >= eip7594_fork_epoch
-                })
-            {
+            if self.chain.should_sample_slot(block.slot()) {
                 self.send_sync_message(SyncMessage::SampleBlock(block_root, block.slot()));
             }
         }
diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs
index 2aa498da857..eaba4933f02 100644
--- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs
+++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs
@@ -140,6 +140,7 @@ impl NetworkBeaconProcessor {
         };
         let slot = block.slot();
+        let block_has_data = block.as_block().num_expected_blobs() > 0;
         let parent_root = block.message().parent_root();
         let commitments_formatted = block.as_block().commitments_formatted();
@@ -160,6 +161,17 @@
                 metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);
+                // RPC block imported or execution validated. If the block was already imported by gossip we
+                // receive Err(BlockError::AlreadyKnown).
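The gate applied just below in the RPC import path, and above in the gossip path, is the new `BeaconChain::should_sample_slot`, which only compares the block's epoch against `eip7594_fork_epoch`. A rough worked example of the boundary it draws, assuming 32 slots per epoch and a hypothetical fork epoch of 10 (illustrative values only):

// Mirror of the epoch comparison in `should_sample_slot`, with plain integers:
//   fork epoch None      -> always false (PeerDAS not scheduled)
//   slot 319 -> epoch 9  -> 9 >= 10  -> false, no SampleBlock message
//   slot 320 -> epoch 10 -> 10 >= 10 -> true, sampling is triggered
fn should_sample(fork_epoch: Option<u64>, slot: u64, slots_per_epoch: u64) -> bool {
    fork_epoch.map_or(false, |fork_epoch| slot / slots_per_epoch >= fork_epoch)
}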
+ if result.is_ok() && + // Block has at least one blob, so it produced columns + block_has_data && + // Block slot is within the DA boundary (should always be the case) and PeerDAS is activated + self.chain.should_sample_slot(slot) + { + self.send_sync_message(SyncMessage::SampleBlock(block_root, slot)); + } + // RPC block imported, regardless of process type if let &Ok(AvailabilityProcessingStatus::Imported(hash)) = &result { info!(self.log, "New RPC block received"; "slot" => slot, "hash" => %hash); @@ -491,10 +503,19 @@ impl NetworkBeaconProcessor { { ChainSegmentResult::Successful { imported_blocks } => { metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL); - if imported_blocks > 0 { + if !imported_blocks.is_empty() { self.chain.recompute_head_at_current_slot().await; + + for (block_root, block_slot) in &imported_blocks { + if self.chain.should_sample_slot(*block_slot) { + self.send_sync_message(SyncMessage::SampleBlock( + *block_root, + *block_slot, + )); + } + } } - (imported_blocks, Ok(())) + (imported_blocks.len(), Ok(())) } ChainSegmentResult::Failed { imported_blocks, @@ -502,10 +523,10 @@ impl NetworkBeaconProcessor { } => { metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_FAILED_TOTAL); let r = self.handle_failed_chain_segment(error); - if imported_blocks > 0 { + if !imported_blocks.is_empty() { self.chain.recompute_head_at_current_slot().await; } - (imported_blocks, r) + (imported_blocks.len(), r) } } } diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index 5491bd7120f..6d98b8656a9 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -1,21 +1,24 @@ use crate::sync::block_lookups::single_block_lookup::{ - LookupRequestError, SingleBlockLookup, SingleLookupRequestState, + LookupError, SingleBlockLookup, SingleLookupRequestState, +}; +use crate::sync::block_lookups::{ + BlobRequestState, BlockRequestState, CustodyColumnRequestState, PeerId, +}; +use crate::sync::manager::{ + BlockProcessType, DataColumnsByRootRequester, Id, SLOT_IMPORT_TOLERANCE, +}; +use crate::sync::network_context::{ + DataColumnsByRootSingleBlockRequest, ReqId, SyncNetworkContext, }; -use crate::sync::block_lookups::{BlobRequestState, BlockRequestState, PeerId}; -use crate::sync::manager::{BlockProcessType, Id, SLOT_IMPORT_TOLERANCE}; -use crate::sync::network_context::SyncNetworkContext; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::data_column_verification::CustodyDataColumn; use beacon_chain::BeaconChainTypes; use std::sync::Arc; -use types::blob_sidecar::FixedBlobSidecarList; -use types::SignedBeaconBlock; +use types::{blob_sidecar::FixedBlobSidecarList, ColumnIndex, Epoch, SignedBeaconBlock}; use super::single_block_lookup::DownloadResult; use super::SingleLookupId; -use super::single_block_lookup::CustodyRequestState; - #[derive(Debug, Copy, Clone)] pub enum ResponseType { Block, @@ -49,16 +52,17 @@ pub trait RequestState { peer_id: PeerId, downloaded_block_expected_blobs: Option, cx: &mut SyncNetworkContext, - ) -> Result; + ) -> Result, LookupError>; /* Response handling methods */ /// Send the response to the beacon processor. 
fn send_for_processing( id: Id, + component_index: usize, result: DownloadResult, cx: &SyncNetworkContext, - ) -> Result<(), LookupRequestError>; + ) -> Result<(), LookupError>; /* Utility methods */ @@ -66,7 +70,10 @@ pub trait RequestState { fn response_type() -> ResponseType; /// A getter for the `BlockRequestState` or `BlobRequestState` associated with this trait. - fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self; + fn request_state_mut( + request: &mut SingleBlockLookup, + component_index: usize, + ) -> Option<&mut Self>; /// A getter for a reference to the `SingleLookupRequestState` associated with this trait. fn get_state(&self) -> &SingleLookupRequestState; @@ -82,18 +89,19 @@ impl RequestState for BlockRequestState { &self, id: SingleLookupId, peer_id: PeerId, - _: Option, + _downloaded_block_expected_blobs: Option, cx: &mut SyncNetworkContext, - ) -> Result { + ) -> Result, LookupError> { cx.block_lookup_request(id, peer_id, self.requested_block_root) - .map_err(LookupRequestError::SendFailed) + .map_err(LookupError::SendFailed) } fn send_for_processing( id: SingleLookupId, + _component_index: usize, download_result: DownloadResult, cx: &SyncNetworkContext, - ) -> Result<(), LookupRequestError> { + ) -> Result<(), LookupError> { let DownloadResult { value, block_root, @@ -106,14 +114,14 @@ impl RequestState for BlockRequestState { seen_timestamp, BlockProcessType::SingleBlock { id }, ) - .map_err(LookupRequestError::SendFailed) + .map_err(LookupError::SendFailed) } fn response_type() -> ResponseType { ResponseType::Block } - fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self { - &mut request.block_request_state + fn request_state_mut(request: &mut SingleBlockLookup, _: usize) -> Option<&mut Self> { + Some(&mut request.block_request_state) } fn get_state(&self) -> &SingleLookupRequestState { &self.state @@ -132,21 +140,22 @@ impl RequestState for BlobRequestState { peer_id: PeerId, downloaded_block_expected_blobs: Option, cx: &mut SyncNetworkContext, - ) -> Result { + ) -> Result, LookupError> { cx.blob_lookup_request( id, peer_id, self.block_root, downloaded_block_expected_blobs, ) - .map_err(LookupRequestError::SendFailed) + .map_err(LookupError::SendFailed) } fn send_for_processing( id: Id, + _component_index: usize, download_result: DownloadResult, cx: &SyncNetworkContext, - ) -> Result<(), LookupRequestError> { + ) -> Result<(), LookupError> { let DownloadResult { value, block_root, @@ -159,14 +168,14 @@ impl RequestState for BlobRequestState { seen_timestamp, BlockProcessType::SingleBlob { id }, ) - .map_err(LookupRequestError::SendFailed) + .map_err(LookupError::SendFailed) } fn response_type() -> ResponseType { ResponseType::Blob } - fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self { - &mut request.blob_request_state + fn request_state_mut(request: &mut SingleBlockLookup, _: usize) -> Option<&mut Self> { + Some(&mut request.blob_request_state) } fn get_state(&self) -> &SingleLookupRequestState { &self.state @@ -176,26 +185,52 @@ impl RequestState for BlobRequestState { } } -impl RequestState for CustodyRequestState { - type VerifiedResponseType = Vec>; +impl RequestState for CustodyColumnRequestState { + type VerifiedResponseType = CustodyDataColumn; fn make_request( &self, id: Id, // TODO(das): consider selecting peers that have custody but are in this set _peer_id: PeerId, - downloaded_block_expected_blobs: Option, + _downloaded_block_expected_blobs: Option, cx: &mut SyncNetworkContext, - ) -> Result { - 
cx.custody_lookup_request(id, self.block_root, downloaded_block_expected_blobs)
-            .map_err(LookupRequestError::SendFailed)
+    ) -> Result, LookupError> {
+        // TODO: Ensure that requests are not sent when the block is:
+        // - not yet in PeerDAS epoch
+        // - outside da_boundary (maybe not necessary to check)
+
+        // TODO(das): no rotation
+        let block_epoch = Epoch::new(0);
+        // TODO: When there is a fork and only a subset of your peers know about a block, sampling should only
+        // be queried on the peers on that fork. Should this case be handled? How to handle it?
+        let peer_ids = cx.get_custodial_peers(block_epoch, self.column_index);
+
+        // TODO(das) randomize custodial peer and avoid failing peers
+        let Some(peer_id) = peer_ids.first().cloned() else {
+            // Allow custody lookups to have zero peers. If no peers are found to have custody after
+            // some time the lookup is dropped.
+            return Ok(None);
+        };
+
+        cx.data_column_lookup_request(
+            DataColumnsByRootRequester::Custody(id, self.column_index),
+            peer_id,
+            DataColumnsByRootSingleBlockRequest {
+                block_root: self.block_root,
+                indices: vec![self.column_index],
+            },
+        )
+        .map(Some)
+        .map_err(LookupError::SendFailed)
     }
     fn send_for_processing(
         id: Id,
+        component_index: usize,
         download_result: DownloadResult,
         cx: &SyncNetworkContext,
-    ) -> Result<(), LookupRequestError> {
+    ) -> Result<(), LookupError> {
         let DownloadResult {
             value,
             block_root,
@@ -204,18 +239,24 @@
         } = download_result;
         cx.send_custody_columns_for_processing(
             block_root,
-            value,
+            // TODO(das): might be inefficient to send columns one by one
+            vec![value],
             seen_timestamp,
-            BlockProcessType::SingleCustodyColumn(id),
+            BlockProcessType::SingleCustodyColumn(id, component_index as ColumnIndex),
         )
-        .map_err(LookupRequestError::SendFailed)
+        .map_err(LookupError::SendFailed)
     }
     fn response_type() -> ResponseType {
         ResponseType::CustodyColumn
     }
-    fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self {
-        &mut request.custody_request_state
+    fn request_state_mut(
+        request: &mut SingleBlockLookup,
+        component_index: usize,
+    ) -> Option<&mut Self> {
+        request
+            .custody_columns_requests
+            .get_mut(&(component_index as u64))
     }
     fn get_state(&self) -> &SingleLookupRequestState {
         &self.state
diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs
index 1b6f449ec37..505d1afd734 100644
--- a/beacon_node/network/src/sync/block_lookups/mod.rs
+++ b/beacon_node/network/src/sync/block_lookups/mod.rs
@@ -1,12 +1,12 @@
 use self::parent_chain::{compute_parent_chains, NodeChain};
 pub use self::single_block_lookup::DownloadResult;
-use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup};
+use self::single_block_lookup::{LookupError, LookupResult, SingleBlockLookup};
 use super::manager::{BlockProcessType, BlockProcessingResult};
-use super::network_context::{LookupFailure, PeerGroup, SyncNetworkContext};
+use super::network_context::{PeerGroup, RpcByRootRequestError, SyncNetworkContext};
 use crate::metrics;
 use crate::sync::block_lookups::common::{ResponseType, PARENT_DEPTH_TOLERANCE};
 use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor;
-use crate::sync::manager::Id;
+use crate::sync::manager::{Id, SingleLookupReqId};
 use beacon_chain::block_verification_types::AsBlock;
 use beacon_chain::data_availability_checker::AvailabilityCheckErrorCategory;
 use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError};
@@ -14,13
+14,13 @@ pub use common::RequestState; use fnv::FnvHashMap; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; -pub use single_block_lookup::{BlobRequestState, BlockRequestState, CustodyRequestState}; +pub use single_block_lookup::{BlobRequestState, BlockRequestState, CustodyColumnRequestState}; use slog::{debug, error, warn, Logger}; use std::collections::hash_map::Entry; use std::sync::Arc; use std::time::Duration; use store::Hash256; -use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; +use types::{BlobSidecar, Epoch, EthSpec, SignedBeaconBlock}; pub mod common; pub mod parent_chain; @@ -31,6 +31,13 @@ mod tests; const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60; pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 4; +#[derive(Debug)] +pub enum LookupRequestError { + BlockRequestError(RpcByRootRequestError), + BlobRequestError(RpcByRootRequestError), + CustodyRequestError(RpcByRootRequestError), +} + pub enum BlockComponent { Block(DownloadResult>>), Blob(DownloadResult>>), @@ -260,9 +267,23 @@ impl BlockLookups { } } + // TODO(das): If there's rotation we need to fetch the block before knowing which columns to + // custody. + let block_epoch = Epoch::new(0); + let custody_column_indexes = cx + .network_globals() + .custody_columns(block_epoch) + .expect("TODO: parse custody value on start-up"); + // If we know that this lookup has unknown parent (is awaiting a parent lookup to resolve), // signal here to hold processing downloaded data. - let mut lookup = SingleBlockLookup::new(block_root, peers, cx.next_id(), awaiting_parent); + let mut lookup = SingleBlockLookup::new( + block_root, + peers, + cx.next_id(), + awaiting_parent, + custody_column_indexes, + ); let msg = if block_component.is_some() { "Searching for components of a block with unknown parent" @@ -304,53 +325,57 @@ impl BlockLookups { /// Process a block or blob response received from a single lookup request. pub fn on_download_response>( &mut self, - id: SingleLookupId, - response: Result<(R::VerifiedResponseType, PeerGroup, Duration), LookupFailure>, + id: SingleLookupReqId, + component_index: usize, + response: Result<(R::VerifiedResponseType, PeerGroup, Duration), LookupRequestError>, cx: &mut SyncNetworkContext, ) { - let result = self.on_download_response_inner::(id, response, cx); - self.on_lookup_result(id, result, "download_response", cx); + let result = self.on_download_response_inner::(id, component_index, response, cx); + self.on_lookup_result(id.lookup_id, result, "download_response", cx); } /// Process a block or blob response received from a single lookup request. pub fn on_download_response_inner>( &mut self, - id: SingleLookupId, - response: Result<(R::VerifiedResponseType, PeerGroup, Duration), LookupFailure>, + id: SingleLookupReqId, + component_index: usize, + response: Result<(R::VerifiedResponseType, PeerGroup, Duration), LookupRequestError>, cx: &mut SyncNetworkContext, - ) -> Result { + ) -> Result { // Note: do not downscore peers here for requests errors, SyncNetworkContext does it. - let response_type = R::response_type(); - let Some(lookup) = self.single_block_lookups.get_mut(&id) else { - // We don't have the ability to cancel in-flight RPC requests. So this can happen - // if we started this RPC request, and later saw the block/blobs via gossip. 
- debug!(self.log, "Block returned for single block lookup not present"; "id" => id); - return Err(LookupRequestError::UnknownLookup); - }; + let lookup = self + .single_block_lookups + .get_mut(&id.lookup_id) + .ok_or(LookupError::UnknownLookup)?; let block_root = lookup.block_root(); - let request_state = R::request_state_mut(lookup).get_state_mut(); + let request = R::request_state_mut(lookup, component_index) + .ok_or(LookupError::UnknownComponentIndex(component_index))?; + let request_state = request.get_state_mut(); match response { Ok((response, peer_group, seen_timestamp)) => { debug!(self.log, "Received lookup download success"; "block_root" => ?block_root, - "id" => id, + "id" => ?id, "peer_group" => ?peer_group, - "response_type" => ?response_type, + "component" => format!("{:?}({})", R::response_type(), component_index), ); // Register the download peer here. Once we have received some data over the wire we // attribute it to this peer for scoring latter regardless of how the request was // done. - request_state.on_download_success(DownloadResult { - value: response, - block_root, - seen_timestamp, - peer_group, - })?; + request_state.on_download_success( + id.req_id, + DownloadResult { + value: response, + block_root, + seen_timestamp, + peer_group, + }, + )?; // continue_request will send for processing as the request state is AwaitingProcessing } Err(e) => { @@ -359,12 +384,12 @@ impl BlockLookups { debug!(self.log, "Received lookup download failure"; "block_root" => ?block_root, - "id" => id, - "response_type" => ?response_type, + "id" => ?id, + "component" => format!("{:?}({})", R::response_type(), component_index), "error" => ?e, ); - request_state.on_download_failure()?; + request_state.on_download_failure(id.req_id)?; // continue_request will retry a download as the request state is AwaitingDownload } } @@ -393,38 +418,45 @@ impl BlockLookups { result: BlockProcessingResult, cx: &mut SyncNetworkContext, ) { - let lookup_result = match process_type { - BlockProcessType::SingleBlock { id } => { - self.on_processing_result_inner::>(id, result, cx) - } - BlockProcessType::SingleBlob { id } => { - self.on_processing_result_inner::>(id, result, cx) - } - BlockProcessType::SingleCustodyColumn(id) => { - self.on_processing_result_inner::>(id, result, cx) - } - }; + let lookup_result = + match process_type { + BlockProcessType::SingleBlock { id } => self + .on_processing_result_inner::>(id, 0, result, cx), + BlockProcessType::SingleBlob { id } => self + .on_processing_result_inner::>(id, 0, result, cx), + BlockProcessType::SingleCustodyColumn(id, component_index) => { + self.on_processing_result_inner::>( + id, + component_index as usize, + result, + cx, + ) + } + }; self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx); } pub fn on_processing_result_inner>( &mut self, lookup_id: SingleLookupId, + component_index: usize, result: BlockProcessingResult, cx: &mut SyncNetworkContext, - ) -> Result { - let Some(lookup) = self.single_block_lookups.get_mut(&lookup_id) else { - debug!(self.log, "Unknown single block lookup"; "id" => lookup_id); - return Err(LookupRequestError::UnknownLookup); - }; + ) -> Result { + let lookup = self + .single_block_lookups + .get_mut(&lookup_id) + .ok_or(LookupError::UnknownLookup)?; let block_root = lookup.block_root(); - let request_state = R::request_state_mut(lookup).get_state_mut(); + let request = R::request_state_mut(lookup, component_index) + .ok_or(LookupError::UnknownComponentIndex(component_index))?; + let 
request_state = request.get_state_mut();
         debug!(
             self.log,
             "Received lookup processing result";
-            "component" => ?R::response_type(),
+            "component" => format!("{:?}({})", R::response_type(), component_index),
             "block_root" => ?block_root,
             "id" => lookup_id,
             "result" => ?result,
@@ -445,27 +477,20 @@
                     // if both components have been processed.
                     request_state.on_processing_success()?;
-                    // If this was the result of a block request, we can't determined if the block peer did anything
-                    // wrong. If we already had both a block and blobs response processed, we should penalize the
-                    // blobs peer because they did not provide all blobs on the initial request.
-                    if lookup.both_components_processed() {
-                        // TODO(das): extend to columns peers too
-                        if let Some(blob_peer) = lookup
-                            .blob_request_state
-                            .state
-                            .on_post_process_validation_failure()?
-                        {
-                            // TODO(das): downscore only the peer that served the request for all blobs
-                            for peer in blob_peer.all() {
-                                cx.report_peer(
-                                    *peer,
-                                    PeerAction::MidToleranceError,
-                                    "sent_incomplete_blobs",
-                                );
-                            }
-                        }
+                    // We don't request other block components until being sure that the block has
+                    // data. If we request blobs / columns from a peer we are sure those must exist.
+                    // Therefore if all components are processed and we still receive `MissingComponents`
+                    // it indicates an internal bug.
+                    if lookup.all_components_processed() {
+                        return Err(LookupError::MissingComponentsAfterAllProcessed);
+                    } else {
+                        // Trigger reconstruction if node has more than 50% of columns. Reconstruction
+                        // is expensive so it should be handled as a separate work event. The
+                        // result of the reconstruction must be imported and published to gossip.
+                        // The result of a reconstruction will be imported as block components.
+
+                        Action::Retry
                     }
-                    Action::Retry
                 }
                 BlockProcessingResult::Ignored => {
                     // Beacon processor signalled to ignore the block processing result.
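The reconstruction mentioned in the comment above is not implemented by this diff. Since the 1D erasure-coded extension allows the full column set to be rebuilt from half of the columns, the condition it describes would roughly look like the sketch below; the names are hypothetical, not existing accessors:

// Possible shape of the reconstruction trigger described above (hypothetical, not in this PR).
fn should_attempt_reconstruction(columns_received: usize, number_of_columns: usize) -> bool {
    // Strictly more than 50% of the columns, matching the comment; the reconstruction itself
    // would be dispatched as a separate (expensive) beacon-processor work event.
    2 * columns_received > number_of_columns
}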
@@ -557,7 +582,7 @@ impl BlockLookups { } Action::Drop => { // Drop with noop - Err(LookupRequestError::Failed) + Err(LookupError::Failed) } Action::Continue => { // Drop this completed lookup only @@ -608,7 +633,7 @@ impl BlockLookups { fn on_lookup_result( &mut self, id: SingleLookupId, - result: Result, + result: Result, source: &str, cx: &mut SyncNetworkContext, ) { diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 9020aea52e8..526936f00fd 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -2,18 +2,18 @@ use super::common::ResponseType; use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS}; use crate::sync::block_lookups::common::RequestState; use crate::sync::block_lookups::Id; -use crate::sync::network_context::{PeerGroup, SyncNetworkContext}; +use crate::sync::network_context::{PeerGroup, ReqId, SyncNetworkContext}; use beacon_chain::data_column_verification::CustodyDataColumn; use beacon_chain::BeaconChainTypes; use rand::seq::IteratorRandom; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::sync::Arc; use std::time::Duration; use store::Hash256; use strum::IntoStaticStr; use types::blob_sidecar::FixedBlobSidecarList; -use types::{EthSpec, SignedBeaconBlock}; +use types::{ColumnIndex, EthSpec, SignedBeaconBlock}; // Dedicated enum for LookupResult to force its usage #[must_use = "LookupResult must be handled with on_lookup_result"] @@ -25,7 +25,7 @@ pub enum LookupResult { } #[derive(Debug, PartialEq, Eq, IntoStaticStr)] -pub enum LookupRequestError { +pub enum LookupError { /// Too many failed attempts TooManyAttempts { /// The failed attempts were primarily due to processing failures. @@ -39,15 +39,27 @@ pub enum LookupRequestError { BadState(String), /// Lookup failed for some other reason and should be dropped Failed, + /// Received MissingComponents when all components have been processed. This should never + /// happen, and indicates some internal bug + MissingComponentsAfterAllProcessed, /// Attempted to retrieve a not known lookup id UnknownLookup, + /// Attempted to retrieve a not known lookup request by index + UnknownComponentIndex(usize), + /// Received a download result for a different request id than the in-flight request. + /// There should only exist a single request at a time. Having multiple requests is a bug and + /// can result in undefined state, so it's treated as a hard error and the lookup is dropped. 
+ UnexpectedRequestId { + expected_req_id: ReqId, + req_id: ReqId, + }, } pub struct SingleBlockLookup { pub id: Id, pub block_request_state: BlockRequestState, pub blob_request_state: BlobRequestState, - pub custody_request_state: CustodyRequestState, + pub custody_columns_requests: HashMap>, block_root: Hash256, awaiting_parent: Option, /// Peers that claim to have imported this block @@ -56,17 +68,32 @@ pub struct SingleBlockLookup { impl SingleBlockLookup { pub fn new( - requested_block_root: Hash256, + block_root: Hash256, peers: &[PeerId], id: Id, awaiting_parent: Option, + custody_column_indexes: Vec, ) -> Self { + let column_count = custody_column_indexes.len(); + let custody_columns_requests = custody_column_indexes + .into_iter() + .map(|column_index| { + let request = CustodyColumnRequestState::new(block_root, column_index); + (column_index, request) + }) + .collect::>(); + debug_assert_eq!( + column_count, + custody_columns_requests.len(), + "duplicate column indexes" + ); + Self { id, - block_request_state: BlockRequestState::new(requested_block_root), - blob_request_state: BlobRequestState::new(requested_block_root), - custody_request_state: CustodyRequestState::new(requested_block_root), - block_root: requested_block_root, + block_request_state: BlockRequestState::new(block_root), + blob_request_state: BlobRequestState::new(block_root), + custody_columns_requests, + block_root, awaiting_parent, peers: peers.iter().copied().collect(), } @@ -125,19 +152,28 @@ impl SingleBlockLookup { pub fn continue_requests( &mut self, cx: &mut SyncNetworkContext, - ) -> Result { + ) -> Result { // TODO: Check what's necessary to download, specially for blobs - self.continue_request::>(cx)?; - self.continue_request::>(cx)?; - self.continue_request::>(cx)?; + self.continue_request::>(cx, 0)?; + self.continue_request::>(cx, 0)?; + + // TODO(das): Quick hack, but data inefficient to allocate this array every time + let column_indexes = self + .custody_columns_requests + .keys() + .copied() + .collect::>(); + for column_index in column_indexes { + self.continue_request::>( + cx, + column_index as usize, + )?; + } // If all components of this lookup are already processed, there will be no future events // that can make progress so it must be dropped. Consider the lookup completed. // This case can happen if we receive the components from gossip during a retry. - if self.block_request_state.state.is_processed() - && self.blob_request_state.state.is_processed() - && self.custody_request_state.state.is_processed() - { + if self.all_components_processed() { Ok(LookupResult::Completed) } else { Ok(LookupResult::Pending) @@ -148,34 +184,57 @@ impl SingleBlockLookup { fn continue_request>( &mut self, cx: &mut SyncNetworkContext, - ) -> Result<(), LookupRequestError> { + component_index: usize, + ) -> Result<(), LookupError> { let id = self.id; let awaiting_parent = self.awaiting_parent.is_some(); let block_is_processed = self.block_request_state.state.is_processed(); - let request = R::request_state_mut(self); + let request = R::request_state_mut(self, component_index) + .ok_or(LookupError::UnknownComponentIndex(component_index))?; // Attempt to progress awaiting downloads if request.get_state().is_awaiting_download() { + // Verify the current request has not exceeded the maximum number of attempts. 
+ let request_state = request.get_state(); + if request_state.failed_attempts() >= SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS { + let cannot_process = request_state.more_failed_processing_attempts(); + return Err(LookupError::TooManyAttempts { cannot_process }); + } + let downloaded_block_expected_blobs = self .block_request_state .state .peek_downloaded_data() - .map(|block| block.num_expected_blobs()); - let peer_id = self - .get_rand_available_peer() - .ok_or(LookupRequestError::NoPeers)?; - let request = R::request_state_mut(self); + .map(|block| block.num_expected_blobs()) + .or_else(|| { + // This is a bit of a hack, becase the block request `Processed` state does not + // store block details. `peek_downloaded_data` only returns data if the block is + // actively downloading. + if self.block_request_state.state.is_processed() { + cx.chain + .data_availability_checker + .num_expected_blobs(&self.block_root) + } else { + None + } + }); + + let peer_id = self.get_rand_available_peer().ok_or(LookupError::NoPeers)?; + let request = R::request_state_mut(self, component_index) + .ok_or(LookupError::UnknownComponentIndex(component_index))?; // Verify the current request has not exceeded the maximum number of attempts. let request_state = request.get_state(); if request_state.failed_attempts() >= SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS { let cannot_process = request_state.more_failed_processing_attempts(); - return Err(LookupRequestError::TooManyAttempts { cannot_process }); + return Err(LookupError::TooManyAttempts { cannot_process }); } // make_request returns true only if a request needs to be made - if request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? { - request.get_state_mut().on_download_start()?; + if let Some(req_id) = + request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? + { + request.get_state_mut().on_download_start(req_id)?; } else { request.get_state_mut().on_completed_request()?; } @@ -189,7 +248,7 @@ impl SingleBlockLookup { // maybe_start_processing returns Some if state == AwaitingProcess. This pattern is // useful to conditionally access the result data. if let Some(result) = request.get_state_mut().maybe_start_processing() { - return R::send_for_processing(id, result, cx); + return R::send_for_processing(id, component_index, result, cx); } } @@ -203,10 +262,13 @@ impl SingleBlockLookup { } /// Returns true if the block has already been downloaded. - pub fn both_components_processed(&self) -> bool { + pub fn all_components_processed(&self) -> bool { self.block_request_state.state.is_processed() && self.blob_request_state.state.is_processed() - && self.custody_request_state.state.is_processed() + && self + .custody_columns_requests + .values() + .all(|r| r.state.is_processed()) } /// Remove peer from available peers. Return true if there are no more available peers and all @@ -217,7 +279,10 @@ impl SingleBlockLookup { self.peers.is_empty() && self.block_request_state.state.is_awaiting_download() && self.blob_request_state.state.is_awaiting_download() - && self.custody_request_state.state.is_awaiting_download() + && self + .custody_columns_requests + .values() + .all(|r| r.state.is_awaiting_download()) } /// Selects a random peer from available peers if any, inserts it in used peers and returns it. @@ -242,15 +307,17 @@ impl BlobRequestState { } /// The state of the blob request component of a `SingleBlockLookup`. 
-pub struct CustodyRequestState { +pub struct CustodyColumnRequestState { pub block_root: Hash256, - pub state: SingleLookupRequestState>>, + pub column_index: ColumnIndex, + pub state: SingleLookupRequestState>, } -impl CustodyRequestState { - pub fn new(block_root: Hash256) -> Self { +impl CustodyColumnRequestState { + pub fn new(block_root: Hash256, column_index: ColumnIndex) -> Self { Self { block_root, + column_index, state: SingleLookupRequestState::new(), } } @@ -282,7 +349,7 @@ pub struct DownloadResult { #[derive(Debug)] pub enum State { AwaitingDownload, - Downloading, + Downloading(ReqId), AwaitingProcess(DownloadResult), Processing(DownloadResult), Processed(Option), @@ -350,13 +417,13 @@ impl SingleLookupRequestState { } /// Switch to `Downloading` if the request is in `AwaitingDownload` state, otherwise returns None. - pub fn on_download_start(&mut self) -> Result<(), LookupRequestError> { + pub fn on_download_start(&mut self, req_id: ReqId) -> Result<(), LookupError> { match &self.state { State::AwaitingDownload => { - self.state = State::Downloading; + self.state = State::Downloading(req_id); Ok(()) } - other => Err(LookupRequestError::BadState(format!( + other => Err(LookupError::BadState(format!( "Bad state on_download_start expected AwaitingDownload got {other}" ))), } @@ -364,14 +431,20 @@ impl SingleLookupRequestState { /// Registers a failure in downloading a block. This might be a peer disconnection or a wrong /// block. - pub fn on_download_failure(&mut self) -> Result<(), LookupRequestError> { + pub fn on_download_failure(&mut self, req_id: ReqId) -> Result<(), LookupError> { match &self.state { - State::Downloading => { + State::Downloading(expected_req_id) => { + if req_id != *expected_req_id { + return Err(LookupError::UnexpectedRequestId { + expected_req_id: *expected_req_id, + req_id, + }); + } self.failed_downloading = self.failed_downloading.saturating_add(1); self.state = State::AwaitingDownload; Ok(()) } - other => Err(LookupRequestError::BadState(format!( + other => Err(LookupError::BadState(format!( "Bad state on_download_failure expected Downloading got {other}" ))), } @@ -379,14 +452,21 @@ impl SingleLookupRequestState { pub fn on_download_success( &mut self, + req_id: ReqId, result: DownloadResult, - ) -> Result<(), LookupRequestError> { + ) -> Result<(), LookupError> { match &self.state { - State::Downloading => { + State::Downloading(expected_req_id) => { + if req_id != *expected_req_id { + return Err(LookupError::UnexpectedRequestId { + expected_req_id: *expected_req_id, + req_id, + }); + } self.state = State::AwaitingProcess(result); Ok(()) } - other => Err(LookupRequestError::BadState(format!( + other => Err(LookupError::BadState(format!( "Bad state on_download_success expected Downloading got {other}" ))), } @@ -407,20 +487,20 @@ impl SingleLookupRequestState { /// Revert into `AwaitingProcessing`, if the payload if not invalid and can be submitted for /// processing latter. - pub fn revert_to_awaiting_processing(&mut self) -> Result<(), LookupRequestError> { + pub fn revert_to_awaiting_processing(&mut self) -> Result<(), LookupError> { match &self.state { State::Processing(result) => { self.state = State::AwaitingProcess(result.clone()); Ok(()) } - other => Err(LookupRequestError::BadState(format!( + other => Err(LookupError::BadState(format!( "Bad state on revert_to_awaiting_processing expected Processing got {other}" ))), } } /// Registers a failure in processing a block. 
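The `ReqId` now stored in `State::Downloading` ties a download result to the single request that is supposed to be in flight; a result carrying any other id is treated as a bug and fails the lookup with `UnexpectedRequestId`. A small sketch of the guard, using the unit type as a stand-in for the verified response type (illustrative only):

fn stale_request_id_is_rejected() {
    let mut state = SingleLookupRequestState::<()>::new();
    state.on_download_start(1).unwrap(); // Downloading(1)
    state.on_download_failure(1).unwrap(); // request 1 failed -> back to AwaitingDownload
    state.on_download_start(2).unwrap(); // retry -> Downloading(2)
    // An event for the superseded request id 1 is now a hard error, not silently accepted:
    // Err(UnexpectedRequestId { expected_req_id: 2, req_id: 1 })
    assert!(state.on_download_failure(1).is_err());
}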
- pub fn on_processing_failure(&mut self) -> Result { + pub fn on_processing_failure(&mut self) -> Result { match &self.state { State::Processing(result) => { let peers_source = result.peer_group.clone(); @@ -428,49 +508,33 @@ impl SingleLookupRequestState { self.state = State::AwaitingDownload; Ok(peers_source) } - other => Err(LookupRequestError::BadState(format!( + other => Err(LookupError::BadState(format!( "Bad state on_processing_failure expected Processing got {other}" ))), } } - pub fn on_processing_success(&mut self) -> Result { + pub fn on_processing_success(&mut self) -> Result { match &self.state { State::Processing(result) => { let peer_group = result.peer_group.clone(); self.state = State::Processed(Some(peer_group.clone())); Ok(peer_group) } - other => Err(LookupRequestError::BadState(format!( + other => Err(LookupError::BadState(format!( "Bad state on_processing_success expected Processing got {other}" ))), } } - pub fn on_post_process_validation_failure( - &mut self, - ) -> Result, LookupRequestError> { - match &self.state { - State::Processed(peer_group) => { - let peer_group = peer_group.clone(); - self.failed_processing = self.failed_processing.saturating_add(1); - self.state = State::AwaitingDownload; - Ok(peer_group) - } - other => Err(LookupRequestError::BadState(format!( - "Bad state on_post_process_validation_failure expected Processed got {other}" - ))), - } - } - /// Mark a request as complete without any download or processing - pub fn on_completed_request(&mut self) -> Result<(), LookupRequestError> { + pub fn on_completed_request(&mut self) -> Result<(), LookupError> { match &self.state { State::AwaitingDownload => { self.state = State::Processed(None); Ok(()) } - other => Err(LookupRequestError::BadState(format!( + other => Err(LookupError::BadState(format!( "Bad state on_completed_request expected AwaitingDownload got {other}" ))), } diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 63f00138d67..f6b1deb0672 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -89,6 +89,7 @@ type SamplingIds = Vec<(DataColumnsByRootRequestId, ColumnIndex)>; struct TestRigConfig { peer_das_enabled: bool, + supernode: bool, } impl TestRig { @@ -99,7 +100,7 @@ impl TestRig { // Use `fork_from_env` logic to set correct fork epochs let mut spec = test_spec::(); - if let Some(config) = config { + if let Some(config) = &config { if config.peer_das_enabled { spec.eip7594_fork_epoch = Some(Epoch::new(0)); } @@ -123,9 +124,15 @@ impl TestRig { let (network_tx, network_rx) = mpsc::unbounded_channel(); // TODO(das): make the generation of the ENR use the deterministic rng to have consistent // column assignments - let globals = Arc::new(NetworkGlobals::new_test_globals(Vec::new(), &log)); + let mut globals = NetworkGlobals::new_test_globals(Vec::new(), &log); + if let Some(config) = &config { + if config.supernode { + globals.test_mutate_custody_subnet_count(E::data_column_subnet_count() as u64); + } + } + let (beacon_processor, beacon_processor_rx) = NetworkBeaconProcessor::null_for_testing( - globals, + Arc::new(globals), chain.clone(), harness.runtime.task_executor.clone(), log.clone(), @@ -179,6 +186,19 @@ impl TestRig { fn test_setup_after_peerdas() -> Option { let r = Self::test_setup_with_config(Some(TestRigConfig { peer_das_enabled: true, + supernode: false, + })); + if r.after_deneb() { + Some(r) + } else { + None + } + } + + fn 
test_setup_after_peerdas_supernode() -> Option { + let r = Self::test_setup_with_config(Some(TestRigConfig { + peer_das_enabled: true, + supernode: true, })); if r.after_deneb() { Some(r) @@ -356,12 +376,26 @@ impl TestRig { self.network_globals .peers .write() - .__add_connected_peer_testing_only(&peer_id); + .__add_connected_peer_testing_only(&peer_id, false); + peer_id + } + + fn new_connected_supernode_peer(&mut self) -> PeerId { + let peer_id = PeerId::random(); + self.network_globals + .peers + .write() + .__add_connected_peer_testing_only(&peer_id, true); peer_id } - fn new_connected_peers(&mut self, count: usize) -> Vec { - (0..count).map(|_| self.new_connected_peer()).collect() + fn new_connected_peers_for_peerdas(&mut self) { + // Enough sampling peers with few columns + for _ in 0..100 { + self.new_connected_peer(); + } + // One supernode peer to ensure all columns have at least one peer + self.new_connected_supernode_peer(); } fn parent_chain_processed_success( @@ -608,41 +642,41 @@ impl TestRig { data_columns: Vec>, missing_components: bool, ) { - let lookup_id = if let DataColumnsByRootRequester::Custody(id) = + let DataColumnsByRootRequester::Custody(lookup_id, _) = sampling_ids.first().unwrap().0.requester - { - id.id.0.lookup_id - } else { + else { panic!("not a custody requester") }; let first_column = data_columns.first().cloned().unwrap(); + let last_index = sampling_ids.len() - 1; - for (id, column_index) in sampling_ids { + for (i, (id, column_index)) in sampling_ids.into_iter().enumerate() { self.log(&format!("return valid data column for {column_index}")); let data_column = data_columns[column_index as usize].clone(); self.complete_data_columns_by_root_request(id, data_column); - } - - // Expect work event - // TODO(das): worth it to append sender id to the work event for stricter assertion? - self.expect_rpc_custody_column_work_event(); - // Respond with valid result - self.send_sync_message(SyncMessage::BlockComponentProcessed { - process_type: BlockProcessType::SingleCustodyColumn(lookup_id), - result: if missing_components { - BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( - first_column.slot(), - first_column.block_root(), - )) - } else { - BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported( - first_column.block_root(), - )) - }, - }); + // Expect work event + // TODO(das): worth it to append sender id to the work event for stricter assertion? 
+ self.expect_rpc_custody_column_work_event(); + + // Respond with valid result + self.send_sync_message(SyncMessage::BlockComponentProcessed { + process_type: BlockProcessType::SingleCustodyColumn(lookup_id, column_index), + // Last column + should consider last element + result: if i == last_index && !missing_components { + BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported( + first_column.block_root(), + )) + } else { + BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( + first_column.slot(), + first_column.block_root(), + )) + }, + }); + } } fn complete_data_columns_by_root_request( @@ -1584,7 +1618,7 @@ fn sampling_happy_path() { let Some(mut r) = TestRig::test_setup_after_peerdas() else { return; }; - r.new_connected_peers(100); // Add enough sampling peers + r.new_connected_peers_for_peerdas(); let (block, data_columns) = r.rand_block_and_data_columns(); let block_root = block.canonical_root(); r.trigger_sample_block(block_root, block.slot()); @@ -1601,7 +1635,7 @@ fn sampling_with_retries() { let Some(mut r) = TestRig::test_setup_after_peerdas() else { return; }; - r.new_connected_peers(100); // Add enough sampling peers + r.new_connected_peers_for_peerdas(); let (block, data_columns) = r.rand_block_and_data_columns(); let block_root = block.canonical_root(); r.trigger_sample_block(block_root, block.slot()); @@ -1621,7 +1655,26 @@ fn custody_lookup_happy_path() { let Some(mut r) = TestRig::test_setup_after_peerdas() else { return; }; - r.new_connected_peers(100); // Add enough sampling peers + r.new_connected_peers_for_peerdas(); + let (block, data_columns) = r.rand_block_and_data_columns(); + let block_root = block.canonical_root(); + let peer_id = r.new_connected_peer(); + r.trigger_unknown_block_from_attestation(block_root, peer_id); + // Should not request blobs + let id = r.expect_block_lookup_request(block.canonical_root()); + r.complete_valid_block_request(id, block.into(), true); + let custody_column_count = E::min_custody_requirement() * E::data_columns_per_subnet(); + let custody_ids = r.expect_only_data_columns_by_root_requests(block_root, custody_column_count); + r.complete_valid_custody_request(custody_ids, data_columns, false); + r.expect_no_active_lookups(); +} + +#[test] +fn custody_lookup_no_peers_available_initially() { + let Some(mut r) = TestRig::test_setup_after_peerdas() else { + return; + }; + // Do not add peers initially let (block, data_columns) = r.rand_block_and_data_columns(); let block_root = block.canonical_root(); let peer_id = r.new_connected_peer(); @@ -1635,6 +1688,25 @@ fn custody_lookup_happy_path() { r.expect_no_active_lookups(); } +#[test] +fn custody_lookup_reconstruction() { + let Some(mut r) = TestRig::test_setup_after_peerdas_supernode() else { + return; + }; + r.new_connected_peers_for_peerdas(); + let (block, data_columns) = r.rand_block_and_data_columns(); + let block_root = block.canonical_root(); + let peer_id = r.new_connected_peer(); + r.trigger_unknown_block_from_attestation(block_root, peer_id); + // Should not request blobs + let id = r.expect_block_lookup_request(block.canonical_root()); + r.complete_valid_block_request(id, block.into(), true); + let custody_column_count = E::data_column_subnet_count() * E::data_columns_per_subnet(); + let custody_ids = r.expect_only_data_columns_by_root_requests(block_root, custody_column_count); + r.complete_valid_custody_request(custody_ids, data_columns, false); + r.expect_no_active_lookups(); +} + // TODO(das): Test retries of DataColumnByRoot: // - 
Expect request for column_index // - Respond with bad data diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 7b8d7850a71..7c6740dd244 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -36,7 +36,8 @@ use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; use super::block_lookups::BlockLookups; use super::network_context::{ - BlockOrBlob, CustodyId, RangeRequestId, RpcEvent, SyncNetworkContext, + BlockOrBlob, RangeRequestId, RpcByRootRequestError, RpcByRootVerifyError, RpcEvent, + SyncNetworkContext, }; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; @@ -45,12 +46,14 @@ use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProces use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::block_lookups::{ - BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult, + BlobRequestState, BlockComponent, BlockRequestState, CustodyColumnRequestState, DownloadResult, + LookupRequestError, }; use crate::sync::block_sidecar_coupling::RangeBlockComponentsRequest; use crate::sync::network_context::PeerGroup; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::block_verification_types::RpcBlock; +use beacon_chain::data_column_verification::CustodyDataColumn; use beacon_chain::validator_monitor::timestamp_now; use beacon_chain::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, EngineState, @@ -65,7 +68,9 @@ use std::ops::Sub; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; -use types::{BlobSidecar, DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot}; +use types::{ + BlobSidecar, ColumnIndex, DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot, +}; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync /// from a peer. 
If a peer is within this tolerance (forwards or backwards), it is treated as a @@ -106,7 +111,7 @@ pub struct DataColumnsByRootRequestId { #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum DataColumnsByRootRequester { Sampling(SamplingId), - Custody(CustodyId), + Custody(Id, ColumnIndex), } #[derive(Debug)] @@ -187,7 +192,7 @@ pub enum SyncMessage { pub enum BlockProcessType { SingleBlock { id: Id }, SingleBlob { id: Id }, - SingleCustodyColumn(Id), + SingleCustodyColumn(Id, ColumnIndex), } impl BlockProcessType { @@ -195,7 +200,7 @@ impl BlockProcessType { match self { BlockProcessType::SingleBlock { id } | BlockProcessType::SingleBlob { id } - | BlockProcessType::SingleCustodyColumn(id) => *id, + | BlockProcessType::SingleCustodyColumn(id, _) => *id, } } } @@ -891,10 +896,12 @@ impl SyncManager { if let Some(resp) = self.network.on_single_block_response(id, peer_id, block) { self.block_lookups .on_download_response::>( - id.lookup_id, + id, + 0, // component_index = 0, there's a single block resp.map(|(value, seen_timestamp)| { (value, PeerGroup::from_single(peer_id), seen_timestamp) - }), + }) + .map_err(LookupRequestError::BlockRequestError), &mut self.network, ) } @@ -965,10 +972,12 @@ impl SyncManager { if let Some(resp) = self.network.on_single_blob_response(id, peer_id, blob) { self.block_lookups .on_download_response::>( - id.lookup_id, + id, + 0, // component_index = 0, there's a single blob request resp.map(|(value, seen_timestamp)| { (value, PeerGroup::from_single(peer_id), seen_timestamp) - }), + }) + .map_err(LookupRequestError::BlobRequestError), &mut self.network, ) } @@ -993,21 +1002,38 @@ impl SyncManager { self.on_sampling_result(requester, result) } } - DataColumnsByRootRequester::Custody(id) => { - if let Some((requester, custody_columns)) = - self.network.on_custody_by_root_response(id, peer_id, resp) - { - // TODO(das): get proper timestamp - let seen_timestamp = timestamp_now(); - self.block_lookups - .on_download_response::>( - requester.0.lookup_id, - custody_columns.map(|(columns, peer_group)| { - (columns, peer_group, seen_timestamp) - }), - &mut self.network, - ); - } + DataColumnsByRootRequester::Custody(lookup_id, column_index) => { + let resp = resp.and_then(|(data_columns, seen_timestamp)| { + let data_column = data_columns.into_iter().next().ok_or( + // TODO(das): block lookup must not allow a request to fail in response to these + // error + // TODO(das): failing peers should be tracked and not re-requested. We + // should hold the custody request for a bit until someone has it. + // Retrying the same peer over and over makes no sense + RpcByRootRequestError::VerifyError( + RpcByRootVerifyError::NoResponseReturned, + ), + )?; + // on_data_columns_by_root_response ensures that peers return either nothing + // or the data requested. block lookups only requests custody columns, so if + // there is one item in this request, it must be a custody column. 
+ let custody_column = CustodyDataColumn::from_asserted_custody(data_column); + Ok((custody_column, seen_timestamp)) + }); + // TODO(das): get proper timestamp + self.block_lookups + .on_download_response::>( + SingleLookupReqId { + lookup_id, + req_id: id.req_id, + }, + column_index as usize, + resp.map(|(value, seen_timestamp)| { + (value, PeerGroup::from_single(peer_id), seen_timestamp) + }) + .map_err(LookupRequestError::CustodyRequestError), + &mut self.network, + ); } } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index bdd6ca241fd..2b4a4e139dd 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -1,8 +1,6 @@ //! Provides network functionality for the Syncing thread. This fundamentally wraps a network //! channel and stores a global RPC ID to perform requests. -pub use self::custody::CustodyId; -use self::custody::{ActiveCustodyRequest, CustodyRequester, Error as CustodyRequestError}; use self::requests::{ ActiveBlobsByRootRequest, ActiveBlocksByRootRequest, ActiveDataColumnsByRootRequest, }; @@ -30,7 +28,7 @@ use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError}; use lighthouse_network::{ Client, Eth2Enr, NetworkGlobals, PeerAction, PeerId, ReportSource, Request, }; -pub use requests::LookupVerifyError; +pub use requests::RpcByRootVerifyError; use slog::{debug, error, trace, warn}; use slot_clock::SlotClock; use std::collections::hash_map::Entry; @@ -43,7 +41,6 @@ use types::{ SignedBeaconBlock, Slot, }; -pub mod custody; mod requests; pub struct BlocksAndBlobsByRangeResponse { @@ -71,27 +68,28 @@ pub enum RpcEvent { RPCError(RPCError), } -pub type RpcProcessingResult = Result<(T, Duration), LookupFailure>; - #[derive(Debug)] -pub enum LookupFailure { - RpcError(RPCError), - LookupVerifyError(LookupVerifyError), - CustodyRequestError(CustodyRequestError), +pub enum RpcByRootRequestError { + NetworkError(RPCError), + VerifyError(RpcByRootVerifyError), } -impl From for LookupFailure { +pub type RpcByRootRequestResult = Result<(T, Duration), RpcByRootRequestError>; + +impl From for RpcByRootRequestError { fn from(e: RPCError) -> Self { - LookupFailure::RpcError(e) + Self::NetworkError(e) } } -impl From for LookupFailure { - fn from(e: LookupVerifyError) -> Self { - LookupFailure::LookupVerifyError(e) +impl From for RpcByRootRequestError { + fn from(e: RpcByRootVerifyError) -> Self { + Self::VerifyError(e) } } +pub type ReqId = u32; + #[derive(Clone, Debug)] pub struct PeerGroup { peers: Vec, @@ -101,9 +99,6 @@ impl PeerGroup { pub fn from_single(peer: PeerId) -> Self { Self { peers: vec![peer] } } - pub fn from_set(peers: Vec) -> Self { - Self { peers } - } pub fn all(&self) -> &[PeerId] { &self.peers } @@ -126,8 +121,6 @@ pub struct SyncNetworkContext { DataColumnsByRootRequestId, ActiveDataColumnsByRootRequest, >, - /// Mapping of active custody column requests for a block root - custody_by_root_requests: FnvHashMap>, /// BlocksByRange requests paired with BlobsByRange range_block_components_requests: @@ -179,7 +172,6 @@ impl SyncNetworkContext { blocks_by_root_requests: <_>::default(), blobs_by_root_requests: <_>::default(), data_columns_by_root_requests: <_>::default(), - custody_by_root_requests: <_>::default(), range_block_components_requests: FnvHashMap::default(), network_beacon_processor, chain, @@ -411,20 +403,18 @@ impl SyncNetworkContext { lookup_id: SingleLookupId, peer_id: PeerId, block_root: Hash256, - ) -> Result { + 
) -> Result, &'static str> { if self .chain .reqresp_pre_import_cache .read() .contains_key(&block_root) { - return Ok(false); + return Ok(None); } - let id = SingleLookupReqId { - lookup_id, - req_id: self.next_id(), - }; + let req_id = self.next_id(); + let id = SingleLookupReqId { lookup_id, req_id }; debug!( self.log, @@ -446,7 +436,7 @@ impl SyncNetworkContext { self.blocks_by_root_requests .insert(id, ActiveBlocksByRootRequest::new(request)); - Ok(true) + Ok(Some(req_id)) } /// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking: @@ -461,7 +451,7 @@ impl SyncNetworkContext { peer_id: PeerId, block_root: Hash256, downloaded_block_expected_blobs: Option, - ) -> Result { + ) -> Result, &'static str> { // Check if we are into deneb, and before peerdas if !self .chain @@ -475,19 +465,21 @@ impl SyncNetworkContext { .epoch(T::EthSpec::slots_per_epoch()), ) { - return Ok(false); + return Ok(None); } - let expected_blobs = downloaded_block_expected_blobs - .or_else(|| { - self.chain - .data_availability_checker - .num_expected_blobs(&block_root) - }) - .unwrap_or( - // If we don't about the block being requested, attempt to fetch all blobs - T::EthSpec::max_blobs_per_block(), - ); + // Do not download blobs until the block is downloaded (or already in the da_checker). + // Then we avoid making requests to peers for blocks that may not have data. If the + // block is not yet downloaded, do nothing. There is at least one future event to + // continue this request. + let Some(expected_blobs) = downloaded_block_expected_blobs else { + return Ok(None); + }; + + // No data required + if expected_blobs == 0 { + return Ok(None); + } let imported_blob_indexes = self .chain @@ -501,13 +493,11 @@ impl SyncNetworkContext { if indices.is_empty() { // No blobs required, do not issue any request - return Ok(false); + return Ok(None); } - let id = SingleLookupReqId { - lookup_id, - req_id: self.next_id(), - }; + let req_id = self.next_id(); + let id = SingleLookupReqId { lookup_id, req_id }; debug!( self.log, @@ -533,7 +523,7 @@ impl SyncNetworkContext { self.blobs_by_root_requests .insert(id, ActiveBlobsByRootRequest::new(request)); - Ok(true) + Ok(Some(req_id)) } pub fn data_column_lookup_request( @@ -541,8 +531,15 @@ impl SyncNetworkContext { requester: DataColumnsByRootRequester, peer_id: PeerId, request: DataColumnsByRootSingleBlockRequest, - ) -> Result<(), &'static str> { + ) -> Result { let req_id = self.next_id(); + let id = DataColumnsByRootRequestId { requester, req_id }; + + // TODO(das): Check here if the column is already in the da_checker. 
Here you can prevent + // re-fetching sampling columns for columns that: + // - Part of custody and already downloaded and verified + // - Part of custody and already imported + debug!( self.log, "Sending DataColumnsByRoot Request"; @@ -550,10 +547,8 @@ impl SyncNetworkContext { "block_root" => ?request.block_root, "indices" => ?request.indices, "peer" => %peer_id, - "requester" => ?requester, - "id" => req_id, + "id" => ?id, ); - let id = DataColumnsByRootRequestId { requester, req_id }; self.send_network_msg(NetworkMessage::SendRequest { peer_id, @@ -564,101 +559,7 @@ impl SyncNetworkContext { self.data_columns_by_root_requests .insert(id, ActiveDataColumnsByRootRequest::new(request, requester)); - Ok(()) - } - - pub fn custody_lookup_request( - &mut self, - lookup_id: SingleLookupId, - block_root: Hash256, - downloaded_block_expected_data: Option, - ) -> Result { - // Check if we are into peerdas - if !self - .chain - .data_availability_checker - .data_columns_required_for_epoch( - // TODO(das): use the block's slot - self.chain - .slot_clock - .now_or_genesis() - .ok_or("clock not available")? - .epoch(T::EthSpec::slots_per_epoch()), - ) - { - return Ok(false); - } - - let expects_data = downloaded_block_expected_data - .or_else(|| { - self.chain - .data_availability_checker - .num_expected_blobs(&block_root) - }) - .map(|n| n > 0) - // If we don't know about the block being requested, assume block has data - .unwrap_or(true); - - // No data required for this block - if !expects_data { - return Ok(false); - } - - let custody_indexes_imported = self - .chain - .data_availability_checker - .imported_custody_column_indexes(&block_root) - .unwrap_or_default(); - - // TODO(das): figure out how to pass block.slot if we end up doing rotation - let block_epoch = Epoch::new(0); - let custody_indexes_duty = self.network_globals().custody_columns(block_epoch)?; - - // Include only the blob indexes not yet imported (received through gossip) - let custody_indexes_to_fetch = custody_indexes_duty - .into_iter() - .filter(|index| !custody_indexes_imported.contains(index)) - .collect::>(); - - if custody_indexes_to_fetch.is_empty() { - // No indexes required, do not issue any request - return Ok(false); - } - - let id = SingleLookupReqId { - lookup_id, - req_id: self.next_id(), - }; - - debug!( - self.log, - "Starting custody columns request"; - "block_root" => ?block_root, - "indices" => ?custody_indexes_to_fetch, - "id" => ?id - ); - - let requester = CustodyRequester(id); - let mut request = ActiveCustodyRequest::new( - block_root, - requester, - custody_indexes_to_fetch, - self.log.clone(), - ); - - // TODO(das): start request - // Note that you can only send, but not handle a response here - match request.continue_requests(self) { - Ok(_) => { - // Ignoring the result of `continue_requests` is okay. A request that has just been - // created cannot return data immediately, it must send some request to the network - // first. And there must exist some request, `custody_indexes_to_fetch` is not empty. 
- self.custody_by_root_requests.insert(requester, request); - Ok(true) - } - // TODO(das): handle this error properly - Err(_) => Err("custody_send_error"), - } + Ok(req_id) } pub fn is_execution_engine_online(&self) -> bool { @@ -801,7 +702,7 @@ impl SyncNetworkContext { request_id: SingleLookupReqId, peer_id: PeerId, block: RpcEvent>>, - ) -> Option>>> { + ) -> Option>>> { let Entry::Occupied(mut request) = self.blocks_by_root_requests.entry(request_id) else { return None; }; @@ -827,8 +728,8 @@ impl SyncNetworkContext { } }; - if let Err(ref e) = resp { - self.on_lookup_failure(peer_id, e); + if let Err(RpcByRootRequestError::VerifyError(e)) = &resp { + self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } Some(resp) } @@ -838,7 +739,7 @@ impl SyncNetworkContext { request_id: SingleLookupReqId, peer_id: PeerId, blob: RpcEvent>>, - ) -> Option>> { + ) -> Option>> { let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else { return None; }; @@ -854,23 +755,18 @@ impl SyncNetworkContext { Err(e.into()) } }, - RpcEvent::StreamTermination => { - // Stream terminator - match request.remove().terminate() { - Some(blobs) => to_fixed_blob_sidecar_list(blobs) - .map(|blobs| (blobs, timestamp_now())) - .map_err(Into::into), - None => return None, - } - } + RpcEvent::StreamTermination => match request.remove().terminate() { + Ok(_) => return None, + Err(e) => Err(e.into()), + }, RpcEvent::RPCError(e) => { request.remove(); Err(e.into()) } }; - if let Err(ref e) = resp { - self.on_lookup_failure(peer_id, e); + if let Err(RpcByRootRequestError::VerifyError(e)) = &resp { + self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } Some(resp) } @@ -883,7 +779,7 @@ impl SyncNetworkContext { item: RpcEvent>>, ) -> Option<( DataColumnsByRootRequester, - RpcProcessingResult>>>, + RpcByRootRequestResult>>>, )> { let Entry::Occupied(mut request) = self.data_columns_by_root_requests.entry(id) else { return None; @@ -901,74 +797,22 @@ impl SyncNetworkContext { Err(e.into()) } }, - RpcEvent::StreamTermination => { - // Stream terminator - match request.remove().terminate() { - Some(items) => Ok((items, timestamp_now())), - None => return None, - } - } + RpcEvent::StreamTermination => match request.remove().terminate() { + Some(items) => Ok((items, timestamp_now())), + None => return None, + }, RpcEvent::RPCError(e) => { request.remove(); Err(e.into()) } }; - if let Err(ref e) = resp { - self.on_lookup_failure(peer_id, e); + if let Err(RpcByRootRequestError::VerifyError(e)) = &resp { + self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } Some((requester, resp)) } - /// Insert a downloaded column into an active custody request. Then make progress on the - /// entire request. - /// - /// ### Returns - /// - /// - `Some`: Request completed, won't make more progress. Expect requester to act on the result. - /// - `None`: Request still active, requester should do no action - #[allow(clippy::type_complexity)] - pub fn on_custody_by_root_response( - &mut self, - id: CustodyId, - peer_id: PeerId, - resp: RpcProcessingResult>>>, - ) -> Option<( - CustodyRequester, - Result<(Vec>, PeerGroup), LookupFailure>, - )> { - // Note: need to remove the request to borrow self again below. 
Otherwise we can't - // do nested requests - let Some(mut request) = self.custody_by_root_requests.remove(&id.id) else { - // TOOD(das): This log can happen if the request is error'ed early and dropped - debug!(self.log, "Custody column downloaded event for unknown request"; "id" => ?id); - return None; - }; - - let result = request - .on_data_column_downloaded(peer_id, id.column_index, resp, self) - .map_err(LookupFailure::CustodyRequestError) - .transpose(); - - // Convert a result from internal format of `ActiveCustodyRequest` (error first to use ?) to - // an Option first to use in an `if let Some() { act on result }` block. - if let Some(result) = result { - match result.as_ref() { - Ok((columns, peer_group)) => { - debug!(self.log, "Custody request success, removing"; "id" => ?id, "count" => columns.len(), "peers" => ?peer_group) - } - Err(e) => { - debug!(self.log, "Custody request failure, removing"; "id" => ?id, "error" => ?e) - } - } - - Some((id.id, result)) - } else { - self.custody_by_root_requests.insert(id.id, request); - None - } - } - pub fn send_block_for_processing( &self, block_root: Hash256, @@ -1064,31 +908,17 @@ impl SyncNetworkContext { } } } - - /// Downscore peers for lookup errors that originate from sync - pub fn on_lookup_failure(&self, peer_id: PeerId, err: &LookupFailure) { - match err { - // RPCErros are downscored in the network handler - LookupFailure::RpcError(_) => {} - // Only downscore lookup verify errors. RPC errors are downscored in the network handler. - LookupFailure::LookupVerifyError(e) => { - self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); - } - // CustodyRequestError are downscored in the each data_columns_by_root request - LookupFailure::CustodyRequestError(_) => {} - } - } } fn to_fixed_blob_sidecar_list( blobs: Vec>>, -) -> Result, LookupVerifyError> { +) -> Result, RpcByRootVerifyError> { let mut fixed_list = FixedBlobSidecarList::default(); for blob in blobs.into_iter() { let index = blob.index as usize; *fixed_list .get_mut(index) - .ok_or(LookupVerifyError::UnrequestedBlobIndex(index as u64))? = Some(blob) + .ok_or(RpcByRootVerifyError::UnrequestedBlobIndex(index as u64))? = Some(blob) } Ok(fixed_list) } diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs deleted file mode 100644 index f20a95415db..00000000000 --- a/beacon_node/network/src/sync/network_context/custody.rs +++ /dev/null @@ -1,286 +0,0 @@ -use crate::sync::manager::SingleLookupReqId; - -use self::request::ActiveColumnSampleRequest; -use beacon_chain::data_column_verification::CustodyDataColumn; -use beacon_chain::BeaconChainTypes; -use fnv::FnvHashMap; -use lighthouse_network::PeerId; -use slog::{debug, warn}; -use std::{marker::PhantomData, sync::Arc}; -use types::{data_column_sidecar::ColumnIndex, DataColumnSidecar, Epoch, Hash256}; - -use super::{PeerGroup, RpcProcessingResult, SyncNetworkContext}; - -#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] -pub struct CustodyId { - pub id: CustodyRequester, - pub column_index: ColumnIndex, -} - -/// Downstream components that perform custody by root requests. 
-/// Currently, it's only single block lookups, so not using an enum -#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] -pub struct CustodyRequester(pub SingleLookupReqId); - -type DataColumnSidecarList = Vec>>; - -pub struct ActiveCustodyRequest { - block_root: Hash256, - block_epoch: Epoch, - requester_id: CustodyRequester, - column_requests: FnvHashMap, - columns: Vec>, - /// Logger for the `SyncNetworkContext`. - pub log: slog::Logger, - _phantom: PhantomData, -} - -#[derive(Debug)] -pub enum Error { - SendFailed(&'static str), - TooManyFailures, - BadState(String), - NoPeers(ColumnIndex), -} - -type CustodyRequestResult = Result>, PeerGroup)>, Error>; - -impl ActiveCustodyRequest { - pub(crate) fn new( - block_root: Hash256, - requester_id: CustodyRequester, - column_indexes: Vec, - log: slog::Logger, - ) -> Self { - Self { - block_root, - // TODO(das): use actual epoch if there's rotation - block_epoch: Epoch::new(0), - requester_id, - column_requests: column_indexes - .into_iter() - .map(|index| (index, ActiveColumnSampleRequest::new(index))) - .collect(), - columns: vec![], - log, - _phantom: PhantomData, - } - } - - /// Insert a downloaded column into an active sampling request. Then make progress on the - /// entire request. - /// - /// ### Returns - /// - /// - `Err`: Sampling request has failed and will be dropped - /// - `Ok(Some)`: Sampling request has successfully completed and will be dropped - /// - `Ok(None)`: Sampling request still active - pub(crate) fn on_data_column_downloaded( - &mut self, - _peer_id: PeerId, - column_index: ColumnIndex, - resp: RpcProcessingResult>, - cx: &mut SyncNetworkContext, - ) -> CustodyRequestResult { - // TODO(das): Should downscore peers for verify errors here - - let Some(request) = self.column_requests.get_mut(&column_index) else { - warn!( - self.log, - "Received sampling response for unrequested column index" - ); - return Ok(None); - }; - - match resp { - Ok((mut data_columns, _seen_timestamp)) => { - debug!(self.log, "Sample download success"; "block_root" => %self.block_root, "column_index" => column_index, "count" => data_columns.len()); - - // No need to check data_columns has len > 1, as the SyncNetworkContext ensure that - // only requested is returned (or none); - if let Some(data_column) = data_columns.pop() { - request.on_download_success()?; - - // If on_download_success is successful, we are expecting a columna for this - // custody requirement. - self.columns - .push(CustodyDataColumn::from_asserted_custody(data_column)); - } else { - // Peer does not have the requested data. - // TODO(das) what to do? - // TODO(das): If the peer is in the lookup peer set it claims to have imported - // the block AND its custody columns. So in this case we can downscore - debug!(self.log, "Sampling peer claims to not have the data"; "block_root" => %self.block_root, "column_index" => column_index); - // TODO(das) tolerate this failure if you are not sure the block has data - request.on_download_success()?; - } - } - Err(err) => { - debug!(self.log, "Sample download error"; "block_root" => %self.block_root, "column_index" => column_index, "error" => ?err); - - // Error downloading, maybe penalize peer and retry again. - // TODO(das) with different peer or different peer? 
- request.on_download_error()?; - } - }; - - self.continue_requests(cx) - } - - pub(crate) fn continue_requests( - &mut self, - cx: &mut SyncNetworkContext, - ) -> CustodyRequestResult { - // First check if sampling is completed, by computing `required_successes` - let mut successes = 0; - - for request in self.column_requests.values() { - if request.is_downloaded() { - successes += 1; - } - } - - // All requests have completed successfully. We may not have all the expected columns if the - // serving peers claim that this block has no data. - if successes == self.column_requests.len() { - let columns = std::mem::take(&mut self.columns); - - let peers = self - .column_requests - .values() - .filter_map(|r| r.peer()) - .collect::>(); - let peer_group = PeerGroup::from_set(peers); - - return Ok(Some((columns, peer_group))); - } - - for (_, request) in self.column_requests.iter_mut() { - request.request(self.block_root, self.block_epoch, self.requester_id, cx)?; - } - - Ok(None) - } -} - -mod request { - use super::{CustodyId, CustodyRequester, Error}; - use crate::sync::{ - manager::DataColumnsByRootRequester, - network_context::{DataColumnsByRootSingleBlockRequest, SyncNetworkContext}, - }; - use beacon_chain::BeaconChainTypes; - use lighthouse_network::PeerId; - use types::{data_column_sidecar::ColumnIndex, Epoch, Hash256}; - - /// TODO(das): this attempt count is nested into the existing lookup request count. - const MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS: usize = 3; - - pub(crate) struct ActiveColumnSampleRequest { - column_index: ColumnIndex, - status: Status, - download_failures: usize, - } - - #[derive(Debug, Clone)] - enum Status { - NotStarted, - Downloading(PeerId), - Downloaded(PeerId), - } - - impl ActiveColumnSampleRequest { - pub(crate) fn new(column_index: ColumnIndex) -> Self { - Self { - column_index, - status: Status::NotStarted, - download_failures: 0, - } - } - - pub(crate) fn is_downloaded(&self) -> bool { - match self.status { - Status::NotStarted | Status::Downloading(_) => false, - Status::Downloaded(_) => true, - } - } - - pub(crate) fn peer(&self) -> Option { - match self.status { - Status::NotStarted | Status::Downloading(_) => None, - Status::Downloaded(peer) => Some(peer), - } - } - - pub(crate) fn request( - &mut self, - block_root: Hash256, - block_epoch: Epoch, - requester: CustodyRequester, - cx: &mut SyncNetworkContext, - ) -> Result { - match &self.status { - Status::NotStarted => {} // Ok to continue - Status::Downloading(_) => return Ok(false), // Already downloading - Status::Downloaded(_) => return Ok(false), // Already completed - } - - if self.download_failures > MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS { - return Err(Error::TooManyFailures); - } - - // TODO: When is a fork and only a subset of your peers know about a block, sampling should only - // be queried on the peers on that fork. Should this case be handled? How to handle it? - let peer_ids = cx.get_custodial_peers(block_epoch, self.column_index); - - // TODO(das) randomize custodial peer and avoid failing peers - let Some(peer_id) = peer_ids.first().cloned() else { - // Do not tolerate not having custody peers, hard error. - // TODO(das): we might implement some grace period. The request will pause for X - // seconds expecting the peer manager to find peers before failing the request. 
- return Err(Error::NoPeers(self.column_index)); - }; - - cx.data_column_lookup_request( - DataColumnsByRootRequester::Custody(CustodyId { - id: requester, - column_index: self.column_index, - }), - peer_id, - DataColumnsByRootSingleBlockRequest { - block_root, - indices: vec![self.column_index], - }, - ) - .map_err(Error::SendFailed)?; - - self.status = Status::Downloading(peer_id); - Ok(true) - } - - pub(crate) fn on_download_error(&mut self) -> Result { - match self.status.clone() { - Status::Downloading(peer_id) => { - self.download_failures += 1; - self.status = Status::NotStarted; - Ok(peer_id) - } - other => Err(Error::BadState(format!( - "bad state on_sampling_error expected Sampling got {other:?}" - ))), - } - } - - pub(crate) fn on_download_success(&mut self) -> Result<(), Error> { - match &self.status { - Status::Downloading(peer) => { - self.status = Status::Downloaded(*peer); - Ok(()) - } - other => Err(Error::BadState(format!( - "bad state on_sampling_success expected Sampling got {other:?}" - ))), - } - } - } -} diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 2fc239b49b2..722a8cea09c 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -1,7 +1,7 @@ use beacon_chain::get_block_root; use lighthouse_network::rpc::{ methods::{BlobsByRootRequest, DataColumnsByRootRequest}, - BlocksByRootRequest, RPCError, + BlocksByRootRequest, }; use std::sync::Arc; use strum::IntoStaticStr; @@ -11,12 +11,22 @@ use types::{ }; #[derive(Debug, PartialEq, Eq, IntoStaticStr)] -pub enum LookupVerifyError { +pub enum RpcByRootVerifyError { + /// On a request that expects strictly one item, we receive the stream termination before + /// that item NoResponseReturned, + /// On a request for a strict number of items, we receive the stream termination before the + /// expected count of items + NotEnoughResponsesReturned, + /// Received more items than expected TooManyResponses, + /// Received an item that corresponds to a different request block root UnrequestedBlockRoot(Hash256), + /// Received a blob / column with an index that is not in the requested set UnrequestedBlobIndex(u64), + /// Blob or column inclusion proof does not match its own header InvalidInclusionProof, + /// Received more than one item for the tuple (block_root, index) DuplicateData, } @@ -39,14 +49,14 @@ impl ActiveBlocksByRootRequest { pub fn add_response( &mut self, block: Arc>, - ) -> Result>, LookupVerifyError> { + ) -> Result>, RpcByRootVerifyError> { if self.resolved { - return Err(LookupVerifyError::TooManyResponses); + return Err(RpcByRootVerifyError::TooManyResponses); } let block_root = get_block_root(&block); if self.request.0 != block_root { - return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); + return Err(RpcByRootVerifyError::UnrequestedBlockRoot(block_root)); } // Valid data, blocks by root expects a single response @@ -54,11 +64,11 @@ impl ActiveBlocksByRootRequest { Ok(block) } - pub fn terminate(self) -> Result<(), LookupVerifyError> { + pub fn terminate(self) -> Result<(), RpcByRootVerifyError> { if self.resolved { Ok(()) } else { - Err(LookupVerifyError::NoResponseReturned) + Err(RpcByRootVerifyError::NoResponseReturned) } } } @@ -114,23 +124,23 @@ impl ActiveBlobsByRootRequest { pub fn add_response( &mut self, blob: Arc>, - ) -> Result>>>, LookupVerifyError> { + ) -> Result>>>, RpcByRootVerifyError> { if self.resolved { - return 
Err(LookupVerifyError::TooManyResponses); + return Err(RpcByRootVerifyError::TooManyResponses); } let block_root = blob.block_root(); if self.request.block_root != block_root { - return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); + return Err(RpcByRootVerifyError::UnrequestedBlockRoot(block_root)); } if !blob.verify_blob_sidecar_inclusion_proof().unwrap_or(false) { - return Err(LookupVerifyError::InvalidInclusionProof); + return Err(RpcByRootVerifyError::InvalidInclusionProof); } if !self.request.indices.contains(&blob.index) { - return Err(LookupVerifyError::UnrequestedBlobIndex(blob.index)); + return Err(RpcByRootVerifyError::UnrequestedBlobIndex(blob.index)); } if self.blobs.iter().any(|b| b.index == blob.index) { - return Err(LookupVerifyError::DuplicateData); + return Err(RpcByRootVerifyError::DuplicateData); } self.blobs.push(blob); @@ -143,11 +153,13 @@ impl ActiveBlobsByRootRequest { } } - pub fn terminate(self) -> Option>>> { + /// Handle a stream termination. Expects the sender to send exactly the requested number of items + pub fn terminate(self) -> Result<(), RpcByRootVerifyError> { if self.resolved { - None + Ok(()) } else { - Some(self.blobs) + // Expect to receive the stream termination AFTER the expected number of items + Err(RpcByRootVerifyError::NotEnoughResponsesReturned) } } } @@ -196,28 +208,25 @@ impl ActiveDataColumnsByRootRequest { pub fn add_response( &mut self, data_column: Arc>, - ) -> Result>>>, RPCError> { + ) -> Result>>>, RpcByRootVerifyError> { if self.resolved { - return Err(RPCError::InvalidData("too many responses".to_string())); + return Err(RpcByRootVerifyError::TooManyResponses); } let block_root = data_column.block_root(); if self.request.block_root != block_root { - return Err(RPCError::InvalidData(format!( - "un-requested block root {block_root:?}" - ))); + return Err(RpcByRootVerifyError::UnrequestedBlockRoot(block_root)); } if !data_column.verify_inclusion_proof().unwrap_or(false) { - return Err(RPCError::InvalidData("invalid inclusion proof".to_string())); + return Err(RpcByRootVerifyError::InvalidInclusionProof); } if !self.request.indices.contains(&data_column.index) { - return Err(RPCError::InvalidData(format!( - "un-requested index {}", - data_column.index - ))); + return Err(RpcByRootVerifyError::UnrequestedBlobIndex( + data_column.index, + )); } if self.items.iter().any(|b| b.index == data_column.index) { - return Err(RPCError::InvalidData("duplicated data".to_string())); + return Err(RpcByRootVerifyError::DuplicateData); } self.items.push(data_column); @@ -230,6 +239,7 @@ impl ActiveDataColumnsByRootRequest { } } + /// Handle stream termination. Allows the sender to return fewer items than requested.
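+ /// Whatever columns were received before the termination are forwarded to the requester, which decides how to handle any missing indices.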
pub fn terminate(self) -> Option>>> { if self.resolved { None diff --git a/beacon_node/network/src/sync/sampling.rs b/beacon_node/network/src/sync/sampling.rs index 84ebe69928a..6311f99f474 100644 --- a/beacon_node/network/src/sync/sampling.rs +++ b/beacon_node/network/src/sync/sampling.rs @@ -1,5 +1,5 @@ -use self::request::ActiveColumnSampleRequest; -use super::network_context::{LookupFailure, SyncNetworkContext}; +use self::request::{ActiveColumnSampleRequest, SamplingErrorReason}; +use super::network_context::{RpcByRootRequestError, SyncNetworkContext}; use crate::metrics; use beacon_chain::BeaconChainTypes; use fnv::FnvHashMap; @@ -101,7 +101,7 @@ impl Sampling { &mut self, id: SamplingId, peer_id: PeerId, - resp: Result<(DataColumnSidecarList, Duration), LookupFailure>, + resp: Result<(DataColumnSidecarList, Duration), RpcByRootRequestError>, cx: &mut SyncNetworkContext, ) -> Option<(SamplingRequester, SamplingResult)> { let Some(request) = self.requests.get_mut(&id.id) else { @@ -235,7 +235,7 @@ impl ActiveSamplingRequest { &mut self, _peer_id: PeerId, column_index: ColumnIndex, - resp: Result<(DataColumnSidecarList, Duration), LookupFailure>, + resp: Result<(DataColumnSidecarList, Duration), RpcByRootRequestError>, cx: &mut SyncNetworkContext, ) -> Result, SamplingError> { // Select columns to sample @@ -284,7 +284,7 @@ impl ActiveSamplingRequest { // Peer does not have the requested data. // TODO(das) what to do? debug!(self.log, "Sampling peer claims to not have the data"; "block_root" => %self.block_root, "column_index" => column_index); - request.on_sampling_error()?; + request.on_sampling_error(SamplingErrorReason::DontHave)?; } } Err(err) => { @@ -293,7 +293,7 @@ impl ActiveSamplingRequest { // Error downloading, maybe penalize peer and retry again. // TODO(das) with different peer or different peer? - request.on_sampling_error()?; + request.on_sampling_error(SamplingErrorReason::DownloadError)?; } }; @@ -341,7 +341,7 @@ impl ActiveSamplingRequest { // TODO(das): Peer sent invalid data, penalize and try again from different peer // TODO(das): Count individual failures - let peer_id = request.on_sampling_error()?; + let peer_id = request.on_sampling_error(SamplingErrorReason::Invalid)?; cx.report_peer( peer_id, PeerAction::LowToleranceError, @@ -427,12 +427,20 @@ mod request { use std::collections::HashSet; use types::{data_column_sidecar::ColumnIndex, EthSpec, Hash256, Slot}; + pub(crate) enum SamplingErrorReason { + DownloadError, + DontHave, + Invalid, + } + pub(crate) struct ActiveColumnSampleRequest { column_index: ColumnIndex, status: Status, // TODO(das): Should downscore peers that claim to not have the sample? - #[allow(dead_code)] + /// Set of peers who claim to not have this column peers_dont_have: HashSet, + /// Set of peers that have sent us a sample with an invalid proof + peers_sent_invalid: HashSet, } #[derive(Debug, Clone)] @@ -449,6 +457,7 @@ mod request { column_index, status: Status::NotStarted, peers_dont_have: <_>::default(), + peers_sent_invalid: <_>::default(), } } @@ -488,13 +497,30 @@ mod request { // TODO: When is a fork and only a subset of your peers know about a block, sampling should only // be queried on the peers on that fork. Should this case be handled? How to handle it?
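+ // Peer selection: never reuse a peer that previously served an invalid sample, prefer custodial peers that have not already claimed to be missing this column, and only fall back to an already-tried peer when no other candidate remains.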
- let peer_ids = cx.get_custodial_peers( + let custodial_peer_ids = cx.get_custodial_peers( block_slot.epoch(::slots_per_epoch()), self.column_index, ); - // TODO(das) randomize custodial peer and avoid failing peers - if let Some(peer_id) = peer_ids.first().cloned() { + // TODO(das) randomize peer selection + let selected_peer_id: Option = { + // Filter out peers that have sent us invalid data, never request from them again + let peer_ids = custodial_peer_ids + .iter() + .filter(|peer| !self.peers_sent_invalid.contains(peer)) + .collect::>(); + // Check if there are peers we haven't requested from yet + if let Some(peer) = peer_ids + .iter() + .find(|peer| !self.peers_dont_have.contains(peer)) + { + Some(**peer) + } else { + peer_ids.first().copied().copied() + } + }; + + if let Some(peer_id) = selected_peer_id { cx.data_column_lookup_request( DataColumnsByRootRequester::Sampling(SamplingId { id: requester, @@ -516,10 +542,24 @@ mod request { } } - pub(crate) fn on_sampling_error(&mut self) -> Result { + pub(crate) fn on_sampling_error( + &mut self, + reason: SamplingErrorReason, + ) -> Result { match self.status.clone() { Status::Sampling(peer_id) => { self.status = Status::NotStarted; + match reason { + SamplingErrorReason::DownloadError => { + // Allow download errors, don't track + } + SamplingErrorReason::DontHave => { + self.peers_dont_have.insert(peer_id); + } + SamplingErrorReason::Invalid => { + self.peers_sent_invalid.insert(peer_id); + } + } Ok(peer_id) } other => Err(SamplingError::BadState(format!(