From 8e80a6a92e3f10e5bdcfaf1af96aa2bbcdcced74 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Sat, 11 May 2024 11:36:38 +0400 Subject: [PATCH 1/5] Improve lookup errors & trigger sampling for sync & better peer selection for sampling --- beacon_node/beacon_chain/src/beacon_chain.rs | 26 +++++-- .../beacon_chain/src/block_verification.rs | 4 ++ .../gossip_methods.rs | 12 ++-- .../network_beacon_processor/sync_methods.rs | 29 ++++++-- .../network/src/sync/block_lookups/common.rs | 30 ++++---- .../network/src/sync/block_lookups/mod.rs | 28 +++++--- .../sync/block_lookups/single_block_lookup.rs | 49 ++++++------- .../network/src/sync/block_lookups/tests.rs | 2 +- beacon_node/network/src/sync/manager.rs | 15 ++-- .../network/src/sync/network_context.rs | 72 ++++++++----------- .../src/sync/network_context/custody.rs | 31 ++++---- .../src/sync/network_context/requests.rs | 45 ++++++------ beacon_node/network/src/sync/sampling.rs | 64 +++++++++++++---- 13 files changed, 235 insertions(+), 172 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 0a6a4666d13..229726d1b14 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -219,11 +219,13 @@ impl TryInto for AvailabilityProcessingStatus { /// The result of a chain segment processing. pub enum ChainSegmentResult { /// Processing this chain segment finished successfully. - Successful { imported_blocks: usize }, + Successful { + imported_blocks: Vec<(Hash256, Slot)>, + }, /// There was an error processing this chain segment. Before the error, some blocks could /// have been imported. Failed { - imported_blocks: usize, + imported_blocks: Vec<(Hash256, Slot)>, error: BlockError, }, } @@ -2709,7 +2711,7 @@ impl BeaconChain { chain_segment: Vec>, ) -> Result>, ChainSegmentResult> { // This function will never import any blocks. - let imported_blocks = 0; + let imported_blocks = vec![]; let mut filtered_chain_segment = Vec::with_capacity(chain_segment.len()); // Produce a list of the parent root and slot of the child of each block. @@ -2815,7 +2817,7 @@ impl BeaconChain { chain_segment: Vec>, notify_execution_layer: NotifyExecutionLayer, ) -> ChainSegmentResult { - let mut imported_blocks = 0; + let mut imported_blocks = vec![]; // Filter uninteresting blocks from the chain segment in a blocking task. let chain = self.clone(); @@ -2875,6 +2877,8 @@ impl BeaconChain { // Import the blocks into the chain. for signature_verified_block in signature_verified_blocks { + let block_slot = signature_verified_block.slot(); + match self .process_block( signature_verified_block.block_root(), @@ -2886,9 +2890,9 @@ impl BeaconChain { { Ok(status) => { match status { - AvailabilityProcessingStatus::Imported(_) => { + AvailabilityProcessingStatus::Imported(block_root) => { // The block was imported successfully. 
- imported_blocks += 1; + imported_blocks.push((block_root, block_slot)); } AvailabilityProcessingStatus::MissingComponents(slot, block_root) => { warn!(self.log, "Blobs missing in response to range request"; @@ -6823,6 +6827,16 @@ impl BeaconChain { self.data_availability_checker.data_availability_boundary() } + /// Returns true if we should issue a sampling request for this block + /// TODO(das): check if the block is still within the da_window + pub fn should_sample_slot(&self, slot: Slot) -> bool { + self.spec + .eip7594_fork_epoch + .map_or(false, |eip7594_fork_epoch| { + slot.epoch(T::EthSpec::slots_per_epoch()) >= eip7594_fork_epoch + }) + } + pub fn logger(&self) -> &Logger { &self.log } diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 0a49e107042..009609621ef 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1225,6 +1225,10 @@ impl SignatureVerifiedBlock { pub fn block_root(&self) -> Hash256 { self.block_root } + + pub fn slot(&self) -> Slot { + self.block.slot() + } } impl IntoExecutionPendingBlock for SignatureVerifiedBlock { diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 9259e6826b1..da14b1b5e31 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1278,20 +1278,16 @@ impl NetworkBeaconProcessor { let block = verified_block.block.block_cloned(); let block_root = verified_block.block_root; + // TODO(das) Might be too early to issue a request here. We haven't checked that the block + // actually includes blob transactions and thus has data. A peer could send a block is + // garbage commitments, and make us trigger sampling for a block that does not have data. if block.num_expected_blobs() > 0 { // Trigger sampling for block not yet execution valid. At this point column custodials are // unlikely to have received their columns. Triggering sampling so early is only viable with // either: // - Sync delaying sampling until some latter window // - Re-processing early sampling requests: https://github.com/sigp/lighthouse/pull/5569 - if self - .chain - .spec - .eip7594_fork_epoch - .map_or(false, |eip7594_fork_epoch| { - block.epoch() >= eip7594_fork_epoch - }) - { + if self.chain.should_sample_slot(block.slot()) { self.send_sync_message(SyncMessage::SampleBlock(block_root, block.slot())); } } diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 2aa498da857..eaba4933f02 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -140,6 +140,7 @@ impl NetworkBeaconProcessor { }; let slot = block.slot(); + let block_has_data = block.as_block().num_expected_blobs() > 0; let parent_root = block.message().parent_root(); let commitments_formatted = block.as_block().commitments_formatted(); @@ -160,6 +161,17 @@ impl NetworkBeaconProcessor { metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL); + // RPC block imported or execution validated. If the block was already imported by gossip we + // receive Err(BlockError::AlreadyKnown). 
+ if result.is_ok() && + // Block has at least one blob, so it produced columns + block_has_data && + // Block slot is within the DA boundary (should always be the case) and PeerDAS is activated + self.chain.should_sample_slot(slot) + { + self.send_sync_message(SyncMessage::SampleBlock(block_root, slot)); + } + // RPC block imported, regardless of process type if let &Ok(AvailabilityProcessingStatus::Imported(hash)) = &result { info!(self.log, "New RPC block received"; "slot" => slot, "hash" => %hash); @@ -491,10 +503,19 @@ impl NetworkBeaconProcessor { { ChainSegmentResult::Successful { imported_blocks } => { metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL); - if imported_blocks > 0 { + if !imported_blocks.is_empty() { self.chain.recompute_head_at_current_slot().await; + + for (block_root, block_slot) in &imported_blocks { + if self.chain.should_sample_slot(*block_slot) { + self.send_sync_message(SyncMessage::SampleBlock( + *block_root, + *block_slot, + )); + } + } } - (imported_blocks, Ok(())) + (imported_blocks.len(), Ok(())) } ChainSegmentResult::Failed { imported_blocks, @@ -502,10 +523,10 @@ impl NetworkBeaconProcessor { } => { metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_FAILED_TOTAL); let r = self.handle_failed_chain_segment(error); - if imported_blocks > 0 { + if !imported_blocks.is_empty() { self.chain.recompute_head_at_current_slot().await; } - (imported_blocks, r) + (imported_blocks.len(), r) } } } diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index 5491bd7120f..0535dde7a04 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -1,5 +1,5 @@ use crate::sync::block_lookups::single_block_lookup::{ - LookupRequestError, SingleBlockLookup, SingleLookupRequestState, + LookupError, SingleBlockLookup, SingleLookupRequestState, }; use crate::sync::block_lookups::{BlobRequestState, BlockRequestState, PeerId}; use crate::sync::manager::{BlockProcessType, Id, SLOT_IMPORT_TOLERANCE}; @@ -49,7 +49,7 @@ pub trait RequestState { peer_id: PeerId, downloaded_block_expected_blobs: Option, cx: &mut SyncNetworkContext, - ) -> Result; + ) -> Result; /* Response handling methods */ @@ -58,7 +58,7 @@ pub trait RequestState { id: Id, result: DownloadResult, cx: &SyncNetworkContext, - ) -> Result<(), LookupRequestError>; + ) -> Result<(), LookupError>; /* Utility methods */ @@ -84,16 +84,16 @@ impl RequestState for BlockRequestState { peer_id: PeerId, _: Option, cx: &mut SyncNetworkContext, - ) -> Result { + ) -> Result { cx.block_lookup_request(id, peer_id, self.requested_block_root) - .map_err(LookupRequestError::SendFailed) + .map_err(LookupError::SendFailed) } fn send_for_processing( id: SingleLookupId, download_result: DownloadResult, cx: &SyncNetworkContext, - ) -> Result<(), LookupRequestError> { + ) -> Result<(), LookupError> { let DownloadResult { value, block_root, @@ -106,7 +106,7 @@ impl RequestState for BlockRequestState { seen_timestamp, BlockProcessType::SingleBlock { id }, ) - .map_err(LookupRequestError::SendFailed) + .map_err(LookupError::SendFailed) } fn response_type() -> ResponseType { @@ -132,21 +132,21 @@ impl RequestState for BlobRequestState { peer_id: PeerId, downloaded_block_expected_blobs: Option, cx: &mut SyncNetworkContext, - ) -> Result { + ) -> Result { cx.blob_lookup_request( id, peer_id, self.block_root, downloaded_block_expected_blobs, ) - .map_err(LookupRequestError::SendFailed) 
+ .map_err(LookupError::SendFailed) } fn send_for_processing( id: Id, download_result: DownloadResult, cx: &SyncNetworkContext, - ) -> Result<(), LookupRequestError> { + ) -> Result<(), LookupError> { let DownloadResult { value, block_root, @@ -159,7 +159,7 @@ impl RequestState for BlobRequestState { seen_timestamp, BlockProcessType::SingleBlob { id }, ) - .map_err(LookupRequestError::SendFailed) + .map_err(LookupError::SendFailed) } fn response_type() -> ResponseType { @@ -186,16 +186,16 @@ impl RequestState for CustodyRequestState { _peer_id: PeerId, downloaded_block_expected_blobs: Option, cx: &mut SyncNetworkContext, - ) -> Result { + ) -> Result { cx.custody_lookup_request(id, self.block_root, downloaded_block_expected_blobs) - .map_err(LookupRequestError::SendFailed) + .map_err(LookupError::SendFailed) } fn send_for_processing( id: Id, download_result: DownloadResult, cx: &SyncNetworkContext, - ) -> Result<(), LookupRequestError> { + ) -> Result<(), LookupError> { let DownloadResult { value, block_root, @@ -208,7 +208,7 @@ impl RequestState for CustodyRequestState { seen_timestamp, BlockProcessType::SingleCustodyColumn(id), ) - .map_err(LookupRequestError::SendFailed) + .map_err(LookupError::SendFailed) } fn response_type() -> ResponseType { diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 1b6f449ec37..7948913f168 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -1,12 +1,13 @@ use self::parent_chain::{compute_parent_chains, NodeChain}; pub use self::single_block_lookup::DownloadResult; -use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup}; +use self::single_block_lookup::{LookupError, LookupResult, SingleBlockLookup}; use super::manager::{BlockProcessType, BlockProcessingResult}; -use super::network_context::{LookupFailure, PeerGroup, SyncNetworkContext}; +use super::network_context::{PeerGroup, RpcByRootRequestError, SyncNetworkContext}; use crate::metrics; use crate::sync::block_lookups::common::{ResponseType, PARENT_DEPTH_TOLERANCE}; use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor; use crate::sync::manager::Id; +use crate::sync::network_context::custody::CustodyRequestError; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::data_availability_checker::AvailabilityCheckErrorCategory; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; @@ -31,6 +32,13 @@ mod tests; const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60; pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 4; +#[derive(Debug)] +pub enum LookupRequestError { + BlockRequestError(RpcByRootRequestError), + BlobRequestError(RpcByRootRequestError), + CustodyRequestError(CustodyRequestError), +} + pub enum BlockComponent { Block(DownloadResult>>), Blob(DownloadResult>>), @@ -305,7 +313,7 @@ impl BlockLookups { pub fn on_download_response>( &mut self, id: SingleLookupId, - response: Result<(R::VerifiedResponseType, PeerGroup, Duration), LookupFailure>, + response: Result<(R::VerifiedResponseType, PeerGroup, Duration), LookupRequestError>, cx: &mut SyncNetworkContext, ) { let result = self.on_download_response_inner::(id, response, cx); @@ -316,9 +324,9 @@ impl BlockLookups { pub fn on_download_response_inner>( &mut self, id: SingleLookupId, - response: Result<(R::VerifiedResponseType, PeerGroup, Duration), LookupFailure>, + response: Result<(R::VerifiedResponseType, PeerGroup, Duration), 
LookupRequestError>, cx: &mut SyncNetworkContext, - ) -> Result { + ) -> Result { // Note: do not downscore peers here for requests errors, SyncNetworkContext does it. let response_type = R::response_type(); @@ -326,7 +334,7 @@ impl BlockLookups { // We don't have the ability to cancel in-flight RPC requests. So this can happen // if we started this RPC request, and later saw the block/blobs via gossip. debug!(self.log, "Block returned for single block lookup not present"; "id" => id); - return Err(LookupRequestError::UnknownLookup); + return Err(LookupError::UnknownLookup); }; let block_root = lookup.block_root(); @@ -412,10 +420,10 @@ impl BlockLookups { lookup_id: SingleLookupId, result: BlockProcessingResult, cx: &mut SyncNetworkContext, - ) -> Result { + ) -> Result { let Some(lookup) = self.single_block_lookups.get_mut(&lookup_id) else { debug!(self.log, "Unknown single block lookup"; "id" => lookup_id); - return Err(LookupRequestError::UnknownLookup); + return Err(LookupError::UnknownLookup); }; let block_root = lookup.block_root(); @@ -557,7 +565,7 @@ impl BlockLookups { } Action::Drop => { // Drop with noop - Err(LookupRequestError::Failed) + Err(LookupError::Failed) } Action::Continue => { // Drop this completed lookup only @@ -608,7 +616,7 @@ impl BlockLookups { fn on_lookup_result( &mut self, id: SingleLookupId, - result: Result, + result: Result, source: &str, cx: &mut SyncNetworkContext, ) { diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 9020aea52e8..a7c379c3b57 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -25,7 +25,7 @@ pub enum LookupResult { } #[derive(Debug, PartialEq, Eq, IntoStaticStr)] -pub enum LookupRequestError { +pub enum LookupError { /// Too many failed attempts TooManyAttempts { /// The failed attempts were primarily due to processing failures. @@ -125,7 +125,7 @@ impl SingleBlockLookup { pub fn continue_requests( &mut self, cx: &mut SyncNetworkContext, - ) -> Result { + ) -> Result { // TODO: Check what's necessary to download, specially for blobs self.continue_request::>(cx)?; self.continue_request::>(cx)?; @@ -148,7 +148,7 @@ impl SingleBlockLookup { fn continue_request>( &mut self, cx: &mut SyncNetworkContext, - ) -> Result<(), LookupRequestError> { + ) -> Result<(), LookupError> { let id = self.id; let awaiting_parent = self.awaiting_parent.is_some(); let block_is_processed = self.block_request_state.state.is_processed(); @@ -161,16 +161,14 @@ impl SingleBlockLookup { .state .peek_downloaded_data() .map(|block| block.num_expected_blobs()); - let peer_id = self - .get_rand_available_peer() - .ok_or(LookupRequestError::NoPeers)?; + let peer_id = self.get_rand_available_peer().ok_or(LookupError::NoPeers)?; let request = R::request_state_mut(self); // Verify the current request has not exceeded the maximum number of attempts. 
let request_state = request.get_state(); if request_state.failed_attempts() >= SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS { let cannot_process = request_state.more_failed_processing_attempts(); - return Err(LookupRequestError::TooManyAttempts { cannot_process }); + return Err(LookupError::TooManyAttempts { cannot_process }); } // make_request returns true only if a request needs to be made @@ -350,13 +348,13 @@ impl SingleLookupRequestState { } /// Switch to `Downloading` if the request is in `AwaitingDownload` state, otherwise returns None. - pub fn on_download_start(&mut self) -> Result<(), LookupRequestError> { + pub fn on_download_start(&mut self) -> Result<(), LookupError> { match &self.state { State::AwaitingDownload => { self.state = State::Downloading; Ok(()) } - other => Err(LookupRequestError::BadState(format!( + other => Err(LookupError::BadState(format!( "Bad state on_download_start expected AwaitingDownload got {other}" ))), } @@ -364,29 +362,26 @@ impl SingleLookupRequestState { /// Registers a failure in downloading a block. This might be a peer disconnection or a wrong /// block. - pub fn on_download_failure(&mut self) -> Result<(), LookupRequestError> { + pub fn on_download_failure(&mut self) -> Result<(), LookupError> { match &self.state { State::Downloading => { self.failed_downloading = self.failed_downloading.saturating_add(1); self.state = State::AwaitingDownload; Ok(()) } - other => Err(LookupRequestError::BadState(format!( + other => Err(LookupError::BadState(format!( "Bad state on_download_failure expected Downloading got {other}" ))), } } - pub fn on_download_success( - &mut self, - result: DownloadResult, - ) -> Result<(), LookupRequestError> { + pub fn on_download_success(&mut self, result: DownloadResult) -> Result<(), LookupError> { match &self.state { State::Downloading => { self.state = State::AwaitingProcess(result); Ok(()) } - other => Err(LookupRequestError::BadState(format!( + other => Err(LookupError::BadState(format!( "Bad state on_download_success expected Downloading got {other}" ))), } @@ -407,20 +402,20 @@ impl SingleLookupRequestState { /// Revert into `AwaitingProcessing`, if the payload if not invalid and can be submitted for /// processing latter. - pub fn revert_to_awaiting_processing(&mut self) -> Result<(), LookupRequestError> { + pub fn revert_to_awaiting_processing(&mut self) -> Result<(), LookupError> { match &self.state { State::Processing(result) => { self.state = State::AwaitingProcess(result.clone()); Ok(()) } - other => Err(LookupRequestError::BadState(format!( + other => Err(LookupError::BadState(format!( "Bad state on revert_to_awaiting_processing expected Processing got {other}" ))), } } /// Registers a failure in processing a block. 
- pub fn on_processing_failure(&mut self) -> Result { + pub fn on_processing_failure(&mut self) -> Result { match &self.state { State::Processing(result) => { let peers_source = result.peer_group.clone(); @@ -428,28 +423,26 @@ impl SingleLookupRequestState { self.state = State::AwaitingDownload; Ok(peers_source) } - other => Err(LookupRequestError::BadState(format!( + other => Err(LookupError::BadState(format!( "Bad state on_processing_failure expected Processing got {other}" ))), } } - pub fn on_processing_success(&mut self) -> Result { + pub fn on_processing_success(&mut self) -> Result { match &self.state { State::Processing(result) => { let peer_group = result.peer_group.clone(); self.state = State::Processed(Some(peer_group.clone())); Ok(peer_group) } - other => Err(LookupRequestError::BadState(format!( + other => Err(LookupError::BadState(format!( "Bad state on_processing_success expected Processing got {other}" ))), } } - pub fn on_post_process_validation_failure( - &mut self, - ) -> Result, LookupRequestError> { + pub fn on_post_process_validation_failure(&mut self) -> Result, LookupError> { match &self.state { State::Processed(peer_group) => { let peer_group = peer_group.clone(); @@ -457,20 +450,20 @@ impl SingleLookupRequestState { self.state = State::AwaitingDownload; Ok(peer_group) } - other => Err(LookupRequestError::BadState(format!( + other => Err(LookupError::BadState(format!( "Bad state on_post_process_validation_failure expected Processed got {other}" ))), } } /// Mark a request as complete without any download or processing - pub fn on_completed_request(&mut self) -> Result<(), LookupRequestError> { + pub fn on_completed_request(&mut self) -> Result<(), LookupError> { match &self.state { State::AwaitingDownload => { self.state = State::Processed(None); Ok(()) } - other => Err(LookupRequestError::BadState(format!( + other => Err(LookupError::BadState(format!( "Bad state on_completed_request expected AwaitingDownload got {other}" ))), } diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 63f00138d67..322762da1bd 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -611,7 +611,7 @@ impl TestRig { let lookup_id = if let DataColumnsByRootRequester::Custody(id) = sampling_ids.first().unwrap().0.requester { - id.id.0.lookup_id + id.requester.0.lookup_id } else { panic!("not a custody requester") }; diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 7b8d7850a71..33a244d6443 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -46,6 +46,7 @@ use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::block_lookups::{ BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult, + LookupRequestError, }; use crate::sync::block_sidecar_coupling::RangeBlockComponentsRequest; use crate::sync::network_context::PeerGroup; @@ -894,7 +895,8 @@ impl SyncManager { id.lookup_id, resp.map(|(value, seen_timestamp)| { (value, PeerGroup::from_single(peer_id), seen_timestamp) - }), + }) + .map_err(LookupRequestError::BlockRequestError), &mut self.network, ) } @@ -968,7 +970,8 @@ impl SyncManager { id.lookup_id, resp.map(|(value, seen_timestamp)| { (value, PeerGroup::from_single(peer_id), seen_timestamp) - }), + }) + .map_err(LookupRequestError::BlobRequestError), &mut self.network, ) } @@ -1002,9 
+1005,11 @@ impl SyncManager { self.block_lookups .on_download_response::>( requester.0.lookup_id, - custody_columns.map(|(columns, peer_group)| { - (columns, peer_group, seen_timestamp) - }), + custody_columns + .map(|(columns, peer_group)| { + (columns, peer_group, seen_timestamp) + }) + .map_err(LookupRequestError::CustodyRequestError), &mut self.network, ); } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index bdd6ca241fd..0e17b187b27 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -2,7 +2,7 @@ //! channel and stores a global RPC ID to perform requests. pub use self::custody::CustodyId; -use self::custody::{ActiveCustodyRequest, CustodyRequester, Error as CustodyRequestError}; +use self::custody::{ActiveCustodyRequest, CustodyRequestError, CustodyRequester}; use self::requests::{ ActiveBlobsByRootRequest, ActiveBlocksByRootRequest, ActiveDataColumnsByRootRequest, }; @@ -30,7 +30,7 @@ use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError}; use lighthouse_network::{ Client, Eth2Enr, NetworkGlobals, PeerAction, PeerId, ReportSource, Request, }; -pub use requests::LookupVerifyError; +pub use requests::RpcByRootVerifyError; use slog::{debug, error, trace, warn}; use slot_clock::SlotClock; use std::collections::hash_map::Entry; @@ -71,24 +71,23 @@ pub enum RpcEvent { RPCError(RPCError), } -pub type RpcProcessingResult = Result<(T, Duration), LookupFailure>; - #[derive(Debug)] -pub enum LookupFailure { - RpcError(RPCError), - LookupVerifyError(LookupVerifyError), - CustodyRequestError(CustodyRequestError), +pub enum RpcByRootRequestError { + NetworkError(RPCError), + VerifyError(RpcByRootVerifyError), } -impl From for LookupFailure { +pub type RpcByRootRequestResult = Result<(T, Duration), RpcByRootRequestError>; + +impl From for RpcByRootRequestError { fn from(e: RPCError) -> Self { - LookupFailure::RpcError(e) + Self::NetworkError(e) } } -impl From for LookupFailure { - fn from(e: LookupVerifyError) -> Self { - LookupFailure::LookupVerifyError(e) +impl From for RpcByRootRequestError { + fn from(e: RpcByRootVerifyError) -> Self { + Self::VerifyError(e) } } @@ -801,7 +800,7 @@ impl SyncNetworkContext { request_id: SingleLookupReqId, peer_id: PeerId, block: RpcEvent>>, - ) -> Option>>> { + ) -> Option>>> { let Entry::Occupied(mut request) = self.blocks_by_root_requests.entry(request_id) else { return None; }; @@ -827,8 +826,8 @@ impl SyncNetworkContext { } }; - if let Err(ref e) = resp { - self.on_lookup_failure(peer_id, e); + if let Err(RpcByRootRequestError::VerifyError(e)) = &resp { + self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } Some(resp) } @@ -838,7 +837,7 @@ impl SyncNetworkContext { request_id: SingleLookupReqId, peer_id: PeerId, blob: RpcEvent>>, - ) -> Option>> { + ) -> Option>> { let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else { return None; }; @@ -869,8 +868,8 @@ impl SyncNetworkContext { } }; - if let Err(ref e) = resp { - self.on_lookup_failure(peer_id, e); + if let Err(RpcByRootRequestError::VerifyError(e)) = &resp { + self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } Some(resp) } @@ -883,7 +882,7 @@ impl SyncNetworkContext { item: RpcEvent>>, ) -> Option<( DataColumnsByRootRequester, - RpcProcessingResult>>>, + RpcByRootRequestResult>>>, )> { let Entry::Occupied(mut request) = self.data_columns_by_root_requests.entry(id) else { return None; @@ 
-914,8 +913,8 @@ impl SyncNetworkContext { } }; - if let Err(ref e) = resp { - self.on_lookup_failure(peer_id, e); + if let Err(RpcByRootRequestError::VerifyError(e)) = &resp { + self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } Some((requester, resp)) } @@ -932,14 +931,14 @@ impl SyncNetworkContext { &mut self, id: CustodyId, peer_id: PeerId, - resp: RpcProcessingResult>>>, + resp: RpcByRootRequestResult>>>, ) -> Option<( CustodyRequester, - Result<(Vec>, PeerGroup), LookupFailure>, + Result<(Vec>, PeerGroup), CustodyRequestError>, )> { // Note: need to remove the request to borrow self again below. Otherwise we can't // do nested requests - let Some(mut request) = self.custody_by_root_requests.remove(&id.id) else { + let Some(mut request) = self.custody_by_root_requests.remove(&id.requester) else { // TOOD(das): This log can happen if the request is error'ed early and dropped debug!(self.log, "Custody column downloaded event for unknown request"; "id" => ?id); return None; @@ -947,7 +946,6 @@ impl SyncNetworkContext { let result = request .on_data_column_downloaded(peer_id, id.column_index, resp, self) - .map_err(LookupFailure::CustodyRequestError) .transpose(); // Convert a result from internal format of `ActiveCustodyRequest` (error first to use ?) to @@ -962,9 +960,9 @@ impl SyncNetworkContext { } } - Some((id.id, result)) + Some((id.requester, result)) } else { - self.custody_by_root_requests.insert(id.id, request); + self.custody_by_root_requests.insert(id.requester, request); None } } @@ -1064,31 +1062,17 @@ impl SyncNetworkContext { } } } - - /// Downscore peers for lookup errors that originate from sync - pub fn on_lookup_failure(&self, peer_id: PeerId, err: &LookupFailure) { - match err { - // RPCErros are downscored in the network handler - LookupFailure::RpcError(_) => {} - // Only downscore lookup verify errors. RPC errors are downscored in the network handler. - LookupFailure::LookupVerifyError(e) => { - self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); - } - // CustodyRequestError are downscored in the each data_columns_by_root request - LookupFailure::CustodyRequestError(_) => {} - } - } } fn to_fixed_blob_sidecar_list( blobs: Vec>>, -) -> Result, LookupVerifyError> { +) -> Result, RpcByRootVerifyError> { let mut fixed_list = FixedBlobSidecarList::default(); for blob in blobs.into_iter() { let index = blob.index as usize; *fixed_list .get_mut(index) - .ok_or(LookupVerifyError::UnrequestedBlobIndex(index as u64))? = Some(blob) + .ok_or(RpcByRootVerifyError::UnrequestedBlobIndex(index as u64))? 
= Some(blob) } Ok(fixed_list) } diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs index f20a95415db..0d0288d9fe8 100644 --- a/beacon_node/network/src/sync/network_context/custody.rs +++ b/beacon_node/network/src/sync/network_context/custody.rs @@ -9,11 +9,11 @@ use slog::{debug, warn}; use std::{marker::PhantomData, sync::Arc}; use types::{data_column_sidecar::ColumnIndex, DataColumnSidecar, Epoch, Hash256}; -use super::{PeerGroup, RpcProcessingResult, SyncNetworkContext}; +use super::{PeerGroup, RpcByRootRequestResult, SyncNetworkContext}; #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub struct CustodyId { - pub id: CustodyRequester, + pub requester: CustodyRequester, pub column_index: ColumnIndex, } @@ -36,14 +36,15 @@ pub struct ActiveCustodyRequest { } #[derive(Debug)] -pub enum Error { +pub enum CustodyRequestError { SendFailed(&'static str), TooManyFailures, BadState(String), NoPeers(ColumnIndex), } -type CustodyRequestResult = Result>, PeerGroup)>, Error>; +type CustodyRequestResult = + Result>, PeerGroup)>, CustodyRequestError>; impl ActiveCustodyRequest { pub(crate) fn new( @@ -79,7 +80,7 @@ impl ActiveCustodyRequest { &mut self, _peer_id: PeerId, column_index: ColumnIndex, - resp: RpcProcessingResult>, + resp: RpcByRootRequestResult>, cx: &mut SyncNetworkContext, ) -> CustodyRequestResult { // TODO(das): Should downscore peers for verify errors here @@ -164,7 +165,7 @@ impl ActiveCustodyRequest { } mod request { - use super::{CustodyId, CustodyRequester, Error}; + use super::{CustodyId, CustodyRequestError, CustodyRequester}; use crate::sync::{ manager::DataColumnsByRootRequester, network_context::{DataColumnsByRootSingleBlockRequest, SyncNetworkContext}, @@ -218,7 +219,7 @@ mod request { block_epoch: Epoch, requester: CustodyRequester, cx: &mut SyncNetworkContext, - ) -> Result { + ) -> Result { match &self.status { Status::NotStarted => {} // Ok to continue Status::Downloading(_) => return Ok(false), // Already downloading @@ -226,7 +227,7 @@ mod request { } if self.download_failures > MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS { - return Err(Error::TooManyFailures); + return Err(CustodyRequestError::TooManyFailures); } // TODO: When is a fork and only a subset of your peers know about a block, sampling should only @@ -238,12 +239,12 @@ mod request { // Do not tolerate not having custody peers, hard error. // TODO(das): we might implement some grace period. The request will pause for X // seconds expecting the peer manager to find peers before failing the request. 
- return Err(Error::NoPeers(self.column_index)); + return Err(CustodyRequestError::NoPeers(self.column_index)); }; cx.data_column_lookup_request( DataColumnsByRootRequester::Custody(CustodyId { - id: requester, + requester, column_index: self.column_index, }), peer_id, @@ -252,32 +253,32 @@ mod request { indices: vec![self.column_index], }, ) - .map_err(Error::SendFailed)?; + .map_err(CustodyRequestError::SendFailed)?; self.status = Status::Downloading(peer_id); Ok(true) } - pub(crate) fn on_download_error(&mut self) -> Result { + pub(crate) fn on_download_error(&mut self) -> Result { match self.status.clone() { Status::Downloading(peer_id) => { self.download_failures += 1; self.status = Status::NotStarted; Ok(peer_id) } - other => Err(Error::BadState(format!( + other => Err(CustodyRequestError::BadState(format!( "bad state on_sampling_error expected Sampling got {other:?}" ))), } } - pub(crate) fn on_download_success(&mut self) -> Result<(), Error> { + pub(crate) fn on_download_success(&mut self) -> Result<(), CustodyRequestError> { match &self.status { Status::Downloading(peer) => { self.status = Status::Downloaded(*peer); Ok(()) } - other => Err(Error::BadState(format!( + other => Err(CustodyRequestError::BadState(format!( "bad state on_sampling_success expected Sampling got {other:?}" ))), } diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 2fc239b49b2..88e5e3a3443 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -1,7 +1,7 @@ use beacon_chain::get_block_root; use lighthouse_network::rpc::{ methods::{BlobsByRootRequest, DataColumnsByRootRequest}, - BlocksByRootRequest, RPCError, + BlocksByRootRequest, }; use std::sync::Arc; use strum::IntoStaticStr; @@ -11,7 +11,7 @@ use types::{ }; #[derive(Debug, PartialEq, Eq, IntoStaticStr)] -pub enum LookupVerifyError { +pub enum RpcByRootVerifyError { NoResponseReturned, TooManyResponses, UnrequestedBlockRoot(Hash256), @@ -39,14 +39,14 @@ impl ActiveBlocksByRootRequest { pub fn add_response( &mut self, block: Arc>, - ) -> Result>, LookupVerifyError> { + ) -> Result>, RpcByRootVerifyError> { if self.resolved { - return Err(LookupVerifyError::TooManyResponses); + return Err(RpcByRootVerifyError::TooManyResponses); } let block_root = get_block_root(&block); if self.request.0 != block_root { - return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); + return Err(RpcByRootVerifyError::UnrequestedBlockRoot(block_root)); } // Valid data, blocks by root expects a single response @@ -54,11 +54,11 @@ impl ActiveBlocksByRootRequest { Ok(block) } - pub fn terminate(self) -> Result<(), LookupVerifyError> { + pub fn terminate(self) -> Result<(), RpcByRootVerifyError> { if self.resolved { Ok(()) } else { - Err(LookupVerifyError::NoResponseReturned) + Err(RpcByRootVerifyError::NoResponseReturned) } } } @@ -114,23 +114,23 @@ impl ActiveBlobsByRootRequest { pub fn add_response( &mut self, blob: Arc>, - ) -> Result>>>, LookupVerifyError> { + ) -> Result>>>, RpcByRootVerifyError> { if self.resolved { - return Err(LookupVerifyError::TooManyResponses); + return Err(RpcByRootVerifyError::TooManyResponses); } let block_root = blob.block_root(); if self.request.block_root != block_root { - return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); + return Err(RpcByRootVerifyError::UnrequestedBlockRoot(block_root)); } if !blob.verify_blob_sidecar_inclusion_proof().unwrap_or(false) { - 
return Err(LookupVerifyError::InvalidInclusionProof); + return Err(RpcByRootVerifyError::InvalidInclusionProof); } if !self.request.indices.contains(&blob.index) { - return Err(LookupVerifyError::UnrequestedBlobIndex(blob.index)); + return Err(RpcByRootVerifyError::UnrequestedBlobIndex(blob.index)); } if self.blobs.iter().any(|b| b.index == blob.index) { - return Err(LookupVerifyError::DuplicateData); + return Err(RpcByRootVerifyError::DuplicateData); } self.blobs.push(blob); @@ -196,28 +196,25 @@ impl ActiveDataColumnsByRootRequest { pub fn add_response( &mut self, data_column: Arc>, - ) -> Result>>>, RPCError> { + ) -> Result>>>, RpcByRootVerifyError> { if self.resolved { - return Err(RPCError::InvalidData("too many responses".to_string())); + return Err(RpcByRootVerifyError::TooManyResponses); } let block_root = data_column.block_root(); if self.request.block_root != block_root { - return Err(RPCError::InvalidData(format!( - "un-requested block root {block_root:?}" - ))); + return Err(RpcByRootVerifyError::UnrequestedBlockRoot(block_root)); } if !data_column.verify_inclusion_proof().unwrap_or(false) { - return Err(RPCError::InvalidData("invalid inclusion proof".to_string())); + return Err(RpcByRootVerifyError::InvalidInclusionProof); } if !self.request.indices.contains(&data_column.index) { - return Err(RPCError::InvalidData(format!( - "un-requested index {}", - data_column.index - ))); + return Err(RpcByRootVerifyError::UnrequestedBlobIndex( + data_column.index, + )); } if self.items.iter().any(|b| b.index == data_column.index) { - return Err(RPCError::InvalidData("duplicated data".to_string())); + return Err(RpcByRootVerifyError::DuplicateData); } self.items.push(data_column); diff --git a/beacon_node/network/src/sync/sampling.rs b/beacon_node/network/src/sync/sampling.rs index 84ebe69928a..6311f99f474 100644 --- a/beacon_node/network/src/sync/sampling.rs +++ b/beacon_node/network/src/sync/sampling.rs @@ -1,5 +1,5 @@ -use self::request::ActiveColumnSampleRequest; -use super::network_context::{LookupFailure, SyncNetworkContext}; +use self::request::{ActiveColumnSampleRequest, SamplingErrorReason}; +use super::network_context::{RpcByRootRequestError, SyncNetworkContext}; use crate::metrics; use beacon_chain::BeaconChainTypes; use fnv::FnvHashMap; @@ -101,7 +101,7 @@ impl Sampling { &mut self, id: SamplingId, peer_id: PeerId, - resp: Result<(DataColumnSidecarList, Duration), LookupFailure>, + resp: Result<(DataColumnSidecarList, Duration), RpcByRootRequestError>, cx: &mut SyncNetworkContext, ) -> Option<(SamplingRequester, SamplingResult)> { let Some(request) = self.requests.get_mut(&id.id) else { @@ -235,7 +235,7 @@ impl ActiveSamplingRequest { &mut self, _peer_id: PeerId, column_index: ColumnIndex, - resp: Result<(DataColumnSidecarList, Duration), LookupFailure>, + resp: Result<(DataColumnSidecarList, Duration), RpcByRootRequestError>, cx: &mut SyncNetworkContext, ) -> Result, SamplingError> { // Select columns to sample @@ -284,7 +284,7 @@ impl ActiveSamplingRequest { // Peer does not have the requested data. // TODO(das) what to do? debug!(self.log, "Sampling peer claims to not have the data"; "block_root" => %self.block_root, "column_index" => column_index); - request.on_sampling_error()?; + request.on_sampling_error(SamplingErrorReason::DontHave)?; } } Err(err) => { @@ -293,7 +293,7 @@ impl ActiveSamplingRequest { // Error downloading, maybe penalize peer and retry again. // TODO(das) with different peer or different peer? 
- request.on_sampling_error()?; + request.on_sampling_error(SamplingErrorReason::DownloadError)?; } }; @@ -341,7 +341,7 @@ impl ActiveSamplingRequest { // TODO(das): Peer sent invalid data, penalize and try again from different peer // TODO(das): Count individual failures - let peer_id = request.on_sampling_error()?; + let peer_id = request.on_sampling_error(SamplingErrorReason::Invalid)?; cx.report_peer( peer_id, PeerAction::LowToleranceError, @@ -427,12 +427,20 @@ mod request { use std::collections::HashSet; use types::{data_column_sidecar::ColumnIndex, EthSpec, Hash256, Slot}; + pub(crate) enum SamplingErrorReason { + DownloadError, + DontHave, + Invalid, + } + pub(crate) struct ActiveColumnSampleRequest { column_index: ColumnIndex, status: Status, // TODO(das): Should downscore peers that claim to not have the sample? - #[allow(dead_code)] + /// Set of peers who claim to no have this column peers_dont_have: HashSet, + /// Set of peers that have sent us a sample with an invalid proof + peers_sent_invalid: HashSet, } #[derive(Debug, Clone)] @@ -449,6 +457,7 @@ mod request { column_index, status: Status::NotStarted, peers_dont_have: <_>::default(), + peers_sent_invalid: <_>::default(), } } @@ -488,13 +497,30 @@ mod request { // TODO: When is a fork and only a subset of your peers know about a block, sampling should only // be queried on the peers on that fork. Should this case be handled? How to handle it? - let peer_ids = cx.get_custodial_peers( + let custodial_peer_ids = cx.get_custodial_peers( block_slot.epoch(::slots_per_epoch()), self.column_index, ); - // TODO(das) randomize custodial peer and avoid failing peers - if let Some(peer_id) = peer_ids.first().cloned() { + // TODO(das) randomize peer selection + let selected_peer_id: Option = { + // Filter out peers that have sent us invalid data, never request from then again + let peer_ids = custodial_peer_ids + .iter() + .filter(|peer| self.peers_sent_invalid.contains(peer)) + .collect::>(); + // Check if there are peers we haven't requested from yet + if let Some(peer) = peer_ids + .iter() + .find(|peer| !self.peers_dont_have.contains(peer)) + { + Some(**peer) + } else { + peer_ids.first().copied().copied() + } + }; + + if let Some(peer_id) = selected_peer_id { cx.data_column_lookup_request( DataColumnsByRootRequester::Sampling(SamplingId { id: requester, @@ -516,10 +542,24 @@ mod request { } } - pub(crate) fn on_sampling_error(&mut self) -> Result { + pub(crate) fn on_sampling_error( + &mut self, + reason: SamplingErrorReason, + ) -> Result { match self.status.clone() { Status::Sampling(peer_id) => { self.status = Status::NotStarted; + match reason { + SamplingErrorReason::DownloadError => { + // Allow download errors, don't track + } + SamplingErrorReason::DontHave => { + self.peers_dont_have.insert(peer_id); + } + SamplingErrorReason::Invalid => { + self.peers_sent_invalid.insert(peer_id); + } + } Ok(peer_id) } other => Err(SamplingError::BadState(format!( From 098f8ad87e559e4e832569c279fa9c1b6d7166bd Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Sat, 11 May 2024 12:19:43 +0400 Subject: [PATCH 2/5] Ensure there is only one in-flight request for block lookups --- .../network/src/sync/block_lookups/common.rs | 10 ++-- .../network/src/sync/block_lookups/mod.rs | 33 ++++++------ .../sync/block_lookups/single_block_lookup.rs | 45 ++++++++++++---- beacon_node/network/src/sync/manager.rs | 6 +-- .../network/src/sync/network_context.rs | 54 +++++++++---------- 5 files changed, 86 insertions(+), 
62 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index 0535dde7a04..ceb4f61b642 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -3,7 +3,7 @@ use crate::sync::block_lookups::single_block_lookup::{ }; use crate::sync::block_lookups::{BlobRequestState, BlockRequestState, PeerId}; use crate::sync::manager::{BlockProcessType, Id, SLOT_IMPORT_TOLERANCE}; -use crate::sync::network_context::SyncNetworkContext; +use crate::sync::network_context::{ReqId, SyncNetworkContext}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::data_column_verification::CustodyDataColumn; use beacon_chain::BeaconChainTypes; @@ -49,7 +49,7 @@ pub trait RequestState { peer_id: PeerId, downloaded_block_expected_blobs: Option, cx: &mut SyncNetworkContext, - ) -> Result; + ) -> Result, LookupError>; /* Response handling methods */ @@ -84,7 +84,7 @@ impl RequestState for BlockRequestState { peer_id: PeerId, _: Option, cx: &mut SyncNetworkContext, - ) -> Result { + ) -> Result, LookupError> { cx.block_lookup_request(id, peer_id, self.requested_block_root) .map_err(LookupError::SendFailed) } @@ -132,7 +132,7 @@ impl RequestState for BlobRequestState { peer_id: PeerId, downloaded_block_expected_blobs: Option, cx: &mut SyncNetworkContext, - ) -> Result { + ) -> Result, LookupError> { cx.blob_lookup_request( id, peer_id, @@ -186,7 +186,7 @@ impl RequestState for CustodyRequestState { _peer_id: PeerId, downloaded_block_expected_blobs: Option, cx: &mut SyncNetworkContext, - ) -> Result { + ) -> Result, LookupError> { cx.custody_lookup_request(id, self.block_root, downloaded_block_expected_blobs) .map_err(LookupError::SendFailed) } diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 7948913f168..b1ac3aef24d 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -6,7 +6,7 @@ use super::network_context::{PeerGroup, RpcByRootRequestError, SyncNetworkContex use crate::metrics; use crate::sync::block_lookups::common::{ResponseType, PARENT_DEPTH_TOLERANCE}; use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor; -use crate::sync::manager::Id; +use crate::sync::manager::{Id, SingleLookupReqId}; use crate::sync::network_context::custody::CustodyRequestError; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::data_availability_checker::AvailabilityCheckErrorCategory; @@ -312,28 +312,28 @@ impl BlockLookups { /// Process a block or blob response received from a single lookup request. pub fn on_download_response>( &mut self, - id: SingleLookupId, + id: SingleLookupReqId, response: Result<(R::VerifiedResponseType, PeerGroup, Duration), LookupRequestError>, cx: &mut SyncNetworkContext, ) { let result = self.on_download_response_inner::(id, response, cx); - self.on_lookup_result(id, result, "download_response", cx); + self.on_lookup_result(id.lookup_id, result, "download_response", cx); } /// Process a block or blob response received from a single lookup request. pub fn on_download_response_inner>( &mut self, - id: SingleLookupId, + id: SingleLookupReqId, response: Result<(R::VerifiedResponseType, PeerGroup, Duration), LookupRequestError>, cx: &mut SyncNetworkContext, ) -> Result { // Note: do not downscore peers here for requests errors, SyncNetworkContext does it. 
let response_type = R::response_type(); - let Some(lookup) = self.single_block_lookups.get_mut(&id) else { + let Some(lookup) = self.single_block_lookups.get_mut(&id.lookup_id) else { // We don't have the ability to cancel in-flight RPC requests. So this can happen // if we started this RPC request, and later saw the block/blobs via gossip. - debug!(self.log, "Block returned for single block lookup not present"; "id" => id); + debug!(self.log, "Block returned for single block lookup not present"; "id" => id.lookup_id); return Err(LookupError::UnknownLookup); }; @@ -345,7 +345,7 @@ impl BlockLookups { debug!(self.log, "Received lookup download success"; "block_root" => ?block_root, - "id" => id, + "id" => ?id, "peer_group" => ?peer_group, "response_type" => ?response_type, ); @@ -353,12 +353,15 @@ impl BlockLookups { // Register the download peer here. Once we have received some data over the wire we // attribute it to this peer for scoring latter regardless of how the request was // done. - request_state.on_download_success(DownloadResult { - value: response, - block_root, - seen_timestamp, - peer_group, - })?; + request_state.on_download_success( + id.req_id, + DownloadResult { + value: response, + block_root, + seen_timestamp, + peer_group, + }, + )?; // continue_request will send for processing as the request state is AwaitingProcessing } Err(e) => { @@ -367,12 +370,12 @@ impl BlockLookups { debug!(self.log, "Received lookup download failure"; "block_root" => ?block_root, - "id" => id, + "id" => ?id, "response_type" => ?response_type, "error" => ?e, ); - request_state.on_download_failure()?; + request_state.on_download_failure(id.req_id)?; // continue_request will retry a download as the request state is AwaitingDownload } } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index a7c379c3b57..667e31690e2 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -2,7 +2,7 @@ use super::common::ResponseType; use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS}; use crate::sync::block_lookups::common::RequestState; use crate::sync::block_lookups::Id; -use crate::sync::network_context::{PeerGroup, SyncNetworkContext}; +use crate::sync::network_context::{PeerGroup, ReqId, SyncNetworkContext}; use beacon_chain::data_column_verification::CustodyDataColumn; use beacon_chain::BeaconChainTypes; use rand::seq::IteratorRandom; @@ -41,6 +41,13 @@ pub enum LookupError { Failed, /// Attempted to retrieve a not known lookup id UnknownLookup, + /// Received a download result for a different request id than the in-flight request. + /// There should only exist a single request at a time. Having multiple requests is a bug and + /// can result in undefined state, so it's treated as a hard error and the lookup is dropped. + UnexpectedRequestId { + expected_req_id: ReqId, + req_id: ReqId, + }, } pub struct SingleBlockLookup { @@ -172,8 +179,10 @@ impl SingleBlockLookup { } // make_request returns true only if a request needs to be made - if request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? { - request.get_state_mut().on_download_start()?; + if let Some(req_id) = + request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? 
+ { + request.get_state_mut().on_download_start(req_id)?; } else { request.get_state_mut().on_completed_request()?; } @@ -280,7 +289,7 @@ pub struct DownloadResult { #[derive(Debug)] pub enum State { AwaitingDownload, - Downloading, + Downloading(ReqId), AwaitingProcess(DownloadResult), Processing(DownloadResult), Processed(Option), @@ -348,10 +357,10 @@ impl SingleLookupRequestState { } /// Switch to `Downloading` if the request is in `AwaitingDownload` state, otherwise returns None. - pub fn on_download_start(&mut self) -> Result<(), LookupError> { + pub fn on_download_start(&mut self, req_id: ReqId) -> Result<(), LookupError> { match &self.state { State::AwaitingDownload => { - self.state = State::Downloading; + self.state = State::Downloading(req_id); Ok(()) } other => Err(LookupError::BadState(format!( @@ -362,9 +371,15 @@ impl SingleLookupRequestState { /// Registers a failure in downloading a block. This might be a peer disconnection or a wrong /// block. - pub fn on_download_failure(&mut self) -> Result<(), LookupError> { + pub fn on_download_failure(&mut self, req_id: ReqId) -> Result<(), LookupError> { match &self.state { - State::Downloading => { + State::Downloading(expected_req_id) => { + if req_id != *expected_req_id { + return Err(LookupError::UnexpectedRequestId { + expected_req_id: *expected_req_id, + req_id, + }); + } self.failed_downloading = self.failed_downloading.saturating_add(1); self.state = State::AwaitingDownload; Ok(()) @@ -375,9 +390,19 @@ impl SingleLookupRequestState { } } - pub fn on_download_success(&mut self, result: DownloadResult) -> Result<(), LookupError> { + pub fn on_download_success( + &mut self, + req_id: ReqId, + result: DownloadResult, + ) -> Result<(), LookupError> { match &self.state { - State::Downloading => { + State::Downloading(expected_req_id) => { + if req_id != *expected_req_id { + return Err(LookupError::UnexpectedRequestId { + expected_req_id: *expected_req_id, + req_id, + }); + } self.state = State::AwaitingProcess(result); Ok(()) } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 33a244d6443..bf8f947ee05 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -892,7 +892,7 @@ impl SyncManager { if let Some(resp) = self.network.on_single_block_response(id, peer_id, block) { self.block_lookups .on_download_response::>( - id.lookup_id, + id, resp.map(|(value, seen_timestamp)| { (value, PeerGroup::from_single(peer_id), seen_timestamp) }) @@ -967,7 +967,7 @@ impl SyncManager { if let Some(resp) = self.network.on_single_blob_response(id, peer_id, blob) { self.block_lookups .on_download_response::>( - id.lookup_id, + id, resp.map(|(value, seen_timestamp)| { (value, PeerGroup::from_single(peer_id), seen_timestamp) }) @@ -1004,7 +1004,7 @@ impl SyncManager { let seen_timestamp = timestamp_now(); self.block_lookups .on_download_response::>( - requester.0.lookup_id, + requester.0, custody_columns .map(|(columns, peer_group)| { (columns, peer_group, seen_timestamp) diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 0e17b187b27..95a92aa7d30 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -91,6 +91,8 @@ impl From for RpcByRootRequestError { } } +pub type ReqId = u32; + #[derive(Clone, Debug)] pub struct PeerGroup { peers: Vec, @@ -410,20 +412,18 @@ impl SyncNetworkContext { lookup_id: SingleLookupId, peer_id: PeerId, 
block_root: Hash256, - ) -> Result { + ) -> Result, &'static str> { if self .chain .reqresp_pre_import_cache .read() .contains_key(&block_root) { - return Ok(false); + return Ok(None); } - let id = SingleLookupReqId { - lookup_id, - req_id: self.next_id(), - }; + let req_id = self.next_id(); + let id = SingleLookupReqId { lookup_id, req_id }; debug!( self.log, @@ -445,7 +445,7 @@ impl SyncNetworkContext { self.blocks_by_root_requests .insert(id, ActiveBlocksByRootRequest::new(request)); - Ok(true) + Ok(Some(req_id)) } /// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking: @@ -460,7 +460,7 @@ impl SyncNetworkContext { peer_id: PeerId, block_root: Hash256, downloaded_block_expected_blobs: Option, - ) -> Result { + ) -> Result, &'static str> { // Check if we are into deneb, and before peerdas if !self .chain @@ -474,7 +474,7 @@ impl SyncNetworkContext { .epoch(T::EthSpec::slots_per_epoch()), ) { - return Ok(false); + return Ok(None); } let expected_blobs = downloaded_block_expected_blobs @@ -500,13 +500,11 @@ impl SyncNetworkContext { if indices.is_empty() { // No blobs required, do not issue any request - return Ok(false); + return Ok(None); } - let id = SingleLookupReqId { - lookup_id, - req_id: self.next_id(), - }; + let req_id = self.next_id(); + let id = SingleLookupReqId { lookup_id, req_id }; debug!( self.log, @@ -532,7 +530,7 @@ impl SyncNetworkContext { self.blobs_by_root_requests .insert(id, ActiveBlobsByRootRequest::new(request)); - Ok(true) + Ok(Some(req_id)) } pub fn data_column_lookup_request( @@ -540,8 +538,10 @@ impl SyncNetworkContext { requester: DataColumnsByRootRequester, peer_id: PeerId, request: DataColumnsByRootSingleBlockRequest, - ) -> Result<(), &'static str> { + ) -> Result { let req_id = self.next_id(); + let id = DataColumnsByRootRequestId { requester, req_id }; + debug!( self.log, "Sending DataColumnsByRoot Request"; @@ -549,10 +549,8 @@ impl SyncNetworkContext { "block_root" => ?request.block_root, "indices" => ?request.indices, "peer" => %peer_id, - "requester" => ?requester, - "id" => req_id, + "id" => ?id, ); - let id = DataColumnsByRootRequestId { requester, req_id }; self.send_network_msg(NetworkMessage::SendRequest { peer_id, @@ -563,7 +561,7 @@ impl SyncNetworkContext { self.data_columns_by_root_requests .insert(id, ActiveDataColumnsByRootRequest::new(request, requester)); - Ok(()) + Ok(req_id) } pub fn custody_lookup_request( @@ -571,7 +569,7 @@ impl SyncNetworkContext { lookup_id: SingleLookupId, block_root: Hash256, downloaded_block_expected_data: Option, - ) -> Result { + ) -> Result, &'static str> { // Check if we are into peerdas if !self .chain @@ -585,7 +583,7 @@ impl SyncNetworkContext { .epoch(T::EthSpec::slots_per_epoch()), ) { - return Ok(false); + return Ok(None); } let expects_data = downloaded_block_expected_data @@ -600,7 +598,7 @@ impl SyncNetworkContext { // No data required for this block if !expects_data { - return Ok(false); + return Ok(None); } let custody_indexes_imported = self @@ -621,13 +619,11 @@ impl SyncNetworkContext { if custody_indexes_to_fetch.is_empty() { // No indexes required, do not issue any request - return Ok(false); + return Ok(None); } - let id = SingleLookupReqId { - lookup_id, - req_id: self.next_id(), - }; + let req_id = self.next_id(); + let id = SingleLookupReqId { lookup_id, req_id }; debug!( self.log, @@ -653,7 +649,7 @@ impl SyncNetworkContext { // created cannot return data immediately, it must send some request to the network // first. 
And there must exist some request, `custody_indexes_to_fetch` is not empty. self.custody_by_root_requests.insert(requester, request); - Ok(true) + Ok(Some(req_id)) } // TODO(das): handle this error properly Err(_) => Err("custody_send_error"), From 1514c1736dbd0c2c3662c796dd30cbc228dd6a1f Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Sat, 11 May 2024 13:35:39 +0400 Subject: [PATCH 3/5] Lookups request exactly needed data --- .../network/src/sync/block_lookups/common.rs | 8 ++- .../network/src/sync/block_lookups/mod.rs | 28 +++----- .../sync/block_lookups/single_block_lookup.rs | 34 +++++----- .../network/src/sync/network_context.rs | 64 ++++++++----------- .../src/sync/network_context/requests.rs | 19 +++++- 5 files changed, 76 insertions(+), 77 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index ceb4f61b642..9a2c3f96929 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -187,8 +187,12 @@ impl RequestState for CustodyRequestState { downloaded_block_expected_blobs: Option, cx: &mut SyncNetworkContext, ) -> Result, LookupError> { - cx.custody_lookup_request(id, self.block_root, downloaded_block_expected_blobs) - .map_err(LookupError::SendFailed) + cx.custody_lookup_request( + id, + self.block_root, + downloaded_block_expected_blobs.map(|n| n > 0), + ) + .map_err(LookupError::SendFailed) } fn send_for_processing( diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index b1ac3aef24d..e3e63ed2b0a 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -456,27 +456,15 @@ impl BlockLookups { // if both components have been processed. request_state.on_processing_success()?; - // If this was the result of a block request, we can't determined if the block peer did anything - // wrong. If we already had both a block and blobs response processed, we should penalize the - // blobs peer because they did not provide all blobs on the initial request. - if lookup.both_components_processed() { - // TODO(das): extend to columns peers too - if let Some(blob_peer) = lookup - .blob_request_state - .state - .on_post_process_validation_failure()? - { - // TODO(das): downscore only the peer that served the request for all blobs - for peer in blob_peer.all() { - cx.report_peer( - *peer, - PeerAction::MidToleranceError, - "sent_incomplete_blobs", - ); - } - } + // We don't request for other block components until being sure that the block has + // data. If we request blobs / columns to a peer we are sure those must exist. + // Therefore if all components are processed and we still receive `MissingComponents` + // it indicates an internal bug. + if lookup.all_components_processed() { + return Err(LookupError::MissingComponentsAfterAllProcessed); + } else { + Action::Retry } - Action::Retry } BlockProcessingResult::Ignored => { // Beacon processor signalled to ignore the block processing result. 
diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 667e31690e2..1f83e95a87d 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -39,6 +39,9 @@ pub enum LookupError { BadState(String), /// Lookup failed for some other reason and should be dropped Failed, + /// Received MissingComponents when all components have been processed. This should never + /// happen, and indicates some internal bug + MissingComponentsAfterAllProcessed, /// Attempted to retrieve a not known lookup id UnknownLookup, /// Received a download result for a different request id than the in-flight request. @@ -167,7 +170,20 @@ impl SingleBlockLookup { .block_request_state .state .peek_downloaded_data() - .map(|block| block.num_expected_blobs()); + .map(|block| block.num_expected_blobs()) + .or_else(|| { + // This is a bit of a hack, becase the block request `Processed` state does not + // store block details. `peek_downloaded_data` only returns data if the block is + // actively downloading. + if self.block_request_state.state.is_processed() { + cx.chain + .data_availability_checker + .num_expected_blobs(&self.block_root) + } else { + None + } + }); + let peer_id = self.get_rand_available_peer().ok_or(LookupError::NoPeers)?; let request = R::request_state_mut(self); @@ -210,7 +226,7 @@ impl SingleBlockLookup { } /// Returns true if the block has already been downloaded. - pub fn both_components_processed(&self) -> bool { + pub fn all_components_processed(&self) -> bool { self.block_request_state.state.is_processed() && self.blob_request_state.state.is_processed() && self.custody_request_state.state.is_processed() @@ -467,20 +483,6 @@ impl SingleLookupRequestState { } } - pub fn on_post_process_validation_failure(&mut self) -> Result, LookupError> { - match &self.state { - State::Processed(peer_group) => { - let peer_group = peer_group.clone(); - self.failed_processing = self.failed_processing.saturating_add(1); - self.state = State::AwaitingDownload; - Ok(peer_group) - } - other => Err(LookupError::BadState(format!( - "Bad state on_post_process_validation_failure expected Processed got {other}" - ))), - } - } - /// Mark a request as complete without any download or processing pub fn on_completed_request(&mut self) -> Result<(), LookupError> { match &self.state { diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 95a92aa7d30..1437d3f56d9 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -477,16 +477,18 @@ impl SyncNetworkContext { return Ok(None); } - let expected_blobs = downloaded_block_expected_blobs - .or_else(|| { - self.chain - .data_availability_checker - .num_expected_blobs(&block_root) - }) - .unwrap_or( - // If we don't about the block being requested, attempt to fetch all blobs - T::EthSpec::max_blobs_per_block(), - ); + // Do not download blobs until the block is downloaded (or already in the da_checker). + // Then we avoid making requests to peers for blocks that may not have data. If the + // block is not yet downloaded, do nothing. There is at least one future event to + // continue this request. 
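// In practice the "future event" above is the block request completing: once the block
// reaches `Processed`, `continue_request` (changed in this same patch) repopulates
// `downloaded_block_expected_blobs` from `data_availability_checker.num_expected_blobs`,
// so this early return does not stall the lookup.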
+ let Some(expected_blobs) = downloaded_block_expected_blobs else { + return Ok(None); + }; + + // No data required + if expected_blobs == 0 { + return Ok(None); + } let imported_blob_indexes = self .chain @@ -568,7 +570,7 @@ impl SyncNetworkContext { &mut self, lookup_id: SingleLookupId, block_root: Hash256, - downloaded_block_expected_data: Option, + downloaded_block_expected_data: Option, ) -> Result, &'static str> { // Check if we are into peerdas if !self @@ -586,15 +588,13 @@ impl SyncNetworkContext { return Ok(None); } - let expects_data = downloaded_block_expected_data - .or_else(|| { - self.chain - .data_availability_checker - .num_expected_blobs(&block_root) - }) - .map(|n| n > 0) - // If we don't know about the block being requested, assume block has data - .unwrap_or(true); + // Do not download columns until the block is downloaded (or already in the da_checker). + // Then we avoid making requests to peers for blocks that may not have data. If the + // block is not yet downloaded, do nothing. There is at least one future event to + // continue this request. + let Some(expects_data) = downloaded_block_expected_data else { + return Ok(None); + }; // No data required for this block if !expects_data { @@ -849,15 +849,10 @@ impl SyncNetworkContext { Err(e.into()) } }, - RpcEvent::StreamTermination => { - // Stream terminator - match request.remove().terminate() { - Some(blobs) => to_fixed_blob_sidecar_list(blobs) - .map(|blobs| (blobs, timestamp_now())) - .map_err(Into::into), - None => return None, - } - } + RpcEvent::StreamTermination => match request.remove().terminate() { + Ok(_) => return None, + Err(e) => Err(e.into()), + }, RpcEvent::RPCError(e) => { request.remove(); Err(e.into()) @@ -896,13 +891,10 @@ impl SyncNetworkContext { Err(e.into()) } }, - RpcEvent::StreamTermination => { - // Stream terminator - match request.remove().terminate() { - Some(items) => Ok((items, timestamp_now())), - None => return None, - } - } + RpcEvent::StreamTermination => match request.remove().terminate() { + Some(items) => Ok((items, timestamp_now())), + None => return None, + }, RpcEvent::RPCError(e) => { request.remove(); Err(e.into()) diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 88e5e3a3443..722a8cea09c 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -12,11 +12,21 @@ use types::{ #[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum RpcByRootVerifyError { + /// On a request that expects strictly one item, we receive the stream termination before + /// that item NoResponseReturned, + /// On a request for a strict number of items, we receive the stream termination before the + /// expected count of items + NotEnoughResponsesReturned, + /// Received more items than expected TooManyResponses, + /// Received an item that corresponds to a different request block root UnrequestedBlockRoot(Hash256), + /// Received a blob / column with an index that is not in the requested set UnrequestedBlobIndex(u64), + /// Blob or column inclusion proof does not match its own header InvalidInclusionProof, + /// Received more than one item for the tuple (block_root, index) DuplicateData, } @@ -143,11 +153,13 @@ impl ActiveBlobsByRootRequest { } } - pub fn terminate(self) -> Option>>> { + /// Handle a stream termination. 
Expects sender to strictly send the requested number of items + pub fn terminate(self) -> Result<(), RpcByRootVerifyError> { if self.resolved { - None + Ok(()) } else { - Some(self.blobs) + // Expect to receive the stream termination AFTER the expect number of items + Err(RpcByRootVerifyError::NotEnoughResponsesReturned) } } } @@ -227,6 +239,7 @@ impl ActiveDataColumnsByRootRequest { } } + /// Handle stream termination. Allows the sender to return less items than requested. pub fn terminate(self) -> Option>>> { if self.resolved { None From ddda4062764f50ce2f331d4419ad0b252fc1cc68 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Sun, 12 May 2024 11:56:14 +0300 Subject: [PATCH 4/5] WIP multiple custody requests in lookup struct --- .../network/src/sync/block_lookups/common.rs | 85 ++++-- .../network/src/sync/block_lookups/mod.rs | 92 ++++-- .../sync/block_lookups/single_block_lookup.rs | 86 ++++-- .../network/src/sync/block_lookups/tests.rs | 67 ++-- beacon_node/network/src/sync/manager.rs | 67 ++-- .../network/src/sync/network_context.rs | 152 +--------- .../src/sync/network_context/custody.rs | 287 ------------------ 7 files changed, 274 insertions(+), 562 deletions(-) delete mode 100644 beacon_node/network/src/sync/network_context/custody.rs diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index 9a2c3f96929..6d98b8656a9 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -1,21 +1,24 @@ use crate::sync::block_lookups::single_block_lookup::{ LookupError, SingleBlockLookup, SingleLookupRequestState, }; -use crate::sync::block_lookups::{BlobRequestState, BlockRequestState, PeerId}; -use crate::sync::manager::{BlockProcessType, Id, SLOT_IMPORT_TOLERANCE}; -use crate::sync::network_context::{ReqId, SyncNetworkContext}; +use crate::sync::block_lookups::{ + BlobRequestState, BlockRequestState, CustodyColumnRequestState, PeerId, +}; +use crate::sync::manager::{ + BlockProcessType, DataColumnsByRootRequester, Id, SLOT_IMPORT_TOLERANCE, +}; +use crate::sync::network_context::{ + DataColumnsByRootSingleBlockRequest, ReqId, SyncNetworkContext, +}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::data_column_verification::CustodyDataColumn; use beacon_chain::BeaconChainTypes; use std::sync::Arc; -use types::blob_sidecar::FixedBlobSidecarList; -use types::SignedBeaconBlock; +use types::{blob_sidecar::FixedBlobSidecarList, ColumnIndex, Epoch, SignedBeaconBlock}; use super::single_block_lookup::DownloadResult; use super::SingleLookupId; -use super::single_block_lookup::CustodyRequestState; - #[derive(Debug, Copy, Clone)] pub enum ResponseType { Block, @@ -56,6 +59,7 @@ pub trait RequestState { /// Send the response to the beacon processor. fn send_for_processing( id: Id, + component_index: usize, result: DownloadResult, cx: &SyncNetworkContext, ) -> Result<(), LookupError>; @@ -66,7 +70,10 @@ pub trait RequestState { fn response_type() -> ResponseType; /// A getter for the `BlockRequestState` or `BlobRequestState` associated with this trait. - fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self; + fn request_state_mut( + request: &mut SingleBlockLookup, + component_index: usize, + ) -> Option<&mut Self>; /// A getter for a reference to the `SingleLookupRequestState` associated with this trait. 
fn get_state(&self) -> &SingleLookupRequestState; @@ -82,7 +89,7 @@ impl RequestState for BlockRequestState { &self, id: SingleLookupId, peer_id: PeerId, - _: Option, + _downloaded_block_expected_blobs: Option, cx: &mut SyncNetworkContext, ) -> Result, LookupError> { cx.block_lookup_request(id, peer_id, self.requested_block_root) @@ -91,6 +98,7 @@ impl RequestState for BlockRequestState { fn send_for_processing( id: SingleLookupId, + _component_index: usize, download_result: DownloadResult, cx: &SyncNetworkContext, ) -> Result<(), LookupError> { @@ -112,8 +120,8 @@ impl RequestState for BlockRequestState { fn response_type() -> ResponseType { ResponseType::Block } - fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self { - &mut request.block_request_state + fn request_state_mut(request: &mut SingleBlockLookup, _: usize) -> Option<&mut Self> { + Some(&mut request.block_request_state) } fn get_state(&self) -> &SingleLookupRequestState { &self.state @@ -144,6 +152,7 @@ impl RequestState for BlobRequestState { fn send_for_processing( id: Id, + _component_index: usize, download_result: DownloadResult, cx: &SyncNetworkContext, ) -> Result<(), LookupError> { @@ -165,8 +174,8 @@ impl RequestState for BlobRequestState { fn response_type() -> ResponseType { ResponseType::Blob } - fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self { - &mut request.blob_request_state + fn request_state_mut(request: &mut SingleBlockLookup, _: usize) -> Option<&mut Self> { + Some(&mut request.blob_request_state) } fn get_state(&self) -> &SingleLookupRequestState { &self.state @@ -176,27 +185,49 @@ impl RequestState for BlobRequestState { } } -impl RequestState for CustodyRequestState { - type VerifiedResponseType = Vec>; +impl RequestState for CustodyColumnRequestState { + type VerifiedResponseType = CustodyDataColumn; fn make_request( &self, id: Id, // TODO(das): consider selecting peers that have custody but are in this set _peer_id: PeerId, - downloaded_block_expected_blobs: Option, + _downloaded_block_expected_blobs: Option, cx: &mut SyncNetworkContext, ) -> Result, LookupError> { - cx.custody_lookup_request( - id, - self.block_root, - downloaded_block_expected_blobs.map(|n| n > 0), + // TODO: Ensure that requests are not sent when the block is: + // - not yet in PeerDAS epoch + // - outside da_boundary (maybe not necessary to check) + + // TODO(das): no rotation + let block_epoch = Epoch::new(0); + // TODO: When is a fork and only a subset of your peers know about a block, sampling should only + // be queried on the peers on that fork. Should this case be handled? How to handle it? + let peer_ids = cx.get_custodial_peers(block_epoch, self.column_index); + + // TODO(das) randomize custodial peer and avoid failing peers + let Some(peer_id) = peer_ids.first().cloned() else { + // Allow custody lookups to have zero peers. If no peers are found to have custody after + // some time the lookup is dropped. 
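// One way to address the randomization TODO above, as a sketch assuming
// `rand::seq::SliceRandom` (already used elsewhere in the codebase, e.g. peerdb.rs):
//
//     use rand::seq::SliceRandom;
//     let peer_id = peer_ids.choose(&mut rand::thread_rng()).cloned();
//
// The zero-peer case below stays as-is: return Ok(None) and leave the lookup pending until
// a custodial peer is found or the lookup is eventually dropped.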
+ return Ok(None); + }; + + cx.data_column_lookup_request( + DataColumnsByRootRequester::Custody(id, self.column_index), + peer_id, + DataColumnsByRootSingleBlockRequest { + block_root: self.block_root, + indices: vec![self.column_index], + }, ) + .map(Some) .map_err(LookupError::SendFailed) } fn send_for_processing( id: Id, + component_index: usize, download_result: DownloadResult, cx: &SyncNetworkContext, ) -> Result<(), LookupError> { @@ -208,9 +239,10 @@ impl RequestState for CustodyRequestState { } = download_result; cx.send_custody_columns_for_processing( block_root, - value, + // TODO(das): might be inneficient to send columns one by one + vec![value], seen_timestamp, - BlockProcessType::SingleCustodyColumn(id), + BlockProcessType::SingleCustodyColumn(id, component_index as ColumnIndex), ) .map_err(LookupError::SendFailed) } @@ -218,8 +250,13 @@ impl RequestState for CustodyRequestState { fn response_type() -> ResponseType { ResponseType::CustodyColumn } - fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self { - &mut request.custody_request_state + fn request_state_mut( + request: &mut SingleBlockLookup, + component_index: usize, + ) -> Option<&mut Self> { + request + .custody_columns_requests + .get_mut(&(component_index as u64)) } fn get_state(&self) -> &SingleLookupRequestState { &self.state diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index e3e63ed2b0a..505d1afd734 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -7,7 +7,6 @@ use crate::metrics; use crate::sync::block_lookups::common::{ResponseType, PARENT_DEPTH_TOLERANCE}; use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor; use crate::sync::manager::{Id, SingleLookupReqId}; -use crate::sync::network_context::custody::CustodyRequestError; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::data_availability_checker::AvailabilityCheckErrorCategory; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; @@ -15,13 +14,13 @@ pub use common::RequestState; use fnv::FnvHashMap; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; -pub use single_block_lookup::{BlobRequestState, BlockRequestState, CustodyRequestState}; +pub use single_block_lookup::{BlobRequestState, BlockRequestState, CustodyColumnRequestState}; use slog::{debug, error, warn, Logger}; use std::collections::hash_map::Entry; use std::sync::Arc; use std::time::Duration; use store::Hash256; -use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; +use types::{BlobSidecar, Epoch, EthSpec, SignedBeaconBlock}; pub mod common; pub mod parent_chain; @@ -36,7 +35,7 @@ pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 4; pub enum LookupRequestError { BlockRequestError(RpcByRootRequestError), BlobRequestError(RpcByRootRequestError), - CustodyRequestError(CustodyRequestError), + CustodyRequestError(RpcByRootRequestError), } pub enum BlockComponent { @@ -268,9 +267,23 @@ impl BlockLookups { } } + // TODO(das): If there's rotation we need to fetch the block before knowing which columns to + // custody. + let block_epoch = Epoch::new(0); + let custody_column_indexes = cx + .network_globals() + .custody_columns(block_epoch) + .expect("TODO: parse custody value on start-up"); + // If we know that this lookup has unknown parent (is awaiting a parent lookup to resolve), // signal here to hold processing downloaded data. 
- let mut lookup = SingleBlockLookup::new(block_root, peers, cx.next_id(), awaiting_parent); + let mut lookup = SingleBlockLookup::new( + block_root, + peers, + cx.next_id(), + awaiting_parent, + custody_column_indexes, + ); let msg = if block_component.is_some() { "Searching for components of a block with unknown parent" @@ -313,10 +326,11 @@ impl BlockLookups { pub fn on_download_response>( &mut self, id: SingleLookupReqId, + component_index: usize, response: Result<(R::VerifiedResponseType, PeerGroup, Duration), LookupRequestError>, cx: &mut SyncNetworkContext, ) { - let result = self.on_download_response_inner::(id, response, cx); + let result = self.on_download_response_inner::(id, component_index, response, cx); self.on_lookup_result(id.lookup_id, result, "download_response", cx); } @@ -324,21 +338,21 @@ impl BlockLookups { pub fn on_download_response_inner>( &mut self, id: SingleLookupReqId, + component_index: usize, response: Result<(R::VerifiedResponseType, PeerGroup, Duration), LookupRequestError>, cx: &mut SyncNetworkContext, ) -> Result { // Note: do not downscore peers here for requests errors, SyncNetworkContext does it. - let response_type = R::response_type(); - let Some(lookup) = self.single_block_lookups.get_mut(&id.lookup_id) else { - // We don't have the ability to cancel in-flight RPC requests. So this can happen - // if we started this RPC request, and later saw the block/blobs via gossip. - debug!(self.log, "Block returned for single block lookup not present"; "id" => id.lookup_id); - return Err(LookupError::UnknownLookup); - }; + let lookup = self + .single_block_lookups + .get_mut(&id.lookup_id) + .ok_or(LookupError::UnknownLookup)?; let block_root = lookup.block_root(); - let request_state = R::request_state_mut(lookup).get_state_mut(); + let request = R::request_state_mut(lookup, component_index) + .ok_or(LookupError::UnknownComponentIndex(component_index))?; + let request_state = request.get_state_mut(); match response { Ok((response, peer_group, seen_timestamp)) => { @@ -347,7 +361,7 @@ impl BlockLookups { "block_root" => ?block_root, "id" => ?id, "peer_group" => ?peer_group, - "response_type" => ?response_type, + "component" => format!("{:?}({})", R::response_type(), component_index), ); // Register the download peer here. 
Once we have received some data over the wire we @@ -371,7 +385,7 @@ impl BlockLookups { "Received lookup download failure"; "block_root" => ?block_root, "id" => ?id, - "response_type" => ?response_type, + "component" => format!("{:?}({})", R::response_type(), component_index), "error" => ?e, ); @@ -404,38 +418,45 @@ impl BlockLookups { result: BlockProcessingResult, cx: &mut SyncNetworkContext, ) { - let lookup_result = match process_type { - BlockProcessType::SingleBlock { id } => { - self.on_processing_result_inner::>(id, result, cx) - } - BlockProcessType::SingleBlob { id } => { - self.on_processing_result_inner::>(id, result, cx) - } - BlockProcessType::SingleCustodyColumn(id) => { - self.on_processing_result_inner::>(id, result, cx) - } - }; + let lookup_result = + match process_type { + BlockProcessType::SingleBlock { id } => self + .on_processing_result_inner::>(id, 0, result, cx), + BlockProcessType::SingleBlob { id } => self + .on_processing_result_inner::>(id, 0, result, cx), + BlockProcessType::SingleCustodyColumn(id, component_index) => { + self.on_processing_result_inner::>( + id, + component_index as usize, + result, + cx, + ) + } + }; self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx); } pub fn on_processing_result_inner>( &mut self, lookup_id: SingleLookupId, + component_index: usize, result: BlockProcessingResult, cx: &mut SyncNetworkContext, ) -> Result { - let Some(lookup) = self.single_block_lookups.get_mut(&lookup_id) else { - debug!(self.log, "Unknown single block lookup"; "id" => lookup_id); - return Err(LookupError::UnknownLookup); - }; + let lookup = self + .single_block_lookups + .get_mut(&lookup_id) + .ok_or(LookupError::UnknownLookup)?; let block_root = lookup.block_root(); - let request_state = R::request_state_mut(lookup).get_state_mut(); + let request = R::request_state_mut(lookup, component_index) + .ok_or(LookupError::UnknownComponentIndex(component_index))?; + let request_state = request.get_state_mut(); debug!( self.log, "Received lookup processing result"; - "component" => ?R::response_type(), + "component" => format!("{:?}({})", R::response_type(), component_index), "block_root" => ?block_root, "id" => lookup_id, "result" => ?result, @@ -463,6 +484,11 @@ impl BlockLookups { if lookup.all_components_processed() { return Err(LookupError::MissingComponentsAfterAllProcessed); } else { + // Trigger reconstruction if node has more than 50% of columns. Reconstruction + // is expenseive so it should be handled as a separate work event. The + // result of the reconstruction must be imported and published to gossip. 
+ // The result of a reconstruction will be an import block components + Action::Retry } } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 1f83e95a87d..ff1613f9e0b 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -6,14 +6,14 @@ use crate::sync::network_context::{PeerGroup, ReqId, SyncNetworkContext}; use beacon_chain::data_column_verification::CustodyDataColumn; use beacon_chain::BeaconChainTypes; use rand::seq::IteratorRandom; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::sync::Arc; use std::time::Duration; use store::Hash256; use strum::IntoStaticStr; use types::blob_sidecar::FixedBlobSidecarList; -use types::{EthSpec, SignedBeaconBlock}; +use types::{ColumnIndex, EthSpec, SignedBeaconBlock}; // Dedicated enum for LookupResult to force its usage #[must_use = "LookupResult must be handled with on_lookup_result"] @@ -44,6 +44,8 @@ pub enum LookupError { MissingComponentsAfterAllProcessed, /// Attempted to retrieve a not known lookup id UnknownLookup, + /// Attempted to retrieve a not known lookup request by index + UnknownComponentIndex(usize), /// Received a download result for a different request id than the in-flight request. /// There should only exist a single request at a time. Having multiple requests is a bug and /// can result in undefined state, so it's treated as a hard error and the lookup is dropped. @@ -57,7 +59,7 @@ pub struct SingleBlockLookup { pub id: Id, pub block_request_state: BlockRequestState, pub blob_request_state: BlobRequestState, - pub custody_request_state: CustodyRequestState, + pub custody_columns_requests: HashMap>, block_root: Hash256, awaiting_parent: Option, /// Peers that claim to have imported this block @@ -66,17 +68,26 @@ pub struct SingleBlockLookup { impl SingleBlockLookup { pub fn new( - requested_block_root: Hash256, + block_root: Hash256, peers: &[PeerId], id: Id, awaiting_parent: Option, + custody_column_indexes: Vec, ) -> Self { Self { id, - block_request_state: BlockRequestState::new(requested_block_root), - blob_request_state: BlobRequestState::new(requested_block_root), - custody_request_state: CustodyRequestState::new(requested_block_root), - block_root: requested_block_root, + block_request_state: BlockRequestState::new(block_root), + blob_request_state: BlobRequestState::new(block_root), + custody_columns_requests: custody_column_indexes + .into_iter() + .map(|column_index| { + ( + column_index, + CustodyColumnRequestState::new(block_root, column_index), + ) + }) + .collect(), + block_root, awaiting_parent, peers: peers.iter().copied().collect(), } @@ -137,17 +148,26 @@ impl SingleBlockLookup { cx: &mut SyncNetworkContext, ) -> Result { // TODO: Check what's necessary to download, specially for blobs - self.continue_request::>(cx)?; - self.continue_request::>(cx)?; - self.continue_request::>(cx)?; + self.continue_request::>(cx, 0)?; + self.continue_request::>(cx, 0)?; + + // TODO(das): Quick hack, but data inefficient to allocate this array every time + let column_indexes = self + .custody_columns_requests + .keys() + .copied() + .collect::>(); + for column_index in column_indexes { + self.continue_request::>( + cx, + column_index as usize, + )?; + } // If all components of this lookup are already processed, there will be no future events // that can make progress so it must 
be dropped. Consider the lookup completed. // This case can happen if we receive the components from gossip during a retry. - if self.block_request_state.state.is_processed() - && self.blob_request_state.state.is_processed() - && self.custody_request_state.state.is_processed() - { + if self.all_components_processed() { Ok(LookupResult::Completed) } else { Ok(LookupResult::Pending) @@ -158,14 +178,23 @@ impl SingleBlockLookup { fn continue_request>( &mut self, cx: &mut SyncNetworkContext, + component_index: usize, ) -> Result<(), LookupError> { let id = self.id; let awaiting_parent = self.awaiting_parent.is_some(); let block_is_processed = self.block_request_state.state.is_processed(); - let request = R::request_state_mut(self); + let request = R::request_state_mut(self, component_index) + .ok_or(LookupError::UnknownComponentIndex(component_index))?; // Attempt to progress awaiting downloads if request.get_state().is_awaiting_download() { + // Verify the current request has not exceeded the maximum number of attempts. + let request_state = request.get_state(); + if request_state.failed_attempts() >= SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS { + let cannot_process = request_state.more_failed_processing_attempts(); + return Err(LookupError::TooManyAttempts { cannot_process }); + } + let downloaded_block_expected_blobs = self .block_request_state .state @@ -185,7 +214,8 @@ impl SingleBlockLookup { }); let peer_id = self.get_rand_available_peer().ok_or(LookupError::NoPeers)?; - let request = R::request_state_mut(self); + let request = R::request_state_mut(self, component_index) + .ok_or(LookupError::UnknownComponentIndex(component_index))?; // Verify the current request has not exceeded the maximum number of attempts. let request_state = request.get_state(); @@ -212,7 +242,7 @@ impl SingleBlockLookup { // maybe_start_processing returns Some if state == AwaitingProcess. This pattern is // useful to conditionally access the result data. if let Some(result) = request.get_state_mut().maybe_start_processing() { - return R::send_for_processing(id, result, cx); + return R::send_for_processing(id, component_index, result, cx); } } @@ -229,7 +259,10 @@ impl SingleBlockLookup { pub fn all_components_processed(&self) -> bool { self.block_request_state.state.is_processed() && self.blob_request_state.state.is_processed() - && self.custody_request_state.state.is_processed() + && self + .custody_columns_requests + .values() + .all(|r| r.state.is_processed()) } /// Remove peer from available peers. Return true if there are no more available peers and all @@ -240,7 +273,10 @@ impl SingleBlockLookup { self.peers.is_empty() && self.block_request_state.state.is_awaiting_download() && self.blob_request_state.state.is_awaiting_download() - && self.custody_request_state.state.is_awaiting_download() + && self + .custody_columns_requests + .values() + .all(|r| r.state.is_awaiting_download()) } /// Selects a random peer from available peers if any, inserts it in used peers and returns it. @@ -265,15 +301,17 @@ impl BlobRequestState { } /// The state of the blob request component of a `SingleBlockLookup`. 
-pub struct CustodyRequestState { +pub struct CustodyColumnRequestState { pub block_root: Hash256, - pub state: SingleLookupRequestState>>, + pub column_index: ColumnIndex, + pub state: SingleLookupRequestState>, } -impl CustodyRequestState { - pub fn new(block_root: Hash256) -> Self { +impl CustodyColumnRequestState { + pub fn new(block_root: Hash256, column_index: ColumnIndex) -> Self { Self { block_root, + column_index, state: SingleLookupRequestState::new(), } } diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 322762da1bd..7dfb46336ca 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -608,41 +608,41 @@ impl TestRig { data_columns: Vec>, missing_components: bool, ) { - let lookup_id = if let DataColumnsByRootRequester::Custody(id) = + let DataColumnsByRootRequester::Custody(lookup_id, _) = sampling_ids.first().unwrap().0.requester - { - id.requester.0.lookup_id - } else { + else { panic!("not a custody requester") }; let first_column = data_columns.first().cloned().unwrap(); + let last_index = sampling_ids.len() - 1; - for (id, column_index) in sampling_ids { + for (i, (id, column_index)) in sampling_ids.into_iter().enumerate() { self.log(&format!("return valid data column for {column_index}")); let data_column = data_columns[column_index as usize].clone(); self.complete_data_columns_by_root_request(id, data_column); - } - // Expect work event - // TODO(das): worth it to append sender id to the work event for stricter assertion? - self.expect_rpc_custody_column_work_event(); - - // Respond with valid result - self.send_sync_message(SyncMessage::BlockComponentProcessed { - process_type: BlockProcessType::SingleCustodyColumn(lookup_id), - result: if missing_components { - BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( - first_column.slot(), - first_column.block_root(), - )) - } else { - BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported( - first_column.block_root(), - )) - }, - }); + // Expect work event + // TODO(das): worth it to append sender id to the work event for stricter assertion? 
+ self.expect_rpc_custody_column_work_event(); + + // Respond with valid result + self.send_sync_message(SyncMessage::BlockComponentProcessed { + process_type: BlockProcessType::SingleCustodyColumn(lookup_id, column_index), + // Last column + should consider last element + result: if i == last_index && !missing_components { + BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported( + first_column.block_root(), + )) + } else { + BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( + first_column.slot(), + first_column.block_root(), + )) + }, + }); + } } fn complete_data_columns_by_root_request( @@ -1635,6 +1635,25 @@ fn custody_lookup_happy_path() { r.expect_no_active_lookups(); } +#[test] +fn custody_lookup_no_peers_available_initially() { + let Some(mut r) = TestRig::test_setup_after_peerdas() else { + return; + }; + // Do not add peers initially + let (block, data_columns) = r.rand_block_and_data_columns(); + let block_root = block.canonical_root(); + let peer_id = r.new_connected_peer(); + r.trigger_unknown_block_from_attestation(block_root, peer_id); + // Should not request blobs + let id = r.expect_block_lookup_request(block.canonical_root()); + r.complete_valid_block_request(id, block.into(), true); + let custody_column_count = E::min_custody_requirement() * E::data_columns_per_subnet(); + let custody_ids = r.expect_only_data_columns_by_root_requests(block_root, custody_column_count); + r.complete_valid_custody_request(custody_ids, data_columns, false); + r.expect_no_active_lookups(); +} + // TODO(das): Test retries of DataColumnByRoot: // - Expect request for column_index // - Respond with bad data diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index bf8f947ee05..7c6740dd244 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -36,7 +36,8 @@ use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; use super::block_lookups::BlockLookups; use super::network_context::{ - BlockOrBlob, CustodyId, RangeRequestId, RpcEvent, SyncNetworkContext, + BlockOrBlob, RangeRequestId, RpcByRootRequestError, RpcByRootVerifyError, RpcEvent, + SyncNetworkContext, }; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; @@ -45,13 +46,14 @@ use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProces use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::block_lookups::{ - BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult, + BlobRequestState, BlockComponent, BlockRequestState, CustodyColumnRequestState, DownloadResult, LookupRequestError, }; use crate::sync::block_sidecar_coupling::RangeBlockComponentsRequest; use crate::sync::network_context::PeerGroup; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::block_verification_types::RpcBlock; +use beacon_chain::data_column_verification::CustodyDataColumn; use beacon_chain::validator_monitor::timestamp_now; use beacon_chain::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, EngineState, @@ -66,7 +68,9 @@ use std::ops::Sub; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; -use types::{BlobSidecar, DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot}; +use types::{ + BlobSidecar, ColumnIndex, DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot, +}; /// The number of slots ahead 
of us that is allowed before requesting a long-range (batch) Sync /// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a @@ -107,7 +111,7 @@ pub struct DataColumnsByRootRequestId { #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum DataColumnsByRootRequester { Sampling(SamplingId), - Custody(CustodyId), + Custody(Id, ColumnIndex), } #[derive(Debug)] @@ -188,7 +192,7 @@ pub enum SyncMessage { pub enum BlockProcessType { SingleBlock { id: Id }, SingleBlob { id: Id }, - SingleCustodyColumn(Id), + SingleCustodyColumn(Id, ColumnIndex), } impl BlockProcessType { @@ -196,7 +200,7 @@ impl BlockProcessType { match self { BlockProcessType::SingleBlock { id } | BlockProcessType::SingleBlob { id } - | BlockProcessType::SingleCustodyColumn(id) => *id, + | BlockProcessType::SingleCustodyColumn(id, _) => *id, } } } @@ -893,6 +897,7 @@ impl SyncManager { self.block_lookups .on_download_response::>( id, + 0, // component_index = 0, there's a single block resp.map(|(value, seen_timestamp)| { (value, PeerGroup::from_single(peer_id), seen_timestamp) }) @@ -968,6 +973,7 @@ impl SyncManager { self.block_lookups .on_download_response::>( id, + 0, // component_index = 0, there's a single blob request resp.map(|(value, seen_timestamp)| { (value, PeerGroup::from_single(peer_id), seen_timestamp) }) @@ -996,23 +1002,38 @@ impl SyncManager { self.on_sampling_result(requester, result) } } - DataColumnsByRootRequester::Custody(id) => { - if let Some((requester, custody_columns)) = - self.network.on_custody_by_root_response(id, peer_id, resp) - { - // TODO(das): get proper timestamp - let seen_timestamp = timestamp_now(); - self.block_lookups - .on_download_response::>( - requester.0, - custody_columns - .map(|(columns, peer_group)| { - (columns, peer_group, seen_timestamp) - }) - .map_err(LookupRequestError::CustodyRequestError), - &mut self.network, - ); - } + DataColumnsByRootRequester::Custody(lookup_id, column_index) => { + let resp = resp.and_then(|(data_columns, seen_timestamp)| { + let data_column = data_columns.into_iter().next().ok_or( + // TODO(das): block lookup must not allow a request to fail in response to these + // error + // TODO(das): failing peers should be tracked and not re-requested. We + // should hold the custody request for a bit until someone has it. + // Retrying the same peer over and over makes no sense + RpcByRootRequestError::VerifyError( + RpcByRootVerifyError::NoResponseReturned, + ), + )?; + // on_data_columns_by_root_response ensures that peers return either nothing + // or the data requested. block lookups only requests custody columns, so if + // there is one item in this request, it must be a custody column. + let custody_column = CustodyDataColumn::from_asserted_custody(data_column); + Ok((custody_column, seen_timestamp)) + }); + // TODO(das): get proper timestamp + self.block_lookups + .on_download_response::>( + SingleLookupReqId { + lookup_id, + req_id: id.req_id, + }, + column_index as usize, + resp.map(|(value, seen_timestamp)| { + (value, PeerGroup::from_single(peer_id), seen_timestamp) + }) + .map_err(LookupRequestError::CustodyRequestError), + &mut self.network, + ); } } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 1437d3f56d9..2b4a4e139dd 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -1,8 +1,6 @@ //! Provides network functionality for the Syncing thread. 
This fundamentally wraps a network //! channel and stores a global RPC ID to perform requests. -pub use self::custody::CustodyId; -use self::custody::{ActiveCustodyRequest, CustodyRequestError, CustodyRequester}; use self::requests::{ ActiveBlobsByRootRequest, ActiveBlocksByRootRequest, ActiveDataColumnsByRootRequest, }; @@ -43,7 +41,6 @@ use types::{ SignedBeaconBlock, Slot, }; -pub mod custody; mod requests; pub struct BlocksAndBlobsByRangeResponse { @@ -102,9 +99,6 @@ impl PeerGroup { pub fn from_single(peer: PeerId) -> Self { Self { peers: vec![peer] } } - pub fn from_set(peers: Vec) -> Self { - Self { peers } - } pub fn all(&self) -> &[PeerId] { &self.peers } @@ -127,8 +121,6 @@ pub struct SyncNetworkContext { DataColumnsByRootRequestId, ActiveDataColumnsByRootRequest, >, - /// Mapping of active custody column requests for a block root - custody_by_root_requests: FnvHashMap>, /// BlocksByRange requests paired with BlobsByRange range_block_components_requests: @@ -180,7 +172,6 @@ impl SyncNetworkContext { blocks_by_root_requests: <_>::default(), blobs_by_root_requests: <_>::default(), data_columns_by_root_requests: <_>::default(), - custody_by_root_requests: <_>::default(), range_block_components_requests: FnvHashMap::default(), network_beacon_processor, chain, @@ -544,6 +535,11 @@ impl SyncNetworkContext { let req_id = self.next_id(); let id = DataColumnsByRootRequestId { requester, req_id }; + // TODO(das): Check here if the column is already in the da_checker. Here you can prevent + // re-fetching sampling columns for columns that: + // - Part of custody and already downloaded and verified + // - Part of custody and already imported + debug!( self.log, "Sending DataColumnsByRoot Request"; @@ -566,96 +562,6 @@ impl SyncNetworkContext { Ok(req_id) } - pub fn custody_lookup_request( - &mut self, - lookup_id: SingleLookupId, - block_root: Hash256, - downloaded_block_expected_data: Option, - ) -> Result, &'static str> { - // Check if we are into peerdas - if !self - .chain - .data_availability_checker - .data_columns_required_for_epoch( - // TODO(das): use the block's slot - self.chain - .slot_clock - .now_or_genesis() - .ok_or("clock not available")? - .epoch(T::EthSpec::slots_per_epoch()), - ) - { - return Ok(None); - } - - // Do not download columns until the block is downloaded (or already in the da_checker). - // Then we avoid making requests to peers for blocks that may not have data. If the - // block is not yet downloaded, do nothing. There is at least one future event to - // continue this request. 
- let Some(expects_data) = downloaded_block_expected_data else { - return Ok(None); - }; - - // No data required for this block - if !expects_data { - return Ok(None); - } - - let custody_indexes_imported = self - .chain - .data_availability_checker - .imported_custody_column_indexes(&block_root) - .unwrap_or_default(); - - // TODO(das): figure out how to pass block.slot if we end up doing rotation - let block_epoch = Epoch::new(0); - let custody_indexes_duty = self.network_globals().custody_columns(block_epoch)?; - - // Include only the blob indexes not yet imported (received through gossip) - let custody_indexes_to_fetch = custody_indexes_duty - .into_iter() - .filter(|index| !custody_indexes_imported.contains(index)) - .collect::>(); - - if custody_indexes_to_fetch.is_empty() { - // No indexes required, do not issue any request - return Ok(None); - } - - let req_id = self.next_id(); - let id = SingleLookupReqId { lookup_id, req_id }; - - debug!( - self.log, - "Starting custody columns request"; - "block_root" => ?block_root, - "indices" => ?custody_indexes_to_fetch, - "id" => ?id - ); - - let requester = CustodyRequester(id); - let mut request = ActiveCustodyRequest::new( - block_root, - requester, - custody_indexes_to_fetch, - self.log.clone(), - ); - - // TODO(das): start request - // Note that you can only send, but not handle a response here - match request.continue_requests(self) { - Ok(_) => { - // Ignoring the result of `continue_requests` is okay. A request that has just been - // created cannot return data immediately, it must send some request to the network - // first. And there must exist some request, `custody_indexes_to_fetch` is not empty. - self.custody_by_root_requests.insert(requester, request); - Ok(Some(req_id)) - } - // TODO(das): handle this error properly - Err(_) => Err("custody_send_error"), - } - } - pub fn is_execution_engine_online(&self) -> bool { self.execution_engine_state == EngineState::Online } @@ -907,54 +813,6 @@ impl SyncNetworkContext { Some((requester, resp)) } - /// Insert a downloaded column into an active custody request. Then make progress on the - /// entire request. - /// - /// ### Returns - /// - /// - `Some`: Request completed, won't make more progress. Expect requester to act on the result. - /// - `None`: Request still active, requester should do no action - #[allow(clippy::type_complexity)] - pub fn on_custody_by_root_response( - &mut self, - id: CustodyId, - peer_id: PeerId, - resp: RpcByRootRequestResult>>>, - ) -> Option<( - CustodyRequester, - Result<(Vec>, PeerGroup), CustodyRequestError>, - )> { - // Note: need to remove the request to borrow self again below. Otherwise we can't - // do nested requests - let Some(mut request) = self.custody_by_root_requests.remove(&id.requester) else { - // TOOD(das): This log can happen if the request is error'ed early and dropped - debug!(self.log, "Custody column downloaded event for unknown request"; "id" => ?id); - return None; - }; - - let result = request - .on_data_column_downloaded(peer_id, id.column_index, resp, self) - .transpose(); - - // Convert a result from internal format of `ActiveCustodyRequest` (error first to use ?) to - // an Option first to use in an `if let Some() { act on result }` block. 
- if let Some(result) = result { - match result.as_ref() { - Ok((columns, peer_group)) => { - debug!(self.log, "Custody request success, removing"; "id" => ?id, "count" => columns.len(), "peers" => ?peer_group) - } - Err(e) => { - debug!(self.log, "Custody request failure, removing"; "id" => ?id, "error" => ?e) - } - } - - Some((id.requester, result)) - } else { - self.custody_by_root_requests.insert(id.requester, request); - None - } - } - pub fn send_block_for_processing( &self, block_root: Hash256, diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs deleted file mode 100644 index 0d0288d9fe8..00000000000 --- a/beacon_node/network/src/sync/network_context/custody.rs +++ /dev/null @@ -1,287 +0,0 @@ -use crate::sync::manager::SingleLookupReqId; - -use self::request::ActiveColumnSampleRequest; -use beacon_chain::data_column_verification::CustodyDataColumn; -use beacon_chain::BeaconChainTypes; -use fnv::FnvHashMap; -use lighthouse_network::PeerId; -use slog::{debug, warn}; -use std::{marker::PhantomData, sync::Arc}; -use types::{data_column_sidecar::ColumnIndex, DataColumnSidecar, Epoch, Hash256}; - -use super::{PeerGroup, RpcByRootRequestResult, SyncNetworkContext}; - -#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] -pub struct CustodyId { - pub requester: CustodyRequester, - pub column_index: ColumnIndex, -} - -/// Downstream components that perform custody by root requests. -/// Currently, it's only single block lookups, so not using an enum -#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] -pub struct CustodyRequester(pub SingleLookupReqId); - -type DataColumnSidecarList = Vec>>; - -pub struct ActiveCustodyRequest { - block_root: Hash256, - block_epoch: Epoch, - requester_id: CustodyRequester, - column_requests: FnvHashMap, - columns: Vec>, - /// Logger for the `SyncNetworkContext`. - pub log: slog::Logger, - _phantom: PhantomData, -} - -#[derive(Debug)] -pub enum CustodyRequestError { - SendFailed(&'static str), - TooManyFailures, - BadState(String), - NoPeers(ColumnIndex), -} - -type CustodyRequestResult = - Result>, PeerGroup)>, CustodyRequestError>; - -impl ActiveCustodyRequest { - pub(crate) fn new( - block_root: Hash256, - requester_id: CustodyRequester, - column_indexes: Vec, - log: slog::Logger, - ) -> Self { - Self { - block_root, - // TODO(das): use actual epoch if there's rotation - block_epoch: Epoch::new(0), - requester_id, - column_requests: column_indexes - .into_iter() - .map(|index| (index, ActiveColumnSampleRequest::new(index))) - .collect(), - columns: vec![], - log, - _phantom: PhantomData, - } - } - - /// Insert a downloaded column into an active sampling request. Then make progress on the - /// entire request. 
- /// - /// ### Returns - /// - /// - `Err`: Sampling request has failed and will be dropped - /// - `Ok(Some)`: Sampling request has successfully completed and will be dropped - /// - `Ok(None)`: Sampling request still active - pub(crate) fn on_data_column_downloaded( - &mut self, - _peer_id: PeerId, - column_index: ColumnIndex, - resp: RpcByRootRequestResult>, - cx: &mut SyncNetworkContext, - ) -> CustodyRequestResult { - // TODO(das): Should downscore peers for verify errors here - - let Some(request) = self.column_requests.get_mut(&column_index) else { - warn!( - self.log, - "Received sampling response for unrequested column index" - ); - return Ok(None); - }; - - match resp { - Ok((mut data_columns, _seen_timestamp)) => { - debug!(self.log, "Sample download success"; "block_root" => %self.block_root, "column_index" => column_index, "count" => data_columns.len()); - - // No need to check data_columns has len > 1, as the SyncNetworkContext ensure that - // only requested is returned (or none); - if let Some(data_column) = data_columns.pop() { - request.on_download_success()?; - - // If on_download_success is successful, we are expecting a columna for this - // custody requirement. - self.columns - .push(CustodyDataColumn::from_asserted_custody(data_column)); - } else { - // Peer does not have the requested data. - // TODO(das) what to do? - // TODO(das): If the peer is in the lookup peer set it claims to have imported - // the block AND its custody columns. So in this case we can downscore - debug!(self.log, "Sampling peer claims to not have the data"; "block_root" => %self.block_root, "column_index" => column_index); - // TODO(das) tolerate this failure if you are not sure the block has data - request.on_download_success()?; - } - } - Err(err) => { - debug!(self.log, "Sample download error"; "block_root" => %self.block_root, "column_index" => column_index, "error" => ?err); - - // Error downloading, maybe penalize peer and retry again. - // TODO(das) with different peer or different peer? - request.on_download_error()?; - } - }; - - self.continue_requests(cx) - } - - pub(crate) fn continue_requests( - &mut self, - cx: &mut SyncNetworkContext, - ) -> CustodyRequestResult { - // First check if sampling is completed, by computing `required_successes` - let mut successes = 0; - - for request in self.column_requests.values() { - if request.is_downloaded() { - successes += 1; - } - } - - // All requests have completed successfully. We may not have all the expected columns if the - // serving peers claim that this block has no data. - if successes == self.column_requests.len() { - let columns = std::mem::take(&mut self.columns); - - let peers = self - .column_requests - .values() - .filter_map(|r| r.peer()) - .collect::>(); - let peer_group = PeerGroup::from_set(peers); - - return Ok(Some((columns, peer_group))); - } - - for (_, request) in self.column_requests.iter_mut() { - request.request(self.block_root, self.block_epoch, self.requester_id, cx)?; - } - - Ok(None) - } -} - -mod request { - use super::{CustodyId, CustodyRequestError, CustodyRequester}; - use crate::sync::{ - manager::DataColumnsByRootRequester, - network_context::{DataColumnsByRootSingleBlockRequest, SyncNetworkContext}, - }; - use beacon_chain::BeaconChainTypes; - use lighthouse_network::PeerId; - use types::{data_column_sidecar::ColumnIndex, Epoch, Hash256}; - - /// TODO(das): this attempt count is nested into the existing lookup request count. 
- const MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS: usize = 3; - - pub(crate) struct ActiveColumnSampleRequest { - column_index: ColumnIndex, - status: Status, - download_failures: usize, - } - - #[derive(Debug, Clone)] - enum Status { - NotStarted, - Downloading(PeerId), - Downloaded(PeerId), - } - - impl ActiveColumnSampleRequest { - pub(crate) fn new(column_index: ColumnIndex) -> Self { - Self { - column_index, - status: Status::NotStarted, - download_failures: 0, - } - } - - pub(crate) fn is_downloaded(&self) -> bool { - match self.status { - Status::NotStarted | Status::Downloading(_) => false, - Status::Downloaded(_) => true, - } - } - - pub(crate) fn peer(&self) -> Option { - match self.status { - Status::NotStarted | Status::Downloading(_) => None, - Status::Downloaded(peer) => Some(peer), - } - } - - pub(crate) fn request( - &mut self, - block_root: Hash256, - block_epoch: Epoch, - requester: CustodyRequester, - cx: &mut SyncNetworkContext, - ) -> Result { - match &self.status { - Status::NotStarted => {} // Ok to continue - Status::Downloading(_) => return Ok(false), // Already downloading - Status::Downloaded(_) => return Ok(false), // Already completed - } - - if self.download_failures > MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS { - return Err(CustodyRequestError::TooManyFailures); - } - - // TODO: When is a fork and only a subset of your peers know about a block, sampling should only - // be queried on the peers on that fork. Should this case be handled? How to handle it? - let peer_ids = cx.get_custodial_peers(block_epoch, self.column_index); - - // TODO(das) randomize custodial peer and avoid failing peers - let Some(peer_id) = peer_ids.first().cloned() else { - // Do not tolerate not having custody peers, hard error. - // TODO(das): we might implement some grace period. The request will pause for X - // seconds expecting the peer manager to find peers before failing the request. 
- return Err(CustodyRequestError::NoPeers(self.column_index)); - }; - - cx.data_column_lookup_request( - DataColumnsByRootRequester::Custody(CustodyId { - requester, - column_index: self.column_index, - }), - peer_id, - DataColumnsByRootSingleBlockRequest { - block_root, - indices: vec![self.column_index], - }, - ) - .map_err(CustodyRequestError::SendFailed)?; - - self.status = Status::Downloading(peer_id); - Ok(true) - } - - pub(crate) fn on_download_error(&mut self) -> Result { - match self.status.clone() { - Status::Downloading(peer_id) => { - self.download_failures += 1; - self.status = Status::NotStarted; - Ok(peer_id) - } - other => Err(CustodyRequestError::BadState(format!( - "bad state on_sampling_error expected Sampling got {other:?}" - ))), - } - } - - pub(crate) fn on_download_success(&mut self) -> Result<(), CustodyRequestError> { - match &self.status { - Status::Downloading(peer) => { - self.status = Status::Downloaded(*peer); - Ok(()) - } - other => Err(CustodyRequestError::BadState(format!( - "bad state on_sampling_success expected Sampling got {other:?}" - ))), - } - } - } -} From bb2e27f1ec7d7f367c102cba42f205d496e99754 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Mon, 13 May 2024 06:37:46 +0300 Subject: [PATCH 5/5] Add supernode peer and reconstruction test --- .../src/peer_manager/peerdb.rs | 20 +++++- .../lighthouse_network/src/types/globals.rs | 18 +++++ .../sync/block_lookups/single_block_lookup.rs | 24 ++++--- .../network/src/sync/block_lookups/tests.rs | 71 ++++++++++++++++--- 4 files changed, 113 insertions(+), 20 deletions(-) diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index c3e77ae225e..76dbd483b57 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -1,9 +1,11 @@ +use crate::discovery::enr::PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY; use crate::discovery::CombinedKey; use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, Gossipsub, PeerId}; use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo}; use rand::seq::SliceRandom; use score::{PeerAction, ReportSource, Score, ScoreState}; use slog::{crit, debug, error, trace, warn}; +use ssz::Encode; use std::net::IpAddr; use std::time::Instant; use std::{cmp::Ordering, fmt::Display}; @@ -673,9 +675,23 @@ impl PeerDB { } /// Updates the connection state. MUST ONLY BE USED IN TESTS. - pub fn __add_connected_peer_testing_only(&mut self, peer_id: &PeerId) -> Option { + pub fn __add_connected_peer_testing_only( + &mut self, + peer_id: &PeerId, + supernode: bool, + ) -> Option { let enr_key = CombinedKey::generate_secp256k1(); - let enr = Enr::builder().build(&enr_key).unwrap(); + let mut enr = Enr::builder().build(&enr_key).unwrap(); + + if supernode { + enr.insert( + PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY, + &(E::data_column_subnet_count() as u64).as_ssz_bytes(), + &enr_key, + ) + .expect("u64 can be encoded"); + } + self.update_connection_state( peer_id, NewConnectionState::Connected { diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index 5dd74f25726..de4ef7785fd 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -1,4 +1,5 @@ //! A collection of variables that are accessible outside of the network thread itself. 
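// Note on the ENR value format shared by the peerdb helper above and
// `test_mutate_custody_subnet_count` below: the custody subnet count is SSZ-encoded, so a
// u64 is stored as 8 little-endian bytes; that is what `as_ssz_bytes()` produces, and
// readers of PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY are presumably expected to decode it
// back to a u64 the same way.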
+use crate::discovery::enr::PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY; use crate::peer_manager::peerdb::PeerDB; use crate::rpc::{MetaData, MetaDataV2}; use crate::types::{BackFillState, SyncState}; @@ -6,6 +7,7 @@ use crate::EnrExt; use crate::{Client, Eth2Enr}; use crate::{Enr, GossipTopic, Multiaddr, PeerId}; use parking_lot::RwLock; +use ssz::Encode; use std::collections::HashSet; use types::data_column_sidecar::ColumnIndex; use types::{DataColumnSubnetId, Epoch, EthSpec}; @@ -141,6 +143,22 @@ impl NetworkGlobals { log, ) } + + /// TESTING ONLY. Set a custody_subnet_count value + pub fn test_mutate_custody_subnet_count(&mut self, value: u64) { + use crate::CombinedKeyExt; + // For test: use a random key. WARNING: changes ENR NodeID + let keypair = libp2p::identity::secp256k1::Keypair::generate(); + let enr_key = discv5::enr::CombinedKey::from_secp256k1(&keypair); + self.local_enr + .write() + .insert( + PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY, + &value.as_ssz_bytes(), + &enr_key, + ) + .expect("u64 can be serialized"); + } } #[cfg(test)] diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index ff1613f9e0b..526936f00fd 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -74,19 +74,25 @@ impl SingleBlockLookup { awaiting_parent: Option, custody_column_indexes: Vec, ) -> Self { + let column_count = custody_column_indexes.len(); + let custody_columns_requests = custody_column_indexes + .into_iter() + .map(|column_index| { + let request = CustodyColumnRequestState::new(block_root, column_index); + (column_index, request) + }) + .collect::>(); + debug_assert_eq!( + column_count, + custody_columns_requests.len(), + "duplicate column indexes" + ); + Self { id, block_request_state: BlockRequestState::new(block_root), blob_request_state: BlobRequestState::new(block_root), - custody_columns_requests: custody_column_indexes - .into_iter() - .map(|column_index| { - ( - column_index, - CustodyColumnRequestState::new(block_root, column_index), - ) - }) - .collect(), + custody_columns_requests, block_root, awaiting_parent, peers: peers.iter().copied().collect(), diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 7dfb46336ca..f6b1deb0672 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -89,6 +89,7 @@ type SamplingIds = Vec<(DataColumnsByRootRequestId, ColumnIndex)>; struct TestRigConfig { peer_das_enabled: bool, + supernode: bool, } impl TestRig { @@ -99,7 +100,7 @@ impl TestRig { // Use `fork_from_env` logic to set correct fork epochs let mut spec = test_spec::(); - if let Some(config) = config { + if let Some(config) = &config { if config.peer_das_enabled { spec.eip7594_fork_epoch = Some(Epoch::new(0)); } @@ -123,9 +124,15 @@ impl TestRig { let (network_tx, network_rx) = mpsc::unbounded_channel(); // TODO(das): make the generation of the ENR use the deterministic rng to have consistent // column assignments - let globals = Arc::new(NetworkGlobals::new_test_globals(Vec::new(), &log)); + let mut globals = NetworkGlobals::new_test_globals(Vec::new(), &log); + if let Some(config) = &config { + if config.supernode { + globals.test_mutate_custody_subnet_count(E::data_column_subnet_count() as u64); + } + } + let (beacon_processor, beacon_processor_rx) = 
NetworkBeaconProcessor::null_for_testing( - globals, + Arc::new(globals), chain.clone(), harness.runtime.task_executor.clone(), log.clone(), @@ -179,6 +186,19 @@ impl TestRig { fn test_setup_after_peerdas() -> Option { let r = Self::test_setup_with_config(Some(TestRigConfig { peer_das_enabled: true, + supernode: false, + })); + if r.after_deneb() { + Some(r) + } else { + None + } + } + + fn test_setup_after_peerdas_supernode() -> Option { + let r = Self::test_setup_with_config(Some(TestRigConfig { + peer_das_enabled: true, + supernode: true, })); if r.after_deneb() { Some(r) @@ -356,12 +376,26 @@ impl TestRig { self.network_globals .peers .write() - .__add_connected_peer_testing_only(&peer_id); + .__add_connected_peer_testing_only(&peer_id, false); + peer_id + } + + fn new_connected_supernode_peer(&mut self) -> PeerId { + let peer_id = PeerId::random(); + self.network_globals + .peers + .write() + .__add_connected_peer_testing_only(&peer_id, true); peer_id } - fn new_connected_peers(&mut self, count: usize) -> Vec { - (0..count).map(|_| self.new_connected_peer()).collect() + fn new_connected_peers_for_peerdas(&mut self) { + // Enough sampling peers with few columns + for _ in 0..100 { + self.new_connected_peer(); + } + // One supernode peer to ensure all columns have at least one peer + self.new_connected_supernode_peer(); } fn parent_chain_processed_success( @@ -1584,7 +1618,7 @@ fn sampling_happy_path() { let Some(mut r) = TestRig::test_setup_after_peerdas() else { return; }; - r.new_connected_peers(100); // Add enough sampling peers + r.new_connected_peers_for_peerdas(); let (block, data_columns) = r.rand_block_and_data_columns(); let block_root = block.canonical_root(); r.trigger_sample_block(block_root, block.slot()); @@ -1601,7 +1635,7 @@ fn sampling_with_retries() { let Some(mut r) = TestRig::test_setup_after_peerdas() else { return; }; - r.new_connected_peers(100); // Add enough sampling peers + r.new_connected_peers_for_peerdas(); let (block, data_columns) = r.rand_block_and_data_columns(); let block_root = block.canonical_root(); r.trigger_sample_block(block_root, block.slot()); @@ -1621,7 +1655,7 @@ fn custody_lookup_happy_path() { let Some(mut r) = TestRig::test_setup_after_peerdas() else { return; }; - r.new_connected_peers(100); // Add enough sampling peers + r.new_connected_peers_for_peerdas(); let (block, data_columns) = r.rand_block_and_data_columns(); let block_root = block.canonical_root(); let peer_id = r.new_connected_peer(); @@ -1654,6 +1688,25 @@ fn custody_lookup_no_peers_available_initially() { r.expect_no_active_lookups(); } +#[test] +fn custody_lookup_reconstruction() { + let Some(mut r) = TestRig::test_setup_after_peerdas_supernode() else { + return; + }; + r.new_connected_peers_for_peerdas(); + let (block, data_columns) = r.rand_block_and_data_columns(); + let block_root = block.canonical_root(); + let peer_id = r.new_connected_peer(); + r.trigger_unknown_block_from_attestation(block_root, peer_id); + // Should not request blobs + let id = r.expect_block_lookup_request(block.canonical_root()); + r.complete_valid_block_request(id, block.into(), true); + let custody_column_count = E::data_column_subnet_count() * E::data_columns_per_subnet(); + let custody_ids = r.expect_only_data_columns_by_root_requests(block_root, custody_column_count); + r.complete_valid_custody_request(custody_ids, data_columns, false); + r.expect_no_active_lookups(); +} + // TODO(das): Test retries of DataColumnByRoot: // - Expect request for column_index // - Respond with bad data
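As a companion to the reconstruction test above and the "more than 50% of columns" comment in the lookup processing hunk earlier, a small self-contained sketch of that trigger condition; the 128-column total and the function name are illustrative assumptions rather than values taken from this patch:

const NUMBER_OF_COLUMNS: usize = 128; // assumed total of extended data columns

fn should_trigger_reconstruction(columns_received: usize) -> bool {
    // With a 2x erasure-coded extension, any half of the columns is enough to rebuild the
    // rest, so reconstruction is only worth scheduling once more than half are available.
    columns_received > NUMBER_OF_COLUMNS / 2
}

fn main() {
    assert!(!should_trigger_reconstruction(64)); // exactly half: not yet
    assert!(should_trigger_reconstruction(65)); // more than half: reconstruct and publish
}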