From c85c205e00e8c84fb0080699bc33101de0349b18 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Tue, 30 Apr 2024 16:18:07 +0900 Subject: [PATCH] Re-process early sampling requests (#5569) * Re-process early sampling requests # Conflicts: # beacon_node/beacon_processor/src/work_reprocessing_queue.rs # beacon_node/lighthouse_network/src/rpc/methods.rs # beacon_node/network/src/network_beacon_processor/rpc_methods.rs * Update beacon_node/beacon_processor/src/work_reprocessing_queue.rs Co-authored-by: Jimmy Chen * Add missing var * Beta compiler fixes and small typo fixes. * Remove duplicate method. --------- Co-authored-by: Jimmy Chen --- beacon_node/beacon_chain/src/beacon_chain.rs | 68 +++++++- .../beacon_chain/src/block_verification.rs | 6 +- beacon_node/beacon_chain/src/lib.rs | 1 + beacon_node/beacon_processor/src/lib.rs | 48 ++++-- beacon_node/beacon_processor/src/metrics.rs | 7 +- .../src/work_reprocessing_queue.rs | 102 ++++++++++++ .../lighthouse_network/src/rpc/methods.rs | 14 +- .../src/network_beacon_processor/mod.rs | 11 +- .../network_beacon_processor/rpc_methods.rs | 154 +++++++++++------- beacon_node/network/src/sync/sampling.rs | 4 +- 10 files changed, 332 insertions(+), 83 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 40bb103094f..093822dee69 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -122,7 +122,7 @@ use task_executor::{ShutdownReason, TaskExecutor}; use tokio_stream::Stream; use tree_hash::TreeHash; use types::blob_sidecar::FixedBlobSidecarList; -use types::data_column_sidecar::DataColumnSidecarList; +use types::data_column_sidecar::{ColumnIndex, DataColumnIdentifier, DataColumnSidecarList}; use types::payload::BlockProductionVersion; use types::*; @@ -541,6 +541,12 @@ pub struct BeaconBlockResponse> { pub consensus_block_value: u64, } +pub enum BlockImportStatus { + PendingImport(Arc>), + Imported, + Unknown, +} + impl FinalizationAndCanonicity { pub fn is_finalized(self) -> bool { self.slot_is_finalized && self.canonical @@ -1166,6 +1172,66 @@ impl BeaconChain { .map_or_else(|| self.get_data_columns(block_root), Ok) } + pub fn get_selected_data_columns_checking_all_caches( + &self, + block_root: Hash256, + indices: &[ColumnIndex], + ) -> Result>>, Error> { + let columns_from_availability_cache = indices + .iter() + .copied() + .filter_map(|index| { + self.data_availability_checker + .get_data_column(&DataColumnIdentifier { block_root, index }) + .transpose() + }) + .collect::, _>>()?; + // Existence of a column in the data availability cache and downstream caches is exclusive. + // If there's a single match in the availability cache we can safely skip other sources. + if !columns_from_availability_cache.is_empty() { + return Ok(columns_from_availability_cache); + } + + Ok(self + .early_attester_cache + .get_data_columns(block_root) + .map_or_else(|| self.get_data_columns(&block_root), Ok)? + .into_iter() + .filter(|dc| indices.contains(&dc.index)) + .collect()) + } + + /// Returns the import status of block checking (in order) pre-import caches, fork-choice, db store + pub fn get_block_import_status(&self, block_root: &Hash256) -> BlockImportStatus { + if let Some(block) = self + .reqresp_pre_import_cache + .read() + .get(block_root) + .map(|block| { + metrics::inc_counter(&metrics::BEACON_REQRESP_PRE_IMPORT_CACHE_HITS); + block.clone() + }) + { + return BlockImportStatus::PendingImport(block); + } + // Check fork-choice before early_attester_cache as the latter is pruned lazily + if self + .canonical_head + .fork_choice_read_lock() + .contains_block(block_root) + { + return BlockImportStatus::Imported; + } + if let Some(block) = self.early_attester_cache.get_block(*block_root) { + return BlockImportStatus::PendingImport(block); + } + if let Ok(true) = self.store.block_exists(block_root) { + BlockImportStatus::Imported + } else { + BlockImportStatus::Unknown + } + } + /// Returns the block at the given root, if any. /// /// ## Errors diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 8afaf1b1ecb..3a0b4daf8ea 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -98,7 +98,11 @@ use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp}; use task_executor::JoinHandle; use tree_hash::TreeHash; use types::data_column_sidecar::DataColumnSidecarError; -use types::{BeaconBlockRef, BeaconState, BeaconStateError, BlobSidecarList, ChainSpec, DataColumnSidecar, DataColumnSubnetId, Epoch, EthSpec, ExecutionBlockHash, FullPayload, Hash256, InconsistentFork, PublicKey, PublicKeyBytes, RelativeEpoch, SignedBeaconBlock, SignedBeaconBlockHeader, Slot}; +use types::{ + BeaconBlockRef, BeaconState, BeaconStateError, BlobSidecarList, ChainSpec, DataColumnSidecar, + DataColumnSubnetId, Epoch, EthSpec, ExecutionBlockHash, FullPayload, Hash256, InconsistentFork, + PublicKey, PublicKeyBytes, RelativeEpoch, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, +}; use types::{BlobSidecar, ExecPayload}; pub const POS_PANDA_BANNER: &str = r#" diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index e4c96e5e8b5..2fcde392fdc 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -73,6 +73,7 @@ pub use self::chain_config::ChainConfig; pub use self::errors::{BeaconChainError, BlockProductionError}; pub use self::historical_blocks::HistoricalBlockError; pub use attestation_verification::Error as AttestationError; +pub use beacon_chain::BlockImportStatus; pub use beacon_fork_choice_store::{BeaconForkChoiceStore, Error as ForkChoiceStoreError}; pub use block_verification::{ get_block_root, BlockError, ExecutionPayloadError, ExecutionPendingBlock, GossipVerifiedBlock, diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 118a043addc..3eebf5718a1 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -62,11 +62,11 @@ use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; use types::{Attestation, Hash256, SignedAggregateAndProof, SubnetId}; use types::{EthSpec, Slot}; -use work_reprocessing_queue::IgnoredRpcBlock; use work_reprocessing_queue::{ spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork, }; +use work_reprocessing_queue::{IgnoredRpcBlock, QueuedSamplingRequest}; mod metrics; pub mod work_reprocessing_queue; @@ -141,6 +141,10 @@ const MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN: usize = 1_024; /// for reprocessing before we start dropping them. const MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN: usize = 128; +/// The maximum number of queued retries of ReqResp sampling requests from the reprocess queue that +/// will be stored before dropping them. +const MAX_UNKNOWN_BLOCK_SAMPLING_REQUEST_QUEUE_LEN: usize = 16_384; + /// The maximum number of queued `SyncCommitteeMessage` objects that will be stored before we start dropping /// them. const MAX_SYNC_MESSAGE_QUEUE_LEN: usize = 2048; @@ -272,6 +276,7 @@ pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE_REQUEST: &str = "light_client_optimisti pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation"; pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate"; pub const UNKNOWN_LIGHT_CLIENT_UPDATE: &str = "unknown_light_client_update"; +pub const UNKNOWN_BLOCK_SAMPLING_REQUEST: &str = "unknown_block_sampling_request"; pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change"; pub const API_REQUEST_P0: &str = "api_request_p0"; pub const API_REQUEST_P1: &str = "api_request_p1"; @@ -527,6 +532,10 @@ impl From for WorkEvent { process_fn, }, }, + ReadyWork::SamplingRequest(QueuedSamplingRequest { process_fn, .. }) => Self { + drop_during_sync: true, + work: Work::UnknownBlockSamplingRequest { process_fn }, + }, ReadyWork::BackfillSync(QueuedBackfillBatch(process_fn)) => Self { drop_during_sync: false, work: Work::ChainSegmentBackfill(process_fn), @@ -610,6 +619,9 @@ pub enum Work { parent_root: Hash256, process_fn: BlockingFn, }, + UnknownBlockSamplingRequest { + process_fn: BlockingFn, + }, GossipAggregateBatch { aggregates: Vec>, process_batch: Box>) + Send + Sync>, @@ -699,8 +711,9 @@ impl Work { Work::LightClientFinalityUpdateRequest(_) => LIGHT_CLIENT_FINALITY_UPDATE_REQUEST, Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION, Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE, - Work::GossipBlsToExecutionChange(_) => GOSSIP_BLS_TO_EXECUTION_CHANGE, Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE, + Work::UnknownBlockSamplingRequest { .. } => UNKNOWN_BLOCK_SAMPLING_REQUEST, + Work::GossipBlsToExecutionChange(_) => GOSSIP_BLS_TO_EXECUTION_CHANGE, Work::ApiRequestP0 { .. } => API_REQUEST_P0, Work::ApiRequestP1 { .. } => API_REQUEST_P1, } @@ -839,6 +852,8 @@ impl BeaconProcessor { let mut optimistic_update_queue = FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_QUEUE_LEN); let mut unknown_light_client_update_queue = FifoQueue::new(MAX_GOSSIP_OPTIMISTIC_UPDATE_REPROCESS_QUEUE_LEN); + let mut unknown_block_sampling_request_queue = + FifoQueue::new(MAX_UNKNOWN_BLOCK_SAMPLING_REQUEST_QUEUE_LEN); // Using a FIFO queue since blocks need to be imported sequentially. let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN); @@ -1158,16 +1173,22 @@ impl BeaconProcessor { // and BlocksByRoot) } else if let Some(item) = status_queue.pop() { self.spawn_worker(item, idle_tx); - } else if let Some(item) = bbrange_queue.pop() { - self.spawn_worker(item, idle_tx); + // Prioritize by_root requests over by_range as the former are time + // sensitive for recovery } else if let Some(item) = bbroots_queue.pop() { self.spawn_worker(item, idle_tx); - } else if let Some(item) = blbrange_queue.pop() { - self.spawn_worker(item, idle_tx); } else if let Some(item) = blbroots_queue.pop() { self.spawn_worker(item, idle_tx); } else if let Some(item) = dcbroots_queue.pop() { self.spawn_worker(item, idle_tx); + // Prioritize sampling requests after block syncing requests + } else if let Some(item) = unknown_block_sampling_request_queue.pop() { + self.spawn_worker(item, idle_tx); + // by_range sync after sampling + } else if let Some(item) = bbrange_queue.pop() { + self.spawn_worker(item, idle_tx); + } else if let Some(item) = blbrange_queue.pop() { + self.spawn_worker(item, idle_tx); // Check slashings after all other consensus messages so we prioritize // following head. // @@ -1344,6 +1365,9 @@ impl BeaconProcessor { Work::UnknownLightClientOptimisticUpdate { .. } => { unknown_light_client_update_queue.push(work, work_id, &self.log) } + Work::UnknownBlockSamplingRequest { .. } => { + unknown_block_sampling_request_queue.push(work, work_id, &self.log) + } Work::ApiRequestP0 { .. } => { api_request_p0_queue.push(work, work_id, &self.log) } @@ -1530,12 +1554,12 @@ impl BeaconProcessor { Work::ChainSegment(process_fn) => task_spawner.spawn_async(async move { process_fn.await; }), - Work::UnknownBlockAttestation { process_fn } => task_spawner.spawn_blocking(process_fn), - Work::UnknownBlockAggregate { process_fn } => task_spawner.spawn_blocking(process_fn), - Work::UnknownLightClientOptimisticUpdate { - parent_root: _, - process_fn, - } => task_spawner.spawn_blocking(process_fn), + Work::UnknownBlockAttestation { process_fn } + | Work::UnknownBlockAggregate { process_fn } + | Work::UnknownLightClientOptimisticUpdate { process_fn, .. } + | Work::UnknownBlockSamplingRequest { process_fn } => { + task_spawner.spawn_blocking(process_fn) + } Work::DelayedImportBlock { beacon_block_slot: _, beacon_block_root: _, diff --git a/beacon_node/beacon_processor/src/metrics.rs b/beacon_node/beacon_processor/src/metrics.rs index 503d29dd699..f4c9d61e7d9 100644 --- a/beacon_node/beacon_processor/src/metrics.rs +++ b/beacon_node/beacon_processor/src/metrics.rs @@ -162,7 +162,12 @@ lazy_static::lazy_static! { ); pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_OPTIMISTIC_UPDATES: Result = try_create_int_counter( "beacon_processor_reprocessing_queue_matched_optimistic_updates", - "Number of queued light client optimistic updates where as matching block has been imported." + "Number of queued light client optimistic updates where a matching block has been imported." + ); + // TODO: This should be labeled instead of N single metrics + pub static ref BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_SAMPLING_REQUESTS: Result = try_create_int_counter( + "beacon_processor_reprocessing_queue_matches_sampling_requests", + "Number of queued sampling requests where a matching block has been imported." ); /// Errors and Debugging Stats diff --git a/beacon_node/beacon_processor/src/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/work_reprocessing_queue.rs index 496fa683d2c..b7f6249714d 100644 --- a/beacon_node/beacon_processor/src/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/work_reprocessing_queue.rs @@ -51,6 +51,9 @@ pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12); /// For how long to queue rpc blocks before sending them back for reprocessing. pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(4); +/// For how long to queue sampling requests for reprocessing. +pub const QUEUED_SAMPLING_REQUESTS_DELAY: Duration = Duration::from_secs(12); + /// Set an arbitrary upper-bound on the number of queued blocks to avoid DoS attacks. The fact that /// we signature-verify blocks before putting them in the queue *should* protect against this, but /// it's nice to have extra protection. @@ -62,6 +65,10 @@ const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384; /// How many light client updates we keep before new ones get dropped. const MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES: usize = 128; +/// How many sampling requests we queue before new ones get dropped. +/// TODO(das): choose a sensible value +const MAXIMUM_QUEUED_SAMPLING_REQUESTS: usize = 16_384; + // Process backfill batch 50%, 60%, 80% through each slot. // // Note: use caution to set these fractions in a way that won't cause panic-y @@ -98,6 +105,8 @@ pub enum ReprocessQueueMessage { UnknownBlockAggregate(QueuedAggregate), /// A light client optimistic update that references a parent root that has not been seen as a parent. UnknownLightClientOptimisticUpdate(QueuedLightClientUpdate), + /// A sampling request that references an unknown block. + UnknownBlockSamplingRequest(QueuedSamplingRequest), /// A new backfill batch that needs to be scheduled for processing. BackfillSync(QueuedBackfillBatch), } @@ -110,6 +119,7 @@ pub enum ReadyWork { Unaggregate(QueuedUnaggregate), Aggregate(QueuedAggregate), LightClientUpdate(QueuedLightClientUpdate), + SamplingRequest(QueuedSamplingRequest), BackfillSync(QueuedBackfillBatch), } @@ -134,6 +144,12 @@ pub struct QueuedLightClientUpdate { pub process_fn: BlockingFn, } +/// A sampling request for which the corresponding block is not known while processing. +pub struct QueuedSamplingRequest { + pub beacon_block_root: Hash256, + pub process_fn: BlockingFn, +} + /// A block that arrived early and has been queued for later import. pub struct QueuedGossipBlock { pub beacon_block_slot: Slot, @@ -218,6 +234,8 @@ struct ReprocessQueue { attestations_delay_queue: DelayQueue, /// Queue to manage scheduled light client updates. lc_updates_delay_queue: DelayQueue, + /// Queue to manage scheduled sampling requests + sampling_requests_delay_queue: DelayQueue, /* Queued items */ /// Queued blocks. @@ -232,6 +250,10 @@ struct ReprocessQueue { queued_lc_updates: FnvHashMap, /// Light Client Updates per parent_root. awaiting_lc_updates_per_parent_root: HashMap>, + /// Queued sampling requests. + queued_sampling_requests: FnvHashMap, + /// Sampling requests per block root. + awaiting_sampling_requests_per_block_root: HashMap>, /// Queued backfill batches queued_backfill_batches: Vec, @@ -239,15 +261,18 @@ struct ReprocessQueue { /// Next attestation id, used for both aggregated and unaggregated attestations next_attestation: usize, next_lc_update: usize, + next_sampling_request_update: usize, early_block_debounce: TimeLatch, rpc_block_debounce: TimeLatch, attestation_delay_debounce: TimeLatch, lc_update_delay_debounce: TimeLatch, + sampling_request_delay_debounce: TimeLatch, next_backfill_batch_event: Option>>, slot_clock: Arc, } pub type QueuedLightClientUpdateId = usize; +pub type QueuedSamplingRequestId = usize; #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum QueuedAttestationId { @@ -403,19 +428,24 @@ impl ReprocessQueue { rpc_block_delay_queue: DelayQueue::new(), attestations_delay_queue: DelayQueue::new(), lc_updates_delay_queue: DelayQueue::new(), + sampling_requests_delay_queue: <_>::default(), queued_gossip_block_roots: HashSet::new(), queued_lc_updates: FnvHashMap::default(), queued_aggregates: FnvHashMap::default(), queued_unaggregates: FnvHashMap::default(), + queued_sampling_requests: <_>::default(), awaiting_attestations_per_root: HashMap::new(), awaiting_lc_updates_per_parent_root: HashMap::new(), + awaiting_sampling_requests_per_block_root: <_>::default(), queued_backfill_batches: Vec::new(), next_attestation: 0, next_lc_update: 0, + next_sampling_request_update: 0, early_block_debounce: TimeLatch::default(), rpc_block_debounce: TimeLatch::default(), attestation_delay_debounce: TimeLatch::default(), lc_update_delay_debounce: TimeLatch::default(), + sampling_request_delay_debounce: <_>::default(), next_backfill_batch_event: None, slot_clock, } @@ -639,6 +669,35 @@ impl ReprocessQueue { self.next_lc_update += 1; } + InboundEvent::Msg(UnknownBlockSamplingRequest(queued_sampling_request)) => { + if self.sampling_requests_delay_queue.len() >= MAXIMUM_QUEUED_SAMPLING_REQUESTS { + if self.sampling_request_delay_debounce.elapsed() { + error!( + log, + "Sampling requests delay queue is full"; + "queue_size" => MAXIMUM_QUEUED_SAMPLING_REQUESTS, + ); + } + // Drop the inbound message. + return; + } + + let id: QueuedSamplingRequestId = self.next_sampling_request_update; + self.next_sampling_request_update += 1; + + // Register the delay. + let delay_key = self + .sampling_requests_delay_queue + .insert(id, QUEUED_SAMPLING_REQUESTS_DELAY); + + self.awaiting_sampling_requests_per_block_root + .entry(queued_sampling_request.beacon_block_root) + .or_default() + .push(id); + + self.queued_sampling_requests + .insert(id, (queued_sampling_request, delay_key)); + } InboundEvent::Msg(BlockImported { block_root, parent_root, @@ -700,6 +759,49 @@ impl ReprocessQueue { ); } } + // Unqueue the sampling requests we have for this root, if any. + if let Some(queued_ids) = self + .awaiting_sampling_requests_per_block_root + .remove(&block_root) + { + let mut sent_count = 0; + let mut failed_to_send_count = 0; + + for id in queued_ids { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_SAMPLING_REQUESTS, + ); + + if let Some((queued, delay_key)) = self.queued_sampling_requests.remove(&id) + { + // Remove the delay. + self.sampling_requests_delay_queue.remove(&delay_key); + + // Send the work. + let work = ReadyWork::SamplingRequest(queued); + + if self.ready_work_tx.try_send(work).is_err() { + failed_to_send_count += 1; + } else { + sent_count += 1; + } + } else { + // This should never happen. + error!(log, "Unknown sampling request for block root"; "block_root" => ?block_root, "id" => ?id); + } + } + + if failed_to_send_count > 0 { + error!( + log, + "Ignored scheduled sampling requests for block"; + "hint" => "system may be overloaded", + "block_root" => ?block_root, + "failed_count" => failed_to_send_count, + "sent_count" => sent_count, + ); + } + } } InboundEvent::Msg(NewLightClientOptimisticUpdate { parent_root }) => { // Unqueue the light client optimistic updates we have for this root, if any. diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 831e3efc117..18bf4e7dcfc 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -6,6 +6,7 @@ use serde::Serialize; use ssz::Encode; use ssz_derive::{Decode, Encode}; use ssz_types::{typenum::U256, VariableList}; +use std::collections::BTreeMap; use std::fmt::Display; use std::marker::PhantomData; use std::ops::Deref; @@ -13,7 +14,7 @@ use std::sync::Arc; use strum::IntoStaticStr; use superstruct::superstruct; use types::blob_sidecar::BlobIdentifier; -use types::data_column_sidecar::DataColumnIdentifier; +use types::data_column_sidecar::{ColumnIndex, DataColumnIdentifier}; use types::{ blob_sidecar::BlobSidecar, ChainSpec, DataColumnSidecar, Epoch, EthSpec, Hash256, LightClientBootstrap, LightClientFinalityUpdate, LightClientOptimisticUpdate, @@ -386,6 +387,17 @@ impl DataColumnsByRootRequest { ); Self { data_column_ids } } + + pub fn group_by_ordered_block_root(&self) -> Vec<(Hash256, Vec)> { + let mut column_indexes_by_block = BTreeMap::>::new(); + for request_id in self.data_column_ids.as_slice() { + column_indexes_by_block + .entry(request_id.block_root) + .or_default() + .push(request_id.index); + } + column_indexes_by_block.into_iter().collect() + } } /* RPC Handling and Grouping */ diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index e9ee06f045f..18965fcd6ef 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -650,8 +650,15 @@ impl NetworkBeaconProcessor { request: DataColumnsByRootRequest, ) -> Result<(), Error> { let processor = self.clone(); - let process_fn = - move || processor.handle_data_columns_by_root_request(peer_id, request_id, request); + let reprocess_tx = processor.reprocess_tx.clone(); + let process_fn = move || { + processor.handle_data_columns_by_root_request( + peer_id, + request_id, + request, + Some(reprocess_tx), + ) + }; self.try_send(BeaconWorkEvent { drop_during_sync: false, diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 51baa541465..065a6797e29 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -2,7 +2,10 @@ use crate::network_beacon_processor::{NetworkBeaconProcessor, FUTURE_SLOT_TOLERA use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::SyncMessage; -use beacon_chain::{BeaconChainError, BeaconChainTypes, HistoricalBlockError, WhenSlotSkipped}; +use beacon_chain::{ + BeaconChainError, BeaconChainTypes, BlockImportStatus, HistoricalBlockError, WhenSlotSkipped, +}; +use beacon_processor::work_reprocessing_queue::{QueuedSamplingRequest, ReprocessQueueMessage}; use itertools::process_results; use lighthouse_network::rpc::methods::{ BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRootRequest, @@ -13,9 +16,9 @@ use slog::{debug, error, warn}; use slot_clock::SlotClock; use std::collections::{hash_map::Entry, HashMap}; use std::sync::Arc; +use tokio::sync::mpsc; use tokio_stream::StreamExt; use types::blob_sidecar::BlobIdentifier; -use types::data_column_sidecar::DataColumnIdentifier; use types::{Epoch, EthSpec, ForkName, Hash256, Slot}; impl NetworkBeaconProcessor { @@ -323,81 +326,106 @@ impl NetworkBeaconProcessor { peer_id: PeerId, request_id: PeerRequestId, request: DataColumnsByRootRequest, + reprocess_tx: Option>, ) { - let Some(requested_root) = request - .data_column_ids - .as_slice() - .first() - .map(|id| id.block_root) - else { - // No data column ids requested. - return; - }; - let requested_indices = request - .data_column_ids - .as_slice() - .iter() - .map(|id| id.index) - .collect::>(); + let column_indexes_by_block = request.group_by_ordered_block_root(); let mut send_data_column_count = 0; - let mut data_column_list_results = HashMap::new(); - for id in request.data_column_ids.as_slice() { - // Attempt to get the data columns from the RPC cache. - if let Ok(Some(data_column)) = self.chain.data_availability_checker.get_data_column(id) + for (block_root, column_ids) in column_indexes_by_block.iter() { + match self + .chain + .get_selected_data_columns_checking_all_caches(*block_root, column_ids) { - self.send_response( - peer_id, - Response::DataColumnsByRoot(Some(data_column)), - request_id, - ); - send_data_column_count += 1; - } else { - let DataColumnIdentifier { - block_root: root, - index, - } = id; - - let data_column_list_result = match data_column_list_results.entry(root) { - Entry::Vacant(entry) => entry.insert( - self.chain - .get_data_columns_checking_early_attester_cache(root), - ), - Entry::Occupied(entry) => entry.into_mut(), - }; + Ok(data_columns) => { + for data_column in data_columns { + send_data_column_count += 1; + self.send_response( + peer_id, + Response::DataColumnsByRoot(Some(data_column)), + request_id, + ); + } + } + Err(e) => { + self.send_error_response( + peer_id, + RPCResponseErrorCode::ServerError, + // TODO(das): leak error details to ease debugging + format!("{:?}", e).to_string(), + request_id, + ); + error!(self.log, "Error getting data column"; + "block_root" => ?block_root, + "peer" => %peer_id, + "error" => ?e + ); + return; + } + } + } - match data_column_list_result.as_ref() { - Ok(data_columns_sidecar_list) => { - 'inner: for data_column_sidecar in data_columns_sidecar_list.iter() { - if data_column_sidecar.index == *index { - self.send_response( - peer_id, - Response::DataColumnsByRoot(Some(data_column_sidecar.clone())), - request_id, - ); - send_data_column_count += 1; - break 'inner; - } + let should_reprocess = column_indexes_by_block + .first() + .filter(|_| column_indexes_by_block.len() == 1 && send_data_column_count == 0) + .and_then( + |(block_root, _)| match self.chain.get_block_import_status(block_root) { + BlockImportStatus::PendingImport(block) => { + if block.num_expected_blobs() > 0 { + // Known block not yet imported (still have not received columns) but we + // are certain the block has data. Schedule this request for retry once + // the block is imported + // TODO(das): consider only re-processing for some range of slots + Some(*block_root) + } else { + // We know the block has no data, do not retry + None } } - Err(e) => { - debug!( - self.log, - "Error fetching data column for peer"; - "peer" => %peer_id, - "request_root" => ?root, - "error" => ?e, - ); + BlockImportStatus::Imported => { + // If the block is imported, custody columns should also be imported and be + // returned in the attempts above to retrieve columns. Do not retry + None } - } + BlockImportStatus::Unknown => { + // Two options: + // (a) race condition where peer receives a block before than us and has + // started a sampling request + // (b) peer sent a sampling request for a random root we will never receive + // to fill our reprocessing queue + // TODO(das): high tolerance penalty for requests that never resolve + Some(*block_root) + } + }, + ); + + if let (Some(beacon_block_root), Some(reprocess_tx)) = (should_reprocess, reprocess_tx) { + let processor = self.clone(); + if reprocess_tx + .try_send(ReprocessQueueMessage::UnknownBlockSamplingRequest( + QueuedSamplingRequest { + beacon_block_root, + process_fn: Box::new(move || { + processor.handle_data_columns_by_root_request( + peer_id, request_id, request, + None, // Only allow a single retry + ) + }), + }, + )) + .is_err() + { + error!( + self.log, + "Failed to send UnknownBlockSamplingRequest for re-processing" + ) } } + debug!( self.log, "Received DataColumnsByRoot Request"; "peer" => %peer_id, - "request_root" => %requested_root, - "request_indices" => ?requested_indices, + "request" => ?column_indexes_by_block, "returned" => send_data_column_count ); diff --git a/beacon_node/network/src/sync/sampling.rs b/beacon_node/network/src/sync/sampling.rs index 50fb73f9bc6..9ee8616daac 100644 --- a/beacon_node/network/src/sync/sampling.rs +++ b/beacon_node/network/src/sync/sampling.rs @@ -174,10 +174,10 @@ pub struct ActiveSamplingRequest { #[derive(Debug)] pub enum SamplingError { - SendFailed(&'static str), + SendFailed(#[allow(dead_code)] &'static str), ProcessorUnavailable, TooManyFailures, - BadState(String), + BadState(#[allow(dead_code)] String), ColumnIndexOutOfBounds, }