From 2d4712b9feddc900bab015af0603ca4c0c41a004 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Mon, 22 Apr 2024 09:24:15 -0400 Subject: [PATCH] Pr 5583 review (#24) * add bad state warn log * add rust docs to new fields in `SyncNetworkContext` * remove timestamp todo * add back lookup verify error * remove TODOs --- .../network/src/sync/block_lookups/mod.rs | 50 +++++++--------- .../network/src/sync/block_lookups/tests.rs | 2 +- .../network/src/sync/network_context.rs | 57 +++++++++++++------ .../src/sync/network_context/requests.rs | 42 +++++++------- 4 files changed, 84 insertions(+), 67 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 998bcc855a5..fa2683fb0f0 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -1,5 +1,6 @@ use self::single_block_lookup::SingleBlockLookup; use super::manager::BlockProcessingResult; +use super::network_context::{LookupFailure, LookupVerifyError}; use super::BatchProcessResult; use super::{manager::BlockProcessType, network_context::SyncNetworkContext}; use crate::metrics; @@ -20,7 +21,6 @@ pub use common::Lookup; pub use common::Parent; pub use common::RequestState; use fnv::FnvHashMap; -use lighthouse_network::rpc::RPCError; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; pub use single_block_lookup::{BlobRequestState, BlockRequestState}; @@ -604,7 +604,7 @@ impl BlockLookups { // processed. Drop the request without extra penalty } RequestError::BadState(..) => { - // Internal error should never happen + warn!(self.log, "Failed to request parent"; "error" => e.as_static()); } } } @@ -638,10 +638,13 @@ impl BlockLookups { id: SingleLookupReqId, peer_id: &PeerId, cx: &mut SyncNetworkContext, - error: RPCError, + error: LookupFailure, ) { - // Downscore peer even if lookup is not known - self.downscore_on_rpc_error(peer_id, &error, cx); + // Only downscore lookup verify errors. RPC errors are downscored in the network handler. + if let LookupFailure::LookupVerifyError(e) = &error { + // Downscore peer even if lookup is not known + self.downscore_on_rpc_error(peer_id, e, cx); + } let Some(mut parent_lookup) = self.get_parent_lookup::(id) else { debug!(self.log, @@ -671,14 +674,17 @@ impl BlockLookups { id: SingleLookupReqId, peer_id: &PeerId, cx: &mut SyncNetworkContext, - error: RPCError, + error: LookupFailure, ) { - // Downscore peer even if lookup is not known - self.downscore_on_rpc_error(peer_id, &error, cx); + // Only downscore lookup verify errors. RPC errors are downscored in the network handler. + if let LookupFailure::LookupVerifyError(e) = &error { + // Downscore peer even if lookup is not known + self.downscore_on_rpc_error(peer_id, e, cx); + } let log = self.log.clone(); let Some(mut lookup) = self.get_single_lookup::(id) else { - debug!(log, "Error response to dropped lookup"; "error" => ?error); + debug!(log, "Error response to dropped lookup"; "error" => %error); return; }; let block_root = lookup.block_root(); @@ -1322,31 +1328,15 @@ impl BlockLookups { pub fn downscore_on_rpc_error( &self, peer_id: &PeerId, - error: &RPCError, + error: &LookupVerifyError, cx: &SyncNetworkContext, ) { // Note: logging the report event here with the full error display. The log inside // `report_peer` only includes a smaller string, like "invalid_data" - debug!(self.log, "reporting peer for sync lookup error"; "error" => %error); - if let Some(action) = match error { - // Protocol errors are heavily penalized - RPCError::SSZDecodeError(..) - | RPCError::IoError(..) - | RPCError::ErrorResponse(..) - | RPCError::InvalidData(..) - | RPCError::HandlerRejected => Some(PeerAction::LowToleranceError), - // Timing / network errors are less penalized - // TODO: Is IoError a protocol error or network error? - RPCError::StreamTimeout | RPCError::IncompleteStream | RPCError::NegotiationTimeout => { - Some(PeerAction::MidToleranceError) - } - // Not supporting a specific protocol is tolerated. TODO: Are you sure? - RPCError::UnsupportedProtocol => None, - // Our fault, don't penalize peer - RPCError::InternalError(..) | RPCError::Disconnected => None, - } { - cx.report_peer(*peer_id, action, error.into()); - } + let error_str: &'static str = error.into(); + + debug!(self.log, "reporting peer for sync lookup error"; "error" => error_str); + cx.report_peer(*peer_id, PeerAction::LowToleranceError, error_str); } pub fn update_metrics(&self) { diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index fc6ac28bdc7..8e3b35ee5d3 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -15,7 +15,7 @@ use beacon_chain::test_utils::{ build_log, generate_rand_block_and_blobs, BeaconChainHarness, EphemeralHarnessType, NumBlobs, }; use beacon_processor::WorkEvent; -use lighthouse_network::rpc::RPCResponseErrorCode; +use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode}; use lighthouse_network::types::SyncState; use lighthouse_network::{NetworkGlobals, Request}; use slog::info; diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 779dde96ff1..fc91270c1dc 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -17,6 +17,7 @@ use fnv::FnvHashMap; use lighthouse_network::rpc::methods::BlobsByRangeRequest; use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError}; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request}; +pub use requests::LookupVerifyError; use slog::{debug, trace, warn}; use std::collections::hash_map::Entry; use std::sync::Arc; @@ -51,7 +52,33 @@ pub enum RpcEvent { RPCError(RPCError), } -pub type RpcProcessingResult = Option>; +pub type RpcProcessingResult = Option>; + +pub enum LookupFailure { + RpcError(RPCError), + LookupVerifyError(LookupVerifyError), +} + +impl std::fmt::Display for LookupFailure { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + LookupFailure::RpcError(e) => write!(f, "RPC Error: {:?}", e), + LookupFailure::LookupVerifyError(e) => write!(f, "Lookup Verify Error: {:?}", e), + } + } +} + +impl From for LookupFailure { + fn from(e: RPCError) -> Self { + LookupFailure::RpcError(e) + } +} + +impl From for LookupFailure { + fn from(e: LookupVerifyError) -> Self { + LookupFailure::LookupVerifyError(e) + } +} /// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id. pub struct SyncNetworkContext { @@ -61,7 +88,10 @@ pub struct SyncNetworkContext { /// A sequential ID for all RPC requests. request_id: Id, + /// A mapping of active BlocksByRoot requests, including both current slot and parent lookups. blocks_by_root_requests: FnvHashMap, + + /// A mapping of active BlobsByRoot requests, including both current slot and parent lookups. blobs_by_root_requests: FnvHashMap>, /// BlocksByRange requests paired with BlobsByRange @@ -439,20 +469,18 @@ impl SyncNetworkContext { Ok(block) => Ok((block, seen_timestamp)), Err(e) => { // The request must be dropped after receiving an error. - // TODO: We could NOT drop the request here, and penalize the peer again if - // sends multiple penalizable chunks after the first invalid. request.remove(); - Err(e) + Err(e.into()) } } } RpcEvent::StreamTermination => match request.remove().terminate() { Ok(_) => return None, - Err(e) => Err(e), + Err(e) => Err(e.into()), }, RpcEvent::RPCError(e) => { request.remove(); - Err(e) + Err(e.into()) } }) } @@ -468,32 +496,27 @@ impl SyncNetworkContext { Some(match blob { RpcEvent::Response(blob, _) => match request.get_mut().add_response(blob) { - // TODO: Should deal only with Vec> Ok(Some(blobs)) => to_fixed_blob_sidecar_list(blobs) .map(|blobs| (blobs, timestamp_now())) - .map_err(RPCError::InvalidData), + .map_err(Into::into), Ok(None) => return None, Err(e) => { request.remove(); - Err(e) + Err(e.into()) } }, RpcEvent::StreamTermination => { // Stream terminator match request.remove().terminate() { - // TODO: Should deal only with Vec> Some(blobs) => to_fixed_blob_sidecar_list(blobs) - // TODO: a seen_timestamp for an array of blobs doesn't make much sense - // since each is received at different times. Should we track first, last or - // average? .map(|blobs| (blobs, timestamp_now())) - .map_err(RPCError::InvalidData), + .map_err(Into::into), None => return None, } } RpcEvent::RPCError(e) => { request.remove(); - Err(e) + Err(e.into()) } }) } @@ -501,13 +524,13 @@ impl SyncNetworkContext { fn to_fixed_blob_sidecar_list( blobs: Vec>>, -) -> Result, String> { +) -> Result, LookupVerifyError> { let mut fixed_list = FixedBlobSidecarList::default(); for blob in blobs.into_iter() { let index = blob.index as usize; *fixed_list .get_mut(index) - .ok_or("invalid index".to_string())? = Some(blob) + .ok_or(LookupVerifyError::UnrequestedBlobIndex(index as u64))? = Some(blob) } Ok(fixed_list) } diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 91876bf9c5d..0522b7fa384 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -1,10 +1,21 @@ use beacon_chain::get_block_root; -use lighthouse_network::rpc::{methods::BlobsByRootRequest, BlocksByRootRequest, RPCError}; +use lighthouse_network::rpc::{methods::BlobsByRootRequest, BlocksByRootRequest}; use std::sync::Arc; +use strum::IntoStaticStr; use types::{ blob_sidecar::BlobIdentifier, BlobSidecar, ChainSpec, EthSpec, Hash256, SignedBeaconBlock, }; +#[derive(Debug, PartialEq, Eq, IntoStaticStr)] +pub enum LookupVerifyError { + NoResponseReturned, + TooManyResponses, + UnrequestedBlockRoot(Hash256), + UnrequestedBlobIndex(u64), + InvalidInclusionProof, + DuplicateData, +} + pub struct ActiveBlocksByRootRequest { request: BlocksByRootSingleRequest, resolved: bool, @@ -24,16 +35,14 @@ impl ActiveBlocksByRootRequest { pub fn add_response( &mut self, block: Arc>, - ) -> Result>, RPCError> { + ) -> Result>, LookupVerifyError> { if self.resolved { - return Err(RPCError::InvalidData("too many responses".to_string())); + return Err(LookupVerifyError::TooManyResponses); } let block_root = get_block_root(&block); if self.request.0 != block_root { - return Err(RPCError::InvalidData(format!( - "un-requested block root {block_root:?}" - ))); + return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); } // Valid data, blocks by root expects a single response @@ -41,11 +50,11 @@ impl ActiveBlocksByRootRequest { Ok(block) } - pub fn terminate(self) -> Result<(), RPCError> { + pub fn terminate(self) -> Result<(), LookupVerifyError> { if self.resolved { Ok(()) } else { - Err(RPCError::InvalidData("no response returned".to_string())) + Err(LookupVerifyError::NoResponseReturned) } } } @@ -101,28 +110,23 @@ impl ActiveBlobsByRootRequest { pub fn add_response( &mut self, blob: Arc>, - ) -> Result>>>, RPCError> { + ) -> Result>>>, LookupVerifyError> { if self.resolved { - return Err(RPCError::InvalidData("too many responses".to_string())); + return Err(LookupVerifyError::TooManyResponses); } let block_root = blob.block_root(); if self.request.block_root != block_root { - return Err(RPCError::InvalidData(format!( - "un-requested block root {block_root:?}" - ))); + return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); } if !blob.verify_blob_sidecar_inclusion_proof().unwrap_or(false) { - return Err(RPCError::InvalidData("invalid inclusion proof".to_string())); + return Err(LookupVerifyError::InvalidInclusionProof); } if !self.request.indices.contains(&blob.index) { - return Err(RPCError::InvalidData(format!( - "un-requested blob index {}", - blob.index - ))); + return Err(LookupVerifyError::UnrequestedBlobIndex(blob.index)); } if self.blobs.iter().any(|b| b.index == blob.index) { - return Err(RPCError::InvalidData("duplicated data".to_string())); + return Err(LookupVerifyError::DuplicateData); } self.blobs.push(blob);