From 012e7e7bfa29dd6da8fbb73b09c2d7d020340705 Mon Sep 17 00:00:00 2001
From: Lion - dapplion <35266934+dapplion@users.noreply.github.com>
Date: Mon, 23 Sep 2024 14:49:23 -0400
Subject: [PATCH] Allow custody by root requests to have no peers (#6417)

* Allow custody by root requests to have no peers
---
 beacon_node/network/src/sync/manager.rs       | 56 +++++++++++----
 .../network/src/sync/network_context.rs       | 68 ++++++++++++++-----
 .../src/sync/network_context/custody.rs       | 50 +++++++-------
 3 files changed, 121 insertions(+), 53 deletions(-)

diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs
index ed91c73d8bf..f1417804849 100644
--- a/beacon_node/network/src/sync/manager.rs
+++ b/beacon_node/network/src/sync/manager.rs
@@ -35,7 +35,9 @@ use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart};
 use super::block_lookups::BlockLookups;
-use super::network_context::{BlockOrBlob, RangeRequestId, RpcEvent, SyncNetworkContext};
+use super::network_context::{
+    BlockOrBlob, CustodyByRootResult, RangeRequestId, RpcEvent, SyncNetworkContext,
+};
 use super::peer_sync_info::{remote_sync_type, PeerSyncType};
 use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
 use super::sampling::{Sampling, SamplingConfig, SamplingResult};
@@ -55,8 +57,8 @@ use beacon_chain::{
 use futures::StreamExt;
 use lighthouse_network::rpc::RPCError;
 use lighthouse_network::service::api_types::{
-    DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, SamplingId, SamplingRequester,
-    SingleLookupReqId, SyncRequestId,
+    CustodyRequester, DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, SamplingId,
+    SamplingRequester, SingleLookupReqId, SyncRequestId,
 };
 use lighthouse_network::types::{NetworkGlobals, SyncState};
 use lighthouse_network::SyncInfo;
@@ -368,6 +370,11 @@ impl<T: BeaconChainTypes> SyncManager<T> {
         }
         self.update_sync_state();
+
+        // Try to make progress on custody requests that are waiting for peers
+        for (id, result) in self.network.continue_custody_by_root_requests() {
+            self.on_custody_by_root_result(id, result);
+        }
     }
 
     /// Handles RPC errors related to requests that were emitted from the sync manager.
@@ -444,6 +451,16 @@ impl<T: BeaconChainTypes> SyncManager<T> {
         self.update_sync_state();
     }
 
+    /// Prune stale requests that are waiting for peers
+    fn prune_requests(&mut self) {
+        // continue_custody_by_root_requests attempts to make progress on all requests. If some
+        // exceed the stale duration limit they will fail and return a result. Re-using
+        // `continue_custody_by_root_requests` is just a convenience to have less code.
+        for (id, result) in self.network.continue_custody_by_root_requests() {
+            self.on_custody_by_root_result(id, result);
+        }
+    }
+
     /// Updates the syncing state of a peer.
     /// Return whether the peer should be used for range syncing or not, according to its
     /// connection status.
@@ -624,6 +641,8 @@ impl<T: BeaconChainTypes> SyncManager<T> {
         // unless there is a bug.
         let mut prune_lookups_interval = tokio::time::interval(Duration::from_secs(15));
+        let mut prune_requests = tokio::time::interval(Duration::from_secs(15));
+
         let mut register_metrics_interval = tokio::time::interval(Duration::from_secs(5));
 
         // process any inbound messages
@@ -638,6 +657,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
                 _ = prune_lookups_interval.tick() => {
                     self.block_lookups.prune_lookups();
                 }
+                _ = prune_requests.tick() => {
+                    self.prune_requests();
+                }
                 _ = register_metrics_interval.tick() => {
                     self.network.register_metrics();
                 }
@@ -1054,26 +1076,32 @@ impl<T: BeaconChainTypes> SyncManager<T> {
                     }
                 }
                 DataColumnsByRootRequester::Custody(custody_id) => {
-                    if let Some(custody_columns) = self
+                    if let Some(result) = self
                         .network
                         .on_custody_by_root_response(custody_id, req_id, peer_id, resp)
                     {
-                        // TODO(das): get proper timestamp
-                        let seen_timestamp = timestamp_now();
-                        self.block_lookups
-                            .on_download_response::<CustodyRequestState<T::EthSpec>>(
-                                custody_id.requester.0,
-                                custody_columns.map(|(columns, peer_group)| {
-                                    (columns, peer_group, seen_timestamp)
-                                }),
-                                &mut self.network,
-                            );
+                        self.on_custody_by_root_result(custody_id.requester, result);
                     }
                 }
             }
         }
     }
 
+    fn on_custody_by_root_result(
+        &mut self,
+        requester: CustodyRequester,
+        response: CustodyByRootResult<T::EthSpec>,
+    ) {
+        // TODO(das): get proper timestamp
+        let seen_timestamp = timestamp_now();
+        self.block_lookups
+            .on_download_response::<CustodyRequestState<T::EthSpec>>(
+                requester.0,
+                response.map(|(columns, peer_group)| (columns, peer_group, seen_timestamp)),
+                &mut self.network,
+            );
+    }
+
     fn on_sampling_result(&mut self, requester: SamplingRequester, result: SamplingResult) {
         // TODO(das): How is a consumer of sampling results?
         // - Fork-choice for trailing DA
diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs
index b9f6d180c13..d6f54178491 100644
--- a/beacon_node/network/src/sync/network_context.rs
+++ b/beacon_node/network/src/sync/network_context.rs
@@ -15,6 +15,7 @@ use crate::sync::block_lookups::SingleLookupId;
 use crate::sync::network_context::requests::BlobsByRootSingleBlockRequest;
 use beacon_chain::block_verification_types::RpcBlock;
 use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState};
+use custody::CustodyRequestResult;
 use fnv::FnvHashMap;
 use lighthouse_network::rpc::methods::{BlobsByRangeRequest, DataColumnsByRangeRequest};
 use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError};
@@ -69,6 +70,8 @@ pub enum RpcEvent<T> {
 
 pub type RpcResponseResult<T> = Result<(T, Duration), RpcResponseError>;
 
+pub type CustodyByRootResult<T> = Result<(DataColumnSidecarList<T>, PeerGroup), RpcResponseError>;
+
 #[derive(Debug)]
 pub enum RpcResponseError {
     RpcError(RPCError),
@@ -915,6 +918,32 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             .insert(id, (sender_id, info));
     }
 
+    /// Attempt to make progress on all custody_by_root requests. Some requests may be stale waiting
+    /// for custody peers. Returns a Vec of results as zero or more requests may fail in this
+    /// attempt.
+    pub fn continue_custody_by_root_requests(
+        &mut self,
+    ) -> Vec<(CustodyRequester, CustodyByRootResult<T::EthSpec>)> {
+        let ids = self
+            .custody_by_root_requests
+            .keys()
+            .copied()
+            .collect::<Vec<_>>();
+
+        // Need to collect ids and results in separate steps to re-borrow self.
+        ids.into_iter()
+            .filter_map(|id| {
+                let mut request = self
+                    .custody_by_root_requests
+                    .remove(&id)
+                    .expect("key of hashmap");
+                let result = request.continue_requests(self);
+                self.handle_custody_by_root_result(id, request, result)
+                    .map(|result| (id, result))
+            })
+            .collect()
+    }
+
     // Request handlers
 
     pub fn on_single_block_response(
@@ -1069,7 +1098,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         req_id: DataColumnsByRootRequestId,
         peer_id: PeerId,
         resp: RpcResponseResult<Vec<Arc<DataColumnSidecar<T::EthSpec>>>>,
-    ) -> Option<Result<(DataColumnSidecarList<T::EthSpec>, PeerGroup), RpcResponseError>> {
+    ) -> Option<CustodyByRootResult<T::EthSpec>> {
         // Note: need to remove the request to borrow self again below. Otherwise we can't
         // do nested requests
         let Some(mut request) = self.custody_by_root_requests.remove(&id.requester) else {
@@ -1078,28 +1107,35 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             return None;
         };
 
-        let result = request
-            .on_data_column_downloaded(peer_id, req_id, resp, self)
+        let result = request.on_data_column_downloaded(peer_id, req_id, resp, self);
+
+        self.handle_custody_by_root_result(id.requester, request, result)
+    }
+
+    fn handle_custody_by_root_result(
+        &mut self,
+        id: CustodyRequester,
+        request: ActiveCustodyRequest<T>,
+        result: CustodyRequestResult<T::EthSpec>,
+    ) -> Option<CustodyByRootResult<T::EthSpec>> {
+        let result = result
             .map_err(RpcResponseError::CustodyRequestError)
             .transpose();
 
         // Convert a result from internal format of `ActiveCustodyRequest` (error first to use ?) to
         // an Option first to use in an `if let Some() { act on result }` block.
-        if let Some(result) = result {
-            match result.as_ref() {
-                Ok((columns, peer_group)) => {
-                    debug!(self.log, "Custody request success, removing"; "id" => ?id, "count" => columns.len(), "peers" => ?peer_group)
-                }
-                Err(e) => {
-                    debug!(self.log, "Custody request failure, removing"; "id" => ?id, "error" => ?e)
-                }
+        match result.as_ref() {
+            Some(Ok((columns, peer_group))) => {
+                debug!(self.log, "Custody request success, removing"; "id" => ?id, "count" => columns.len(), "peers" => ?peer_group)
+            }
+            Some(Err(e)) => {
+                debug!(self.log, "Custody request failure, removing"; "id" => ?id, "error" => ?e)
+            }
+            None => {
+                self.custody_by_root_requests.insert(id, request);
             }
-
-            Some(result)
-        } else {
-            self.custody_by_root_requests.insert(id.requester, request);
-            None
-        }
+        }
+        result
     }
 
     pub fn send_block_for_processing(
diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs
index dfe409f043d..6736bfb82f0 100644
--- a/beacon_node/network/src/sync/network_context/custody.rs
+++ b/beacon_node/network/src/sync/network_context/custody.rs
@@ -9,7 +9,7 @@ use lighthouse_network::PeerId;
 use lru_cache::LRUTimeCache;
 use rand::Rng;
 use slog::{debug, warn};
-use std::time::Duration;
+use std::time::{Duration, Instant};
 use std::{collections::HashMap, marker::PhantomData, sync::Arc};
 use types::EthSpec;
 use types::{data_column_sidecar::ColumnIndex, DataColumnSidecar, Hash256};
@@ -17,6 +17,7 @@ use types::{data_column_sidecar::ColumnIndex, DataColumnSidecar, Hash256};
 use super::{LookupRequestResult, PeerGroup, RpcResponseResult, SyncNetworkContext};
 
 const FAILED_PEERS_CACHE_EXPIRY_SECONDS: u64 = 5;
+const MAX_STALE_NO_PEERS_DURATION: Duration = Duration::from_secs(30);
 
 type DataColumnSidecarList<E> = Vec<Arc<DataColumnSidecar<E>>>;
 
@@ -56,7 +57,7 @@ struct ActiveBatchColumnsRequest {
     indices: Vec<ColumnIndex>,
 }
 
-type CustodyRequestResult<E> = Result<Option<(DataColumnSidecarList<E>, PeerGroup)>, Error>;
+pub type CustodyRequestResult<E> = Result<Option<(DataColumnSidecarList<E>, PeerGroup)>, Error>;
 
 impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
     pub(crate) fn new(
@@ -221,13 +222,13 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
         //  - which peer returned what to have PeerGroup attributability
 
         for (column_index, request) in self.column_requests.iter_mut() {
-            if request.is_awaiting_download() {
+            if let Some(wait_duration) = request.is_awaiting_download() {
                 if request.download_failures > MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS {
                     return Err(Error::TooManyFailures);
                 }
 
-                // TODO: When is a fork and only a subset of your peers know about a block, we should only
-                // query the peers on that fork. Should this case be handled? How to handle it?
+                // TODO(das): When there is a fork and only a subset of your peers know about a block, we
+                // should only query the peers on that fork. Should this case be handled? How to handle it?
                 let custodial_peers = cx.get_custodial_peers(*column_index);
 
                 // TODO(das): cache this computation in a OneCell or similar to prevent having to
@@ -256,17 +257,20 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
                     .collect::<Vec<_>>();
                 priorized_peers.sort_unstable();
 
-                let Some((_, _, _, peer_id)) = priorized_peers.first() else {
-                    // Do not tolerate not having custody peers, hard error.
-                    // TODO(das): we might implement some grace period. The request will pause for X
-                    // seconds expecting the peer manager to find peers before failing the request.
+                if let Some((_, _, _, peer_id)) = priorized_peers.first() {
+                    columns_to_request_by_peer
+                        .entry(*peer_id)
+                        .or_default()
+                        .push(*column_index);
+                } else if wait_duration > MAX_STALE_NO_PEERS_DURATION {
+                    // Allow the request to sit stale in `NotStarted` state for at most
+                    // `MAX_STALE_NO_PEERS_DURATION`, else error and drop the request. Note that
+                    // the lookup will naturally retry when other peers send us attestations for
+                    // descendants of this unavailable lookup.
                     return Err(Error::NoPeers(*column_index));
-                };
-
-                columns_to_request_by_peer
-                    .entry(*peer_id)
-                    .or_default()
-                    .push(*column_index);
+                } else {
+                    // Do not issue requests if there is no custody peer on this column
+                }
             }
         }
 
@@ -315,7 +319,7 @@ struct ColumnRequest<E: EthSpec> {
 
 #[derive(Debug, Clone)]
 enum Status<E: EthSpec> {
-    NotStarted,
+    NotStarted(Instant),
     Downloading(DataColumnsByRootRequestId),
     Downloaded(PeerId, Arc<DataColumnSidecar<E>>),
 }
@@ -323,28 +327,28 @@ enum Status<E: EthSpec> {
 impl<E: EthSpec> ColumnRequest<E> {
     fn new() -> Self {
         Self {
-            status: Status::NotStarted,
+            status: Status::NotStarted(Instant::now()),
             download_failures: 0,
         }
     }
 
-    fn is_awaiting_download(&self) -> bool {
+    fn is_awaiting_download(&self) -> Option<Duration> {
         match self.status {
-            Status::NotStarted => true,
-            Status::Downloading { .. } | Status::Downloaded { .. } => false,
+            Status::NotStarted(start_time) => Some(start_time.elapsed()),
+            Status::Downloading { .. } | Status::Downloaded { .. } => None,
         }
     }
 
     fn is_downloaded(&self) -> bool {
         match self.status {
-            Status::NotStarted | Status::Downloading { .. } => false,
+            Status::NotStarted { .. } | Status::Downloading { .. } => false,
             Status::Downloaded { .. } => true,
         }
     }
 
     fn on_download_start(&mut self, req_id: DataColumnsByRootRequestId) -> Result<(), Error> {
         match &self.status {
-            Status::NotStarted => {
+            Status::NotStarted { .. } => {
                 self.status = Status::Downloading(req_id);
                 Ok(())
             }
@@ -363,7 +367,7 @@ impl<E: EthSpec> ColumnRequest<E> {
                     req_id,
                 });
             }
-            self.status = Status::NotStarted;
+            self.status = Status::NotStarted(Instant::now());
             Ok(())
         }
         other => Err(Error::BadState(format!(
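
The behavioural core of this patch: a custody-by-root column request that currently has no custody peers is no longer failed immediately. Its `NotStarted` state now records an `Instant`, and the request only errors with `Error::NoPeers` once it has been waiting longer than `MAX_STALE_NO_PEERS_DURATION`; in the meantime the sync manager re-drives `continue_custody_by_root_requests` both right after `update_sync_state()` and on the new 15-second `prune_requests` tick. Below is a minimal standalone sketch of that decision logic; it is not part of the patch, the reduced `Status` enum and the `on_no_custody_peers` helper are illustrative stand-ins, and only the constant mirrors the real code.

use std::time::{Duration, Instant};

// Grace period a column request may wait for custody peers before it is failed
// (mirrors MAX_STALE_NO_PEERS_DURATION in custody.rs).
const MAX_STALE_NO_PEERS_DURATION: Duration = Duration::from_secs(30);

// Simplified stand-in for the patch's `Status`: a request that has not started
// remembers when it entered that state.
enum Status {
    NotStarted(Instant),
    Downloading,
}

// Decide what to do with a column that currently has no custody peers:
// keep waiting while inside the grace period, fail once it is exceeded.
fn on_no_custody_peers(status: &Status) -> Result<(), ()> {
    match status {
        // Still within the grace period: leave the request parked; it will be
        // re-evaluated on the next peer connection or prune tick.
        Status::NotStarted(since) if since.elapsed() <= MAX_STALE_NO_PEERS_DURATION => Ok(()),
        // Waited too long without any custody peer: drop the request with an error.
        Status::NotStarted(_) => Err(()),
        // A download is already in flight; nothing to decide here.
        Status::Downloading => Ok(()),
    }
}

fn main() {
    // A freshly created request is within the grace period and is kept alive.
    let fresh = Status::NotStarted(Instant::now());
    assert!(on_no_custody_peers(&fresh).is_ok());
}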