From 865640d687972fb16b11fd3f60b3132862c68c93 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Sat, 13 Apr 2024 10:54:36 +0900 Subject: [PATCH 1/7] by-root-stream-terminator --- .../network/src/sync/block_lookups/common.rs | 157 +++------- .../network/src/sync/block_lookups/mod.rs | 117 ++++---- .../src/sync/block_lookups/parent_lookup.rs | 16 +- .../sync/block_lookups/single_block_lookup.rs | 20 +- beacon_node/network/src/sync/manager.rs | 203 ++++++++----- .../network/src/sync/network_context.rs | 273 +++++++++++++++--- 6 files changed, 463 insertions(+), 323 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index 8f7881eea8a..f0b50fb6ac1 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -6,17 +6,17 @@ use crate::sync::block_lookups::{ BlobRequestState, BlockLookups, BlockRequestState, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, }; use crate::sync::manager::{BlockProcessType, Id, SingleLookupReqId}; -use crate::sync::network_context::SyncNetworkContext; +use crate::sync::network_context::{ + BlobsByRootSingleBlockRequest, BlocksByRootSingleRequest, SyncNetworkContext, +}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::data_availability_checker::ChildComponents; -use beacon_chain::{get_block_root, BeaconChainTypes}; -use lighthouse_network::rpc::methods::BlobsByRootRequest; -use lighthouse_network::rpc::BlocksByRootRequest; -use std::ops::IndexMut; +use beacon_chain::BeaconChainTypes; +use rand::prelude::IteratorRandom; use std::sync::Arc; use std::time::Duration; -use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList}; -use types::{BlobSidecar, ChainSpec, Hash256, SignedBeaconBlock}; +use types::blob_sidecar::FixedBlobSidecarList; +use types::{Hash256, SignedBeaconBlock}; #[derive(Debug, Copy, Clone)] pub enum ResponseType { @@ -73,9 +73,6 @@ pub trait RequestState { /// The type of the request . type RequestType; - /// A block or blob response. - type ResponseType; - /// The type created after validation. type VerifiedResponseType: Clone; @@ -85,14 +82,11 @@ pub trait RequestState { /* Request building methods */ /// Construct a new request. - fn build_request( - &mut self, - spec: &ChainSpec, - ) -> Result<(PeerId, Self::RequestType), LookupRequestError> { + fn build_request(&mut self) -> Result<(PeerId, Self::RequestType), LookupRequestError> { // Verify and construct request. self.too_many_attempts()?; let peer = self.get_peer()?; - let request = self.new_request(spec); + let request = self.new_request(); Ok((peer, request)) } @@ -100,7 +94,7 @@ pub trait RequestState { fn build_request_and_send( &mut self, id: Id, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) -> Result<(), LookupRequestError> { // Check if request is necessary. if !self.get_state().is_awaiting_download() { @@ -108,7 +102,7 @@ pub trait RequestState { } // Construct request. - let (peer_id, request) = self.build_request(&cx.chain.spec)?; + let (peer_id, request) = self.build_request()?; // Update request state. let req_counter = self.get_state_mut().on_download_start(peer_id); @@ -144,14 +138,14 @@ pub trait RequestState { } /// Initialize `Self::RequestType`. - fn new_request(&self, spec: &ChainSpec) -> Self::RequestType; + fn new_request(&self) -> Self::RequestType; /// Send the request to the network service. 
fn make_request( id: SingleLookupReqId, peer_id: PeerId, request: Self::RequestType, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) -> Result<(), LookupRequestError>; /* Response handling methods */ @@ -159,46 +153,23 @@ pub trait RequestState { /// Verify the response is valid based on what we requested. fn verify_response( &mut self, - expected_block_root: Hash256, - peer_id: PeerId, - response: Option, - ) -> Result, LookupVerifyError> { - let result = match *self.get_state().get_state() { - State::AwaitingDownload => Err(LookupVerifyError::ExtraBlocksReturned), - State::Downloading { peer_id: _ } => { - // TODO: We requested a download from Downloading { peer_id }, but the network - // injects a response from a different peer_id. What should we do? The peer_id to - // track for scoring is the one that actually sent the response, not the state's - self.verify_response_inner(expected_block_root, response) + response: Self::VerifiedResponseType, + ) -> Result { + let request_state = self.get_state_mut(); + match request_state.state { + State::AwaitingDownload => { + request_state.register_failure_downloading(); + Err(LookupVerifyError::ExtraBlocksReturned) } - State::Processing { .. } | State::Processed { .. } => match response { + State::Downloading { peer_id } => Ok(response), + State::Processing { peer_id: _ } => { // We sent the block for processing and received an extra block. - Some(_) => Err(LookupVerifyError::ExtraBlocksReturned), - // This is simply the stream termination and we are already processing the block - None => Ok(None), - }, - }; - - match result { - Ok(Some(response)) => { - self.get_state_mut().on_download_success(peer_id); - Ok(Some(response)) - } - Ok(None) => Ok(None), - Err(e) => { - self.get_state_mut().on_download_failure(); - Err(e) + request_state.register_failure_downloading(); + Err(LookupVerifyError::ExtraBlocksReturned) } } } - /// The response verification unique to block or blobs. - fn verify_response_inner( - &mut self, - expected_block_root: Hash256, - response: Option, - ) -> Result, LookupVerifyError>; - /// A getter for the parent root of the response. Returns an `Option` because we won't know /// the blob parent if we don't end up getting any blobs in the response. fn get_parent_root(verified_response: &Self::VerifiedResponseType) -> Option; @@ -247,49 +218,24 @@ pub trait RequestState { } impl RequestState for BlockRequestState { - type RequestType = BlocksByRootRequest; - type ResponseType = Arc>; + type RequestType = BlocksByRootSingleRequest; type VerifiedResponseType = Arc>; type ReconstructedResponseType = RpcBlock; - fn new_request(&self, spec: &ChainSpec) -> BlocksByRootRequest { - BlocksByRootRequest::new(vec![self.requested_block_root], spec) + fn new_request(&self) -> Self::RequestType { + self.requested_block_root } fn make_request( id: SingleLookupReqId, peer_id: PeerId, request: Self::RequestType, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) -> Result<(), LookupRequestError> { cx.block_lookup_request(id, peer_id, request) .map_err(LookupRequestError::SendFailed) } - fn verify_response_inner( - &mut self, - expected_block_root: Hash256, - response: Option, - ) -> Result>>, LookupVerifyError> { - match response { - Some(block) => { - // Compute the block root using this specific function so that we can get timing - // metrics. 
- let block_root = get_block_root(&block); - if block_root != expected_block_root { - // return an error and drop the block - // NOTE: we take this is as a download failure to prevent counting the - // attempt as a chain failure, but simply a peer failure. - Err(LookupVerifyError::RootMismatch) - } else { - // Return the block for processing. - Ok(Some(block)) - } - } - None => Err(LookupVerifyError::NoBlockReturned), - } - } - fn get_parent_root(verified_response: &Arc>) -> Option { Some(verified_response.parent_root()) } @@ -340,60 +286,27 @@ impl RequestState for BlockRequestState } impl RequestState for BlobRequestState { - type RequestType = BlobsByRootRequest; - type ResponseType = Arc>; + type RequestType = BlobsByRootSingleBlockRequest; type VerifiedResponseType = FixedBlobSidecarList; type ReconstructedResponseType = FixedBlobSidecarList; - fn new_request(&self, spec: &ChainSpec) -> BlobsByRootRequest { - let blob_id_vec: Vec = self.requested_ids.clone().into(); - BlobsByRootRequest::new(blob_id_vec, spec) + fn new_request(&self) -> Self::RequestType { + BlobsByRootSingleBlockRequest { + block_root: self.block_root, + indices: self.requested_ids.indices(), + } } fn make_request( id: SingleLookupReqId, peer_id: PeerId, request: Self::RequestType, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) -> Result<(), LookupRequestError> { cx.blob_lookup_request(id, peer_id, request) .map_err(LookupRequestError::SendFailed) } - fn verify_response_inner( - &mut self, - expected_block_root: Hash256, - blob: Option, - ) -> Result>, LookupVerifyError> { - match blob { - Some(blob) => { - let received_id = blob.id(); - - if !self.requested_ids.contains(&received_id) { - return Err(LookupVerifyError::UnrequestedBlobId(received_id)); - } - if !blob.verify_blob_sidecar_inclusion_proof().unwrap_or(false) { - return Err(LookupVerifyError::InvalidInclusionProof); - } - if blob.block_root() != expected_block_root { - return Err(LookupVerifyError::UnrequestedHeader); - } - - // State should remain downloading until we receive the stream terminator. 
- self.requested_ids.remove(&received_id); - - // The inclusion proof check above ensures `blob.index` is < MAX_BLOBS_PER_BLOCK - let blob_index = blob.index; - *self.blob_download_queue.index_mut(blob_index as usize) = Some(blob); - Ok(None) - } - None => { - let blobs = std::mem::take(&mut self.blob_download_queue); - Ok(Some(blobs)) - } - } - } - fn get_parent_root(verified_response: &FixedBlobSidecarList) -> Option { verified_response .into_iter() diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 4e1d02d38f4..1bc5d1922a9 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -126,7 +126,7 @@ impl BlockLookups { pub fn trigger_single_lookup( &mut self, mut single_block_lookup: SingleBlockLookup, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) { let block_root = single_block_lookup.block_root(); match single_block_lookup.request_block_and_blobs(cx) { @@ -312,36 +312,32 @@ impl BlockLookups { &mut self, lookup_id: SingleLookupReqId, peer_id: PeerId, - response: Option, + response: R::VerifiedResponseType, seen_timestamp: Duration, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) { let id = lookup_id.id; let response_type = R::response_type(); let Some(lookup) = self.get_single_lookup::(lookup_id) else { - if response.is_some() { - // We don't have the ability to cancel in-flight RPC requests. So this can happen - // if we started this RPC request, and later saw the block/blobs via gossip. - debug!( - self.log, - "Block returned for single block lookup not present"; - "response_type" => ?response_type, - ); - } + // We don't have the ability to cancel in-flight RPC requests. So this can happen + // if we started this RPC request, and later saw the block/blobs via gossip. 
+ debug!( + self.log, + "Block returned for single block lookup not present"; + "response_type" => ?response_type, + ); return; }; let expected_block_root = lookup.block_root(); - if response.is_some() { - debug!(self.log, - "Peer returned response for single lookup"; - "peer_id" => %peer_id , - "id" => ?id, - "block_root" => ?expected_block_root, - "response_type" => ?response_type, - ); - } + debug!(self.log, + "Peer returned response for single lookup"; + "peer_id" => %peer_id , + "id" => ?id, + "block_root" => ?expected_block_root, + "response_type" => ?response_type, + ); match self.single_lookup_response_inner::(peer_id, response, seen_timestamp, cx, lookup) { @@ -368,9 +364,9 @@ impl BlockLookups { fn single_lookup_response_inner>( &self, peer_id: PeerId, - response: Option, + response: R::VerifiedResponseType, seen_timestamp: Duration, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, mut lookup: SingleBlockLookup, ) -> Result, LookupRequestError> { let response_type = R::response_type(); @@ -378,8 +374,8 @@ impl BlockLookups { let expected_block_root = lookup.block_root(); let request_state = R::request_state_mut(&mut lookup); - match request_state.verify_response(expected_block_root, peer_id, response) { - Ok(Some(verified_response)) => { + match request_state.verify_response(response) { + Ok(verified_response) => { self.handle_verified_response::( seen_timestamp, cx, @@ -388,7 +384,6 @@ impl BlockLookups { &mut lookup, )?; } - Ok(None) => {} Err(e) => { debug!( log, @@ -411,7 +406,7 @@ impl BlockLookups { fn handle_verified_response>( &self, seen_timestamp: Duration, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, process_type: BlockProcessType, verified_response: R::VerifiedResponseType, lookup: &mut SingleBlockLookup, @@ -504,26 +499,22 @@ impl BlockLookups { &mut self, id: SingleLookupReqId, peer_id: PeerId, - response: Option, + response: R::VerifiedResponseType, seen_timestamp: Duration, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) { let Some(mut parent_lookup) = self.get_parent_lookup::(id) else { - if response.is_some() { - debug!(self.log, "Response for a parent lookup request that was not found"; "peer_id" => %peer_id); - } + debug!(self.log, "Response for a parent lookup request that was not found"; "peer_id" => %peer_id); return; }; - if response.is_some() { - debug!(self.log, - "Peer returned response for parent lookup"; - "peer_id" => %peer_id , - "id" => ?id, - "block_root" => ?parent_lookup.current_parent_request.block_request_state.requested_block_root, - "response_type" => ?R::response_type(), - ); - } + debug!(self.log, + "Peer returned response for parent lookup"; + "peer_id" => %peer_id , + "id" => ?id, + "block_root" => ?parent_lookup.current_parent_request.block_request_state.requested_block_root, + "response_type" => ?R::response_type(), + ); match self.parent_lookup_response_inner::( peer_id, @@ -551,13 +542,13 @@ impl BlockLookups { fn parent_lookup_response_inner>( &mut self, peer_id: PeerId, - response: Option, + response: R::VerifiedResponseType, seen_timestamp: Duration, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, parent_lookup: &mut ParentLookup, ) -> Result<(), RequestError> { - match parent_lookup.verify_response::(peer_id, response, &mut self.failed_chains) { - Ok(Some(verified_response)) => { + match parent_lookup.verify_response::(response, &mut self.failed_chains) { + Ok(verified_response) => { self.handle_verified_response::( seen_timestamp, cx, @@ -568,7 +559,6 @@ impl BlockLookups { &mut 
parent_lookup.current_parent_request, )?; } - Ok(None) => {} Err(e) => self.handle_parent_verify_error::(peer_id, parent_lookup, e, cx)?, }; Ok(()) @@ -580,7 +570,7 @@ impl BlockLookups { peer_id: PeerId, parent_lookup: &mut ParentLookup, e: ParentVerifyError, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) -> Result<(), RequestError> { match e { ParentVerifyError::RootMismatch @@ -695,7 +685,7 @@ impl BlockLookups { &mut self, id: SingleLookupReqId, peer_id: &PeerId, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, error: RPCError, ) { let msg = error.as_static_str(); @@ -724,7 +714,7 @@ impl BlockLookups { &mut self, id: SingleLookupReqId, peer_id: &PeerId, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, error: RPCError, ) { let msg = error.as_static_str(); @@ -842,7 +832,7 @@ impl BlockLookups { /// blobs peer because they did not provide all blobs on the initial request. fn handle_missing_components>( &self, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, lookup: &mut SingleBlockLookup, ) -> Result<(), LookupRequestError> { let request_state = R::request_state_mut(lookup); @@ -1008,20 +998,21 @@ impl BlockLookups { } BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_)) | BlockProcessingResult::Err(BlockError::BlockIsAlreadyKnown(_)) => { + let (chain_hash, blocks, hashes, block_request) = + parent_lookup.parts_for_processing(); + + let blocks = self.add_child_block_to_chain(chain_hash, blocks, cx).into(); + + let process_id = ChainSegmentProcessId::ParentLookup(chain_hash); + // Check if the beacon processor is available let Some(beacon_processor) = cx.beacon_processor_if_enabled() else { return trace!( self.log, "Dropping parent chain segment that was ready for processing."; - parent_lookup + "chain_hash" => %chain_hash, ); }; - let (chain_hash, blocks, hashes, block_request) = - parent_lookup.parts_for_processing(); - - let blocks = self.add_child_block_to_chain(chain_hash, blocks, cx).into(); - - let process_id = ChainSegmentProcessId::ParentLookup(chain_hash); match beacon_processor.send_chain_segment(process_id, blocks) { Ok(_) => { @@ -1075,7 +1066,7 @@ impl BlockLookups { &mut self, chain_hash: Hash256, mut blocks: VecDeque>, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) -> VecDeque> { // Find the child block that spawned the parent lookup request and add it to the chain // to send for processing. @@ -1128,7 +1119,7 @@ impl BlockLookups { fn handle_parent_block_error( &mut self, outcome: BlockError<::EthSpec>, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, mut parent_lookup: ParentLookup, ) { // We should always have a block peer. @@ -1180,7 +1171,7 @@ impl BlockLookups { &mut self, chain_hash: Hash256, result: BatchProcessResult, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) { let Some((_hashes, request)) = self.processing_parent_lookups.remove(&chain_hash) else { return debug!(self.log, "Chain process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash, "result" => ?result); @@ -1341,7 +1332,11 @@ impl BlockLookups { /// Attempts to request the next unknown parent. This method handles peer scoring and dropping /// the lookup in the event of failure. 
- fn request_parent(&mut self, mut parent_lookup: ParentLookup, cx: &SyncNetworkContext) { + fn request_parent( + &mut self, + mut parent_lookup: ParentLookup, + cx: &mut SyncNetworkContext, + ) { let response = parent_lookup.request_parent(cx); match response { diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index 55dd26b661e..97c963718bd 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -92,7 +92,7 @@ impl ParentLookup { } /// Attempts to request the next unknown parent. If the request fails, it should be removed. - pub fn request_parent(&mut self, cx: &SyncNetworkContext) -> Result<(), RequestError> { + pub fn request_parent(&mut self, cx: &mut SyncNetworkContext) -> Result<(), RequestError> { // check to make sure this request hasn't failed if self.downloaded_blocks.len() + 1 >= PARENT_DEPTH_TOLERANCE { return Err(RequestError::ChainTooLong); @@ -190,21 +190,15 @@ impl ParentLookup { /// the processing result of the block. pub fn verify_response>( &mut self, - peer_id: PeerId, - block: Option, + block: R::VerifiedResponseType, failed_chains: &mut lru_cache::LRUTimeCache, - ) -> Result, ParentVerifyError> { - let expected_block_root = self.current_parent_request.block_root(); + ) -> Result { let request_state = R::request_state_mut(&mut self.current_parent_request); - let root_and_verified = - request_state.verify_response(expected_block_root, peer_id, block)?; + let root_and_verified = request_state.verify_response(block)?; // check if the parent of this block isn't in the failed cache. If it is, this chain should // be dropped and the peer downscored. - if let Some(parent_root) = root_and_verified - .as_ref() - .and_then(|block| R::get_parent_root(block)) - { + if let Some(parent_root) = R::get_parent_root(&root_and_verified) { if failed_chains.contains(&parent_root) { request_state.register_failure_downloading(); return Err(ParentVerifyError::PreviousFailure { parent_root }); diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 15d10c77c24..3f545f74186 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -108,7 +108,7 @@ impl SingleBlockLookup { /// downloading the block and/or blobs. pub fn request_block_and_blobs( &mut self, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) -> Result<(), LookupRequestError> { let block_already_downloaded = self.block_already_downloaded(); let blobs_already_downloaded = self.blobs_already_downloaded(); @@ -216,7 +216,7 @@ impl SingleBlockLookup { pub fn should_drop_lookup_on_disconnected_peer( &mut self, peer_id: &PeerId, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, log: &Logger, ) -> bool { let block_root = self.block_root(); @@ -318,6 +318,7 @@ pub struct BlobRequestState { /// from both block/blobs downloaded in the network layer and any blocks/blobs that exist in /// the data availability checker. pub requested_ids: MissingBlobs, + pub block_root: Hash256, /// Where we store blobs until we receive the stream terminator. 
pub blob_download_queue: FixedBlobSidecarList, pub state: SingleLookupRequestState, @@ -328,6 +329,7 @@ impl BlobRequestState { pub fn new(block_root: Hash256, peer_source: &[PeerId], is_deneb: bool) -> Self { let default_ids = MissingBlobs::new_without_block(block_root, is_deneb); Self { + block_root, requested_ids: default_ids, blob_download_queue: <_>::default(), state: SingleLookupRequestState::new(peer_source), @@ -669,18 +671,14 @@ mod tests { ); as RequestState>::build_request( &mut sl.block_request_state, - &spec, ) .unwrap(); sl.block_request_state.state.state = State::Downloading { peer_id }; as RequestState>::verify_response( &mut sl.block_request_state, - block.canonical_root(), - peer_id, - Some(block.into()), + block.into(), ) - .unwrap() .unwrap(); } @@ -714,7 +712,6 @@ mod tests { for _ in 1..TestLookup2::MAX_ATTEMPTS { as RequestState>::build_request( &mut sl.block_request_state, - &spec, ) .unwrap(); sl.block_request_state.state.on_download_failure(); @@ -723,18 +720,14 @@ mod tests { // Now we receive the block and send it for processing as RequestState>::build_request( &mut sl.block_request_state, - &spec, ) .unwrap(); sl.block_request_state.state.state = State::Downloading { peer_id }; as RequestState>::verify_response( &mut sl.block_request_state, - block.canonical_root(), - peer_id, - Some(block.into()), + block.into(), ) - .unwrap() .unwrap(); // One processing failure maxes the available attempts @@ -742,7 +735,6 @@ mod tests { assert_eq!( as RequestState>::build_request( &mut sl.block_request_state, - &spec ), Err(LookupRequestError::TooManyAttempts { cannot_process: false diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index a868a092d3d..23bd1010bfe 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -36,7 +36,7 @@ use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; use super::block_lookups::common::LookupType; use super::block_lookups::BlockLookups; -use super::network_context::{BlockOrBlob, RangeRequestId, SyncNetworkContext}; +use super::network_context::{BlockOrBlob, RangeRequestId, RpcEvent, SyncNetworkContext}; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor}; @@ -320,42 +320,12 @@ impl SyncManager { fn inject_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) { trace!(self.log, "Sync manager received a failed RPC"); match request_id { - RequestId::SingleBlock { id } => match id.lookup_type { - LookupType::Current => self - .block_lookups - .single_block_lookup_failed::>( - id, - &peer_id, - &self.network, - error, - ), - LookupType::Parent => self - .block_lookups - .parent_lookup_failed::>( - id, - &peer_id, - &self.network, - error, - ), - }, - RequestId::SingleBlob { id } => match id.lookup_type { - LookupType::Current => self - .block_lookups - .single_block_lookup_failed::>( - id, - &peer_id, - &self.network, - error, - ), - LookupType::Parent => self - .block_lookups - .parent_lookup_failed::>( - id, - &peer_id, - &self.network, - error, - ), - }, + RequestId::SingleBlock { id } => { + self.on_single_block_response(id, peer_id, RpcEvent::RPCError(error)) + } + RequestId::SingleBlob { id } => { + self.on_single_blob_response(id, peer_id, RpcEvent::RPCError(error)) + } RequestId::RangeBlockAndBlobs { id } => { if let Some(sender_id) = 
self.network.range_request_failed(id) { match sender_id { @@ -694,7 +664,7 @@ impl SyncManager { } ChainSegmentProcessId::ParentLookup(chain_hash) => self .block_lookups - .parent_chain_processed(chain_hash, result, &self.network), + .parent_chain_processed(chain_hash, result, &mut self.network), }, } } @@ -836,26 +806,14 @@ impl SyncManager { seen_timestamp: Duration, ) { match request_id { - RequestId::SingleBlock { id } => match id.lookup_type { - LookupType::Current => self - .block_lookups - .single_lookup_response::>( - id, - peer_id, - block, - seen_timestamp, - &self.network, - ), - LookupType::Parent => self - .block_lookups - .parent_lookup_response::>( - id, - peer_id, - block, - seen_timestamp, - &self.network, - ), - }, + RequestId::SingleBlock { id } => self.on_single_block_response( + id, + peer_id, + match block { + Some(block) => RpcEvent::Response(block, seen_timestamp), + None => RpcEvent::StreamTermination, + }, + ), RequestId::SingleBlob { .. } => { crit!(self.log, "Block received during blob request"; "peer_id" => %peer_id ); } @@ -865,6 +823,56 @@ impl SyncManager { } } + fn on_single_block_response( + &mut self, + id: SingleLookupReqId, + peer_id: PeerId, + block: RpcEvent>>, + ) { + if let Some(resp) = self.network.on_single_block_response(id, block) { + match resp { + Ok((block, seen_timestamp)) => match id.lookup_type { + LookupType::Current => self + .block_lookups + .single_lookup_response::>( + id, + peer_id, + block, + seen_timestamp, + &mut self.network, + ), + LookupType::Parent => self + .block_lookups + .parent_lookup_response::>( + id, + peer_id, + block, + seen_timestamp, + &mut self.network, + ), + }, + Err(error) => match id.lookup_type { + LookupType::Current => self + .block_lookups + .single_block_lookup_failed::>( + id, + &peer_id, + &mut self.network, + error, + ), + LookupType::Parent => self + .block_lookups + .parent_lookup_failed::>( + id, + &peer_id, + &mut self.network, + error, + ), + }, + } + } + } + fn rpc_blob_received( &mut self, request_id: RequestId, @@ -876,32 +884,71 @@ impl SyncManager { RequestId::SingleBlock { .. 
} => { crit!(self.log, "Single blob received during block request"; "peer_id" => %peer_id ); } - RequestId::SingleBlob { id } => match id.lookup_type { - LookupType::Current => self - .block_lookups - .single_lookup_response::>( - id, - peer_id, - blob, - seen_timestamp, - &self.network, - ), - LookupType::Parent => self - .block_lookups - .parent_lookup_response::>( - id, - peer_id, - blob, - seen_timestamp, - &self.network, - ), - }, + RequestId::SingleBlob { id } => self.on_single_blob_response( + id, + peer_id, + match blob { + Some(blob) => RpcEvent::Response(blob, seen_timestamp), + None => RpcEvent::StreamTermination, + }, + ), RequestId::RangeBlockAndBlobs { id } => { self.range_block_and_blobs_response(id, peer_id, blob.into()) } } } + fn on_single_blob_response( + &mut self, + id: SingleLookupReqId, + peer_id: PeerId, + blob: RpcEvent>>, + ) { + if let Some(resp) = self.network.on_single_blob_response(id, blob) { + match resp { + Ok((blobs, seen_timestamp)) => match id.lookup_type { + LookupType::Current => self + .block_lookups + .single_lookup_response::>( + id, + peer_id, + blobs, + seen_timestamp, + &mut self.network, + ), + LookupType::Parent => self + .block_lookups + .parent_lookup_response::>( + id, + peer_id, + blobs, + seen_timestamp, + &mut self.network, + ), + }, + + Err(error) => match id.lookup_type { + LookupType::Current => self + .block_lookups + .single_block_lookup_failed::>( + id, + &peer_id, + &mut self.network, + error, + ), + LookupType::Parent => self + .block_lookups + .parent_lookup_failed::>( + id, + &peer_id, + &mut self.network, + error, + ), + }, + } + } + } + /// Handles receiving a response for a range sync request that should have both blocks and /// blobs. fn range_block_and_blobs_response( diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 96f8de46fb7..22907227115 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -9,16 +9,19 @@ use crate::service::{NetworkMessage, RequestId}; use crate::status::ToStatusMessage; use crate::sync::manager::SingleLookupReqId; use beacon_chain::block_verification_types::RpcBlock; -use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState}; +use beacon_chain::validator_monitor::timestamp_now; +use beacon_chain::{get_block_root, BeaconChain, BeaconChainTypes, EngineState}; use fnv::FnvHashMap; use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; -use lighthouse_network::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason}; +use lighthouse_network::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, RPCError}; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request}; use slog::{debug, trace, warn}; use std::collections::hash_map::Entry; use std::sync::Arc; +use std::time::Duration; use tokio::sync::mpsc; -use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; +use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList}; +use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock}; pub struct BlocksAndBlobsByRangeResponse { pub sender_id: RangeRequestId, @@ -37,6 +40,13 @@ pub enum RangeRequestId { }, } +#[derive(Debug)] +pub enum RpcEvent { + StreamTermination, + Response(T, Duration), + RPCError(RPCError), +} + /// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id. 
pub struct SyncNetworkContext { /// The network channel to relay messages to the Network service. @@ -45,6 +55,9 @@ pub struct SyncNetworkContext { /// A sequential ID for all RPC requests. request_id: Id, + blocks_by_root_requests: FnvHashMap, + blobs_by_root_requests: FnvHashMap>, + /// BlocksByRange requests paired with BlobsByRange range_blocks_and_blobs_requests: FnvHashMap)>, @@ -80,6 +93,91 @@ impl From>>> for BlockOrBlob { } } +struct ActiveBlocksByRootRequest { + request: Hash256, + resolved: bool, +} + +impl ActiveBlocksByRootRequest { + fn add_response( + &mut self, + block: Arc>, + ) -> Result>, RPCError> { + if self.resolved { + return Err(RPCError::InvalidData("too many responses".to_string())); + } + + if self.request != get_block_root(&block) { + return Err(RPCError::InvalidData("wrong block root".to_string())); + } + + // Valid data, blocks by root expects a single response + self.resolved = true; + Ok(block) + } + + fn terminate(self) -> Result<(), RPCError> { + if self.resolved { + Ok(()) + } else { + Err(RPCError::InvalidData("no response returned".to_string())) + } + } +} + +pub type BlocksByRootSingleRequest = Hash256; + +pub struct BlobsByRootSingleBlockRequest { + pub block_root: Hash256, + pub indices: Vec, +} + +struct ActiveBlobsByRootRequest { + request: BlobsByRootSingleBlockRequest, + blobs: Vec>>, + resolved: bool, +} + +impl ActiveBlobsByRootRequest { + fn add_response( + &mut self, + blob: Arc>, + ) -> Result>>>, RPCError> { + if self.resolved { + return Err(RPCError::InvalidData("too many responses".to_string())); + } + + if self.request.block_root != blob.block_root() { + return Err(RPCError::InvalidData("un-requested block root".to_string())); + } + if !blob.verify_blob_sidecar_inclusion_proof().unwrap_or(false) { + return Err(RPCError::InvalidData("invalid inclusion proof".to_string())); + } + if !self.request.indices.contains(&blob.index) { + return Err(RPCError::InvalidData("un-requested blob index".to_string())); + } + if self.blobs.iter().any(|b| b.index == blob.index) { + return Err(RPCError::InvalidData("duplicated data".to_string())); + } + + self.blobs.push(blob); + if self.blobs.len() >= self.request.indices.len() { + // All expected chunks received, return result early + Ok(Some(std::mem::take(&mut self.blobs))) + } else { + Ok(None) + } + } + + fn terminate(self) -> Option>>> { + if self.resolved { + return None; + } else { + Some(self.blobs) + } + } +} + impl SyncNetworkContext { pub fn new( network_send: mpsc::UnboundedSender>, @@ -91,6 +189,8 @@ impl SyncNetworkContext { network_send, execution_engine_state: EngineState::Online, // always assume `Online` at the start request_id: 1, + blocks_by_root_requests: <_>::default(), + blobs_by_root_requests: <_>::default(), range_blocks_and_blobs_requests: FnvHashMap::default(), network_beacon_processor, chain, @@ -245,62 +345,81 @@ impl SyncNetworkContext { } pub fn block_lookup_request( - &self, + &mut self, id: SingleLookupReqId, peer_id: PeerId, - request: BlocksByRootRequest, + block_root: BlocksByRootSingleRequest, ) -> Result<(), &'static str> { debug!( self.log, "Sending BlocksByRoot Request"; "method" => "BlocksByRoot", - "block_roots" => ?request.block_roots().to_vec(), + "block_root" => ?block_root, "peer" => %peer_id, "id" => ?id ); self.send_network_msg(NetworkMessage::SendRequest { peer_id, - request: Request::BlocksByRoot(request), + request: Request::BlocksByRoot(BlocksByRootRequest::new( + vec![block_root], + &self.chain.spec, + )), request_id: 
RequestId::Sync(SyncRequestId::SingleBlock { id }), })?; + + self.blocks_by_root_requests.insert( + id, + ActiveBlocksByRootRequest { + request: block_root, + resolved: false, + }, + ); + Ok(()) } pub fn blob_lookup_request( - &self, + &mut self, id: SingleLookupReqId, - blob_peer_id: PeerId, - blob_request: BlobsByRootRequest, + peer_id: PeerId, + request: BlobsByRootSingleBlockRequest, ) -> Result<(), &'static str> { - if let Some(block_root) = blob_request - .blob_ids - .as_slice() - .first() - .map(|id| id.block_root) - { - let indices = blob_request - .blob_ids - .as_slice() - .iter() - .map(|id| id.index) - .collect::>(); - debug!( - self.log, - "Sending BlobsByRoot Request"; - "method" => "BlobsByRoot", - "block_root" => ?block_root, - "blob_indices" => ?indices, - "peer" => %blob_peer_id, - "id" => ?id - ); + debug!( + self.log, + "Sending BlobsByRoot Request"; + "method" => "BlobsByRoot", + "block_root" => ?request.block_root, + "blob_indices" => ?request.indices, + "peer" => %peer_id, + "id" => ?id + ); + + self.send_network_msg(NetworkMessage::SendRequest { + peer_id: peer_id, + request: Request::BlobsByRoot(BlobsByRootRequest::new( + request + .indices + .iter() + .map(|index| BlobIdentifier { + block_root: request.block_root, + index: *index, + }) + .collect(), + &self.chain.spec, + )), + request_id: RequestId::Sync(SyncRequestId::SingleBlob { id }), + })?; + + self.blobs_by_root_requests.insert( + id, + ActiveBlobsByRootRequest { + request, + resolved: false, + blobs: vec![], + }, + ); - self.send_network_msg(NetworkMessage::SendRequest { - peer_id: blob_peer_id, - request: Request::BlobsByRoot(blob_request), - request_id: RequestId::Sync(SyncRequestId::SingleBlob { id }), - })?; - } Ok(()) } @@ -405,4 +524,84 @@ impl SyncNetworkContext { self.range_blocks_and_blobs_requests .insert(id, (sender_id, info)); } + + // Request handlers + + pub fn on_single_block_response( + &mut self, + request_id: SingleLookupReqId, + block: RpcEvent>>, + ) -> Option>, Duration), RPCError>> { + let Entry::Occupied(mut request) = self.blocks_by_root_requests.entry(request_id) else { + return None; + }; + + Some(match block { + RpcEvent::Response(block, seen_timestamp) => { + match request.get_mut().add_response(block) { + Ok(block) => Ok((block, seen_timestamp)), + Err(e) => Err(e), + } + } + RpcEvent::StreamTermination => match request.remove().terminate() { + Ok(_) => return None, + Err(e) => Err(e), + }, + RpcEvent::RPCError(e) => { + request.remove(); + Err(e) + } + }) + } + + pub fn on_single_blob_response( + &mut self, + request_id: SingleLookupReqId, + blob: RpcEvent>>, + ) -> Option, Duration), RPCError>> { + let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else { + return None; + }; + + Some(match blob { + RpcEvent::Response(blob, _) => match request.get_mut().add_response(blob) { + // TODO: Should deal only with Vec> + Ok(Some(blobs)) => to_fixed_blob_sidecar_list(blobs) + .map(|blobs| (blobs, timestamp_now())) + .map_err(RPCError::InvalidData), + Ok(None) => return None, + Err(e) => Err(e), + }, + RpcEvent::StreamTermination => { + // Stream terminator + match request.remove().terminate() { + // TODO: Should deal only with Vec> + Some(blobs) => to_fixed_blob_sidecar_list(blobs) + // TODO: a seen_timestamp for an array of blobs doesn't make much sense + // since each is received at different times. Should we track first, last or + // average? 
+ .map(|blobs| (blobs, timestamp_now())) + .map_err(RPCError::InvalidData), + None => return None, + } + } + RpcEvent::RPCError(e) => { + request.remove(); + Err(e) + } + }) + } +} + +fn to_fixed_blob_sidecar_list( + blobs: Vec>>, +) -> Result, String> { + let mut fixed_list = FixedBlobSidecarList::default(); + for blob in blobs.into_iter() { + let index = blob.index as usize; + *fixed_list + .get_mut(index) + .ok_or("invalid index".to_string())? = Some(blob) + } + Ok(fixed_list) } From 35605b43c8767831a777f6055785b159f854ed48 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Sat, 13 Apr 2024 14:25:26 +0900 Subject: [PATCH 2/7] Fix tests --- .../network/src/sync/block_lookups/common.rs | 24 +-- .../network/src/sync/block_lookups/mod.rs | 202 ++++++++---------- .../src/sync/block_lookups/parent_lookup.rs | 62 +----- .../sync/block_lookups/single_block_lookup.rs | 50 ++--- .../network/src/sync/block_lookups/tests.rs | 11 +- .../network/src/sync/network_context.rs | 152 +++---------- .../src/sync/network_context/requests.rs | 144 +++++++++++++ 7 files changed, 298 insertions(+), 347 deletions(-) create mode 100644 beacon_node/network/src/sync/network_context/requests.rs diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index f0b50fb6ac1..038a216dcb9 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -1,6 +1,6 @@ use crate::sync::block_lookups::parent_lookup::PARENT_FAIL_TOLERANCE; use crate::sync::block_lookups::single_block_lookup::{ - LookupRequestError, LookupVerifyError, SingleBlockLookup, SingleLookupRequestState, State, + LookupRequestError, SingleBlockLookup, SingleLookupRequestState, State, }; use crate::sync::block_lookups::{ BlobRequestState, BlockLookups, BlockRequestState, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, @@ -150,26 +150,6 @@ pub trait RequestState { /* Response handling methods */ - /// Verify the response is valid based on what we requested. - fn verify_response( - &mut self, - response: Self::VerifiedResponseType, - ) -> Result { - let request_state = self.get_state_mut(); - match request_state.state { - State::AwaitingDownload => { - request_state.register_failure_downloading(); - Err(LookupVerifyError::ExtraBlocksReturned) - } - State::Downloading { peer_id } => Ok(response), - State::Processing { peer_id: _ } => { - // We sent the block for processing and received an extra block. - request_state.register_failure_downloading(); - Err(LookupVerifyError::ExtraBlocksReturned) - } - } - } - /// A getter for the parent root of the response. Returns an `Option` because we won't know /// the blob parent if we don't end up getting any blobs in the response. 
fn get_parent_root(verified_response: &Self::VerifiedResponseType) -> Option; @@ -223,7 +203,7 @@ impl RequestState for BlockRequestState type ReconstructedResponseType = RpcBlock; fn new_request(&self) -> Self::RequestType { - self.requested_block_root + BlocksByRootSingleRequest(self.requested_block_root) } fn make_request( diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 1bc5d1922a9..d218eb69918 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -1,4 +1,3 @@ -use self::parent_lookup::ParentVerifyError; use self::single_block_lookup::SingleBlockLookup; use super::manager::BlockProcessingResult; use super::BatchProcessResult; @@ -319,7 +318,7 @@ impl BlockLookups { let id = lookup_id.id; let response_type = R::response_type(); - let Some(lookup) = self.get_single_lookup::(lookup_id) else { + let Some(mut lookup) = self.get_single_lookup::(lookup_id) else { // We don't have the ability to cancel in-flight RPC requests. So this can happen // if we started this RPC request, and later saw the block/blobs via gossip. debug!( @@ -339,9 +338,14 @@ impl BlockLookups { "response_type" => ?response_type, ); - match self.single_lookup_response_inner::(peer_id, response, seen_timestamp, cx, lookup) - { - Ok(lookup) => { + match self.handle_verified_response::( + seen_timestamp, + cx, + BlockProcessType::SingleBlock { id: lookup.id }, + response, + &mut lookup, + ) { + Ok(_) => { self.single_block_lookups.insert(id, lookup); } Err(e) => { @@ -361,48 +365,6 @@ impl BlockLookups { /// Consolidates error handling for `single_lookup_response`. An `Err` here should always mean /// the lookup is dropped. - fn single_lookup_response_inner>( - &self, - peer_id: PeerId, - response: R::VerifiedResponseType, - seen_timestamp: Duration, - cx: &mut SyncNetworkContext, - mut lookup: SingleBlockLookup, - ) -> Result, LookupRequestError> { - let response_type = R::response_type(); - let log = self.log.clone(); - let expected_block_root = lookup.block_root(); - let request_state = R::request_state_mut(&mut lookup); - - match request_state.verify_response(response) { - Ok(verified_response) => { - self.handle_verified_response::( - seen_timestamp, - cx, - BlockProcessType::SingleBlock { id: lookup.id }, - verified_response, - &mut lookup, - )?; - } - Err(e) => { - debug!( - log, - "Single lookup response verification failed, retrying"; - "block_root" => ?expected_block_root, - "peer_id" => %peer_id, - "response_type" => ?response_type, - "error" => ?e - ); - let msg = e.into(); - cx.report_peer(peer_id, PeerAction::LowToleranceError, msg); - - request_state.register_failure_downloading(); - lookup.request_block_and_blobs(cx)?; - } - } - Ok(lookup) - } - fn handle_verified_response>( &self, seen_timestamp: Duration, @@ -446,14 +408,21 @@ impl BlockLookups { } lookup.request_block_and_blobs(cx)?; } - CachedChild::NotRequired => R::send_reconstructed_for_processing( - id, - self, - block_root, - R::verified_to_reconstructed(block_root, verified_response), - seen_timestamp, - cx, - )?, + CachedChild::NotRequired => { + R::request_state_mut(lookup) + .get_state_mut() + .into_processing() + .map_err(LookupRequestError::BadState)?; + + R::send_reconstructed_for_processing( + id, + self, + block_root, + R::verified_to_reconstructed(block_root, verified_response), + seen_timestamp, + cx, + )? 
+ } CachedChild::Err(e) => { warn!(self.log, "Consistency error in cached block"; "error" => ?e, @@ -547,53 +516,12 @@ impl BlockLookups { cx: &mut SyncNetworkContext, parent_lookup: &mut ParentLookup, ) -> Result<(), RequestError> { - match parent_lookup.verify_response::(response, &mut self.failed_chains) { - Ok(verified_response) => { - self.handle_verified_response::( - seen_timestamp, - cx, - BlockProcessType::ParentLookup { - chain_hash: parent_lookup.chain_hash(), - }, - verified_response, - &mut parent_lookup.current_parent_request, - )?; - } - Err(e) => self.handle_parent_verify_error::(peer_id, parent_lookup, e, cx)?, - }; - Ok(()) - } - - /// Handle logging and peer scoring for `ParentVerifyError`s during parent lookup requests. - fn handle_parent_verify_error>( - &mut self, - peer_id: PeerId, - parent_lookup: &mut ParentLookup, - e: ParentVerifyError, - cx: &mut SyncNetworkContext, - ) -> Result<(), RequestError> { - match e { - ParentVerifyError::RootMismatch - | ParentVerifyError::NoBlockReturned - | ParentVerifyError::NotEnoughBlobsReturned - | ParentVerifyError::ExtraBlocksReturned - | ParentVerifyError::UnrequestedBlobId(_) - | ParentVerifyError::InvalidInclusionProof - | ParentVerifyError::UnrequestedHeader - | ParentVerifyError::ExtraBlobsReturned - | ParentVerifyError::InvalidIndex(_) => { - let e = e.into(); - warn!(self.log, "Peer sent invalid response to parent request"; - "peer_id" => %peer_id, "reason" => %e); - - // We do not tolerate these kinds of errors. We will accept a few but these are signs - // of a faulty peer. - cx.report_peer(peer_id, PeerAction::LowToleranceError, e); - - // We try again if possible. - parent_lookup.request_parent(cx)?; - } - ParentVerifyError::PreviousFailure { parent_root } => { + // check if the parent of this block isn't in the failed cache. If it is, this chain should + // be dropped and the peer downscored. + if let Some(parent_root) = R::get_parent_root(&response) { + if self.failed_chains.contains(&parent_root) { + let request_state = R::request_state_mut(&mut parent_lookup.current_parent_request); + request_state.register_failure_downloading(); debug!( self.log, "Parent chain ignored due to past failure"; @@ -607,8 +535,20 @@ impl BlockLookups { PeerAction::MidToleranceError, "bbroot_failed_chains", ); + return Ok(()); } } + + self.handle_verified_response::( + seen_timestamp, + cx, + BlockProcessType::ParentLookup { + chain_hash: parent_lookup.chain_hash(), + }, + response, + &mut parent_lookup.current_parent_request, + )?; + Ok(()) } @@ -648,8 +588,8 @@ impl BlockLookups { // This happens if the peer disconnects while the block is being // processed. Drop the request without extra penalty } - RequestError::BadState(_) => { - // Should never happen + RequestError::BadState(..) 
=> { + // Internal error should never happen } } } @@ -688,18 +628,22 @@ impl BlockLookups { cx: &mut SyncNetworkContext, error: RPCError, ) { - let msg = error.as_static_str(); + // Downscore peer even if lookup is not known + self.downscore_on_rpc_error(peer_id, &error, cx); + let Some(mut parent_lookup) = self.get_parent_lookup::(id) else { debug!(self.log, "RPC failure for a block parent lookup request that was not found"; "peer_id" => %peer_id, - "error" => msg + "error" => %error ); return; }; R::request_state_mut(&mut parent_lookup.current_parent_request) .register_failure_downloading(); - trace!(self.log, "Parent lookup block request failed"; &parent_lookup, "error" => msg); + debug!(self.log, "Parent lookup block request failed"; + "chain_hash" => %parent_lookup.chain_hash(), "id" => ?id, "error" => %error + ); self.request_parent(parent_lookup, cx); @@ -717,7 +661,9 @@ impl BlockLookups { cx: &mut SyncNetworkContext, error: RPCError, ) { - let msg = error.as_static_str(); + // Downscore peer even if lookup is not known + self.downscore_on_rpc_error(peer_id, &error, cx); + let log = self.log.clone(); let Some(mut lookup) = self.get_single_lookup::(id) else { debug!(log, "Error response to dropped lookup"; "error" => ?error); @@ -729,7 +675,7 @@ impl BlockLookups { trace!(log, "Single lookup failed"; "block_root" => ?block_root, - "error" => msg, + "error" => %error, "peer_id" => %peer_id, "response_type" => ?response_type ); @@ -1123,8 +1069,12 @@ impl BlockLookups { mut parent_lookup: ParentLookup, ) { // We should always have a block peer. - let Ok(block_peer_id) = parent_lookup.block_processing_peer() else { - return; + let block_peer_id = match parent_lookup.block_processing_peer() { + Ok(peer_id) => peer_id, + Err(e) => { + warn!(self.log, "Parent lookup in bad state"; "chain_hash" => %parent_lookup.chain_hash(), "error" => e); + return; + } }; // We may not have a blob peer, if there were no blobs required for this block. @@ -1364,4 +1314,34 @@ impl BlockLookups { pub fn drop_parent_chain_requests(&mut self) -> usize { self.parent_lookups.drain(..).len() } + + pub fn downscore_on_rpc_error( + &self, + peer_id: &PeerId, + error: &RPCError, + cx: &SyncNetworkContext, + ) { + // Note: logging the report event here with the full error display. The log inside + // `report_peer` only includes a smaller string, like "invalid_data" + debug!(self.log, "reporting peer for sync lookup error"; "error" => %error); + if let Some(action) = match error { + // Protocol errors are heavily penalized + RPCError::SSZDecodeError(..) + | RPCError::IoError(..) + | RPCError::ErrorResponse(..) + | RPCError::InvalidData(..) + | RPCError::HandlerRejected => Some(PeerAction::LowToleranceError), + // Timing / network errors are less penalized + // TODO: Is IoError a protocol error or network error? + RPCError::StreamTimeout | RPCError::IncompleteStream | RPCError::NegotiationTimeout => { + Some(PeerAction::MidToleranceError) + } + // Not supporting a specific protocol is tolerated. TODO: Are you sure? + RPCError::UnsupportedProtocol => None, + // Our fault, don't penalize peer + RPCError::InternalError(..) 
| RPCError::Disconnected => None, + } { + cx.report_peer(*peer_id, action, error.into()); + } + } } diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index 97c963718bd..b7a71860bff 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -1,7 +1,6 @@ -use super::single_block_lookup::{LookupRequestError, LookupVerifyError, SingleBlockLookup}; +use super::single_block_lookup::{LookupRequestError, SingleBlockLookup}; use super::{DownloadedBlock, PeerId}; use crate::sync::block_lookups::common::Parent; -use crate::sync::block_lookups::common::RequestState; use crate::sync::{manager::SLOT_IMPORT_TOLERANCE, network_context::SyncNetworkContext}; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::block_verification_types::RpcBlock; @@ -10,8 +9,6 @@ use beacon_chain::BeaconChainTypes; use std::collections::VecDeque; use std::sync::Arc; use store::Hash256; -use strum::IntoStaticStr; -use types::blob_sidecar::BlobIdentifier; /// How many attempts we try to find a parent of a block before we give up trying. pub(crate) const PARENT_FAIL_TOLERANCE: u8 = 5; @@ -30,22 +27,8 @@ pub(crate) struct ParentLookup { pub current_parent_request: SingleBlockLookup, } -#[derive(Debug, PartialEq, Eq, IntoStaticStr)] -pub enum ParentVerifyError { - RootMismatch, - NoBlockReturned, - NotEnoughBlobsReturned, - ExtraBlocksReturned, - UnrequestedBlobId(BlobIdentifier), - InvalidInclusionProof, - UnrequestedHeader, - ExtraBlobsReturned, - InvalidIndex(u64), - PreviousFailure { parent_root: Hash256 }, -} - #[derive(Debug, PartialEq, Eq)] -pub enum RequestError { +pub(crate) enum RequestError { SendFailed(&'static str), ChainTooLong, /// We witnessed too many failures trying to complete this parent lookup. @@ -186,28 +169,6 @@ impl ParentLookup { } } - /// Verifies that the received block is what we requested. If so, parent lookup now waits for - /// the processing result of the block. - pub fn verify_response>( - &mut self, - block: R::VerifiedResponseType, - failed_chains: &mut lru_cache::LRUTimeCache, - ) -> Result { - let request_state = R::request_state_mut(&mut self.current_parent_request); - let root_and_verified = request_state.verify_response(block)?; - - // check if the parent of this block isn't in the failed cache. If it is, this chain should - // be dropped and the peer downscored. 
- if let Some(parent_root) = R::get_parent_root(&root_and_verified) { - if failed_chains.contains(&parent_root) { - request_state.register_failure_downloading(); - return Err(ParentVerifyError::PreviousFailure { parent_root }); - } - } - - Ok(root_and_verified) - } - pub fn add_peer(&mut self, peer: PeerId) { self.current_parent_request.add_peer(peer) } @@ -222,23 +183,6 @@ impl ParentLookup { } } -impl From for ParentVerifyError { - fn from(e: LookupVerifyError) -> Self { - use LookupVerifyError as E; - match e { - E::RootMismatch => ParentVerifyError::RootMismatch, - E::NoBlockReturned => ParentVerifyError::NoBlockReturned, - E::ExtraBlocksReturned => ParentVerifyError::ExtraBlocksReturned, - E::UnrequestedBlobId(blob_id) => ParentVerifyError::UnrequestedBlobId(blob_id), - E::InvalidInclusionProof => ParentVerifyError::InvalidInclusionProof, - E::UnrequestedHeader => ParentVerifyError::UnrequestedHeader, - E::ExtraBlobsReturned => ParentVerifyError::ExtraBlobsReturned, - E::InvalidIndex(index) => ParentVerifyError::InvalidIndex(index), - E::NotEnoughBlobsReturned => ParentVerifyError::NotEnoughBlobsReturned, - } - } -} - impl From for RequestError { fn from(e: LookupRequestError) -> Self { use LookupRequestError as E; @@ -276,7 +220,7 @@ impl RequestError { } RequestError::TooManyAttempts { cannot_process: _ } => "too_many_downloading_attempts", RequestError::NoPeers => "no_peers", - RequestError::BadState(_) => "bad_state", + RequestError::BadState(..) => "bad_state", } } } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 3f545f74186..8e879620783 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -18,22 +18,9 @@ use std::marker::PhantomData; use std::sync::Arc; use store::Hash256; use strum::IntoStaticStr; -use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList}; +use types::blob_sidecar::FixedBlobSidecarList; use types::EthSpec; -#[derive(Debug, PartialEq, Eq, IntoStaticStr)] -pub enum LookupVerifyError { - RootMismatch, - NoBlockReturned, - ExtraBlocksReturned, - UnrequestedBlobId(BlobIdentifier), - InvalidInclusionProof, - UnrequestedHeader, - ExtraBlobsReturned, - NotEnoughBlobsReturned, - InvalidIndex(u64), -} - #[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum LookupRequestError { /// Too many failed attempts @@ -42,6 +29,7 @@ pub enum LookupRequestError { cannot_process: bool, }, NoPeers, + BadState(String), SendFailed(&'static str), BadState(String), } @@ -532,6 +520,21 @@ impl SingleLookupRequestState { self.used_peers.insert(peer_id); Some(peer_id) } + + pub fn into_processing(&mut self) -> Result<(), String> { + match self.state { + State::AwaitingDownload => { + return Err("request bad state, expected downloading got processing".to_owned()) + } + State::Downloading { peer_id } => { + self.state = State::Processing { peer_id }; + Ok(()) + } + State::Processing { .. 
} => { + return Err("request bad state, expected downloading got processing".to_owned()) + } + } + } } impl slog::Value for SingleBlockLookup { @@ -674,12 +677,6 @@ mod tests { ) .unwrap(); sl.block_request_state.state.state = State::Downloading { peer_id }; - - as RequestState>::verify_response( - &mut sl.block_request_state, - block.into(), - ) - .unwrap(); } #[test] @@ -724,21 +721,16 @@ mod tests { .unwrap(); sl.block_request_state.state.state = State::Downloading { peer_id }; - as RequestState>::verify_response( - &mut sl.block_request_state, - block.into(), - ) - .unwrap(); - // One processing failure maxes the available attempts sl.block_request_state.state.on_processing_failure(); assert_eq!( as RequestState>::build_request( &mut sl.block_request_state, - ), - Err(LookupRequestError::TooManyAttempts { + ) + .unwrap_err(), + LookupRequestError::TooManyAttempts { cannot_process: false - }) + } ) } } diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 6d50fe63200..fc6ac28bdc7 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -18,6 +18,7 @@ use beacon_processor::WorkEvent; use lighthouse_network::rpc::RPCResponseErrorCode; use lighthouse_network::types::SyncState; use lighthouse_network::{NetworkGlobals, Request}; +use slog::info; use slot_clock::{ManualSlotClock, SlotClock, TestingSlotClock}; use store::MemoryStore; use tokio::sync::mpsc; @@ -67,6 +68,7 @@ struct TestRig { /// `rng` for generating test blocks and blobs. rng: XorShiftRng, fork_name: ForkName, + log: Logger, } const D: Duration = Duration::new(0, 0); @@ -124,6 +126,7 @@ impl TestRig { log.clone(), ), fork_name, + log, } } @@ -136,6 +139,10 @@ impl TestRig { } } + fn log(&self, msg: &str) { + info!(self.log, "TEST_RIG"; "msg" => msg); + } + fn after_deneb(&self) -> bool { matches!(self.fork_name, ForkName::Deneb | ForkName::Electra) } @@ -914,7 +921,7 @@ fn test_parent_lookup_too_many_processing_attempts_must_blacklist() { // Trigger the request rig.trigger_unknown_parent_block(peer_id, block.into()); - // Fail downloading the block + rig.log("Fail downloading the block"); for i in 0..(parent_lookup::PARENT_FAIL_TOLERANCE - PROCESSING_FAILURES) { let id = rig.expect_block_parent_request(parent_root); // Blobs are only requested in the first iteration as this test only retries blocks @@ -925,7 +932,7 @@ fn test_parent_lookup_too_many_processing_attempts_must_blacklist() { rig.parent_lookup_failed_unavailable(id, peer_id); } - // Now fail processing a block in the parent request + rig.log("Now fail processing a block in the parent request"); for i in 0..PROCESSING_FAILURES { let id = rig.expect_block_parent_request(parent_root); // Blobs are only requested in the first iteration as this test only retries blocks diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 22907227115..086dd35d718 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -1,6 +1,8 @@ //! Provides network functionality for the Syncing thread. This fundamentally wraps a network //! channel and stores a global RPC ID to perform requests. 
+use self::requests::{ActiveBlobsByRootRequest, ActiveBlocksByRootRequest}; +pub use self::requests::{BlobsByRootSingleBlockRequest, BlocksByRootSingleRequest}; use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo; use super::manager::{Id, RequestId as SyncRequestId}; use super::range_sync::{BatchId, ByRangeRequestType, ChainId}; @@ -10,18 +12,20 @@ use crate::status::ToStatusMessage; use crate::sync::manager::SingleLookupReqId; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::validator_monitor::timestamp_now; -use beacon_chain::{get_block_root, BeaconChain, BeaconChainTypes, EngineState}; +use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState}; use fnv::FnvHashMap; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; -use lighthouse_network::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, RPCError}; +use lighthouse_network::rpc::methods::BlobsByRangeRequest; +use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError}; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request}; use slog::{debug, trace, warn}; use std::collections::hash_map::Entry; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; -use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList}; -use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock}; +use types::blob_sidecar::FixedBlobSidecarList; +use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; + +mod requests; pub struct BlocksAndBlobsByRangeResponse { pub sender_id: RangeRequestId, @@ -93,91 +97,6 @@ impl From>>> for BlockOrBlob { } } -struct ActiveBlocksByRootRequest { - request: Hash256, - resolved: bool, -} - -impl ActiveBlocksByRootRequest { - fn add_response( - &mut self, - block: Arc>, - ) -> Result>, RPCError> { - if self.resolved { - return Err(RPCError::InvalidData("too many responses".to_string())); - } - - if self.request != get_block_root(&block) { - return Err(RPCError::InvalidData("wrong block root".to_string())); - } - - // Valid data, blocks by root expects a single response - self.resolved = true; - Ok(block) - } - - fn terminate(self) -> Result<(), RPCError> { - if self.resolved { - Ok(()) - } else { - Err(RPCError::InvalidData("no response returned".to_string())) - } - } -} - -pub type BlocksByRootSingleRequest = Hash256; - -pub struct BlobsByRootSingleBlockRequest { - pub block_root: Hash256, - pub indices: Vec, -} - -struct ActiveBlobsByRootRequest { - request: BlobsByRootSingleBlockRequest, - blobs: Vec>>, - resolved: bool, -} - -impl ActiveBlobsByRootRequest { - fn add_response( - &mut self, - blob: Arc>, - ) -> Result>>>, RPCError> { - if self.resolved { - return Err(RPCError::InvalidData("too many responses".to_string())); - } - - if self.request.block_root != blob.block_root() { - return Err(RPCError::InvalidData("un-requested block root".to_string())); - } - if !blob.verify_blob_sidecar_inclusion_proof().unwrap_or(false) { - return Err(RPCError::InvalidData("invalid inclusion proof".to_string())); - } - if !self.request.indices.contains(&blob.index) { - return Err(RPCError::InvalidData("un-requested blob index".to_string())); - } - if self.blobs.iter().any(|b| b.index == blob.index) { - return Err(RPCError::InvalidData("duplicated data".to_string())); - } - - self.blobs.push(blob); - if self.blobs.len() >= self.request.indices.len() { - // All expected chunks received, return result early - Ok(Some(std::mem::take(&mut self.blobs))) - } else { - Ok(None) - } - } - - fn 
terminate(self) -> Option>>> { - if self.resolved { - return None; - } else { - Some(self.blobs) - } - } -} - impl SyncNetworkContext { pub fn new( network_send: mpsc::UnboundedSender>, @@ -348,33 +267,25 @@ impl SyncNetworkContext { &mut self, id: SingleLookupReqId, peer_id: PeerId, - block_root: BlocksByRootSingleRequest, + request: BlocksByRootSingleRequest, ) -> Result<(), &'static str> { debug!( self.log, "Sending BlocksByRoot Request"; "method" => "BlocksByRoot", - "block_root" => ?block_root, + "block_root" => ?request.0, "peer" => %peer_id, "id" => ?id ); self.send_network_msg(NetworkMessage::SendRequest { peer_id, - request: Request::BlocksByRoot(BlocksByRootRequest::new( - vec![block_root], - &self.chain.spec, - )), + request: Request::BlocksByRoot(request.into_request(&self.chain.spec)), request_id: RequestId::Sync(SyncRequestId::SingleBlock { id }), })?; - self.blocks_by_root_requests.insert( - id, - ActiveBlocksByRootRequest { - request: block_root, - resolved: false, - }, - ); + self.blocks_by_root_requests + .insert(id, ActiveBlocksByRootRequest::new(request)); Ok(()) } @@ -397,28 +308,12 @@ impl SyncNetworkContext { self.send_network_msg(NetworkMessage::SendRequest { peer_id: peer_id, - request: Request::BlobsByRoot(BlobsByRootRequest::new( - request - .indices - .iter() - .map(|index| BlobIdentifier { - block_root: request.block_root, - index: *index, - }) - .collect(), - &self.chain.spec, - )), + request: Request::BlobsByRoot(request.into_request(&self.chain.spec)), request_id: RequestId::Sync(SyncRequestId::SingleBlob { id }), })?; - self.blobs_by_root_requests.insert( - id, - ActiveBlobsByRootRequest { - request, - resolved: false, - blobs: vec![], - }, - ); + self.blobs_by_root_requests + .insert(id, ActiveBlobsByRootRequest::new(request)); Ok(()) } @@ -540,7 +435,13 @@ impl SyncNetworkContext { RpcEvent::Response(block, seen_timestamp) => { match request.get_mut().add_response(block) { Ok(block) => Ok((block, seen_timestamp)), - Err(e) => Err(e), + Err(e) => { + // The request must be dropped after receiving an error. + // TODO: We could NOT drop the request here, and penalize the peer again if + // sends multiple penalizable chunks after the first invalid. + request.remove(); + Err(e) + } } } RpcEvent::StreamTermination => match request.remove().terminate() { @@ -570,7 +471,10 @@ impl SyncNetworkContext { .map(|blobs| (blobs, timestamp_now())) .map_err(RPCError::InvalidData), Ok(None) => return None, - Err(e) => Err(e), + Err(e) => { + request.remove(); + Err(e) + } }, RpcEvent::StreamTermination => { // Stream terminator diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs new file mode 100644 index 00000000000..1cc0260e450 --- /dev/null +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -0,0 +1,144 @@ +use beacon_chain::get_block_root; +use lighthouse_network::rpc::{methods::BlobsByRootRequest, BlocksByRootRequest, RPCError}; +use std::sync::Arc; +use types::{ + blob_sidecar::BlobIdentifier, BlobSidecar, ChainSpec, EthSpec, Hash256, SignedBeaconBlock, +}; + +pub struct ActiveBlocksByRootRequest { + request: BlocksByRootSingleRequest, + resolved: bool, +} + +impl ActiveBlocksByRootRequest { + pub fn new(request: BlocksByRootSingleRequest) -> Self { + Self { + request, + resolved: false, + } + } + + /// Append a response to the single chunk request. If the chunk is valid, the request is + /// resolved immediately. 
+ /// The active request SHOULD be dropped after `add_response` returns an error + pub fn add_response( + &mut self, + block: Arc>, + ) -> Result>, RPCError> { + if self.resolved { + return Err(RPCError::InvalidData("too many responses".to_string())); + } + + let block_root = get_block_root(&block); + if self.request.0 != block_root { + return Err(RPCError::InvalidData(format!( + "un-requested block root {block_root:?}" + ))); + } + + // Valid data, blocks by root expects a single response + self.resolved = true; + Ok(block) + } + + pub fn terminate(self) -> Result<(), RPCError> { + if self.resolved { + Ok(()) + } else { + Err(RPCError::InvalidData("no response returned".to_string())) + } + } +} + +#[derive(Debug)] +pub struct BlocksByRootSingleRequest(pub Hash256); + +impl BlocksByRootSingleRequest { + pub fn into_request(&self, spec: &ChainSpec) -> BlocksByRootRequest { + BlocksByRootRequest::new(vec![self.0], spec) + } +} + +pub struct BlobsByRootSingleBlockRequest { + pub block_root: Hash256, + pub indices: Vec, +} + +impl BlobsByRootSingleBlockRequest { + pub fn into_request(&self, spec: &ChainSpec) -> BlobsByRootRequest { + BlobsByRootRequest::new( + self.indices + .iter() + .map(|index| BlobIdentifier { + block_root: self.block_root, + index: *index, + }) + .collect(), + spec, + ) + } +} + +pub struct ActiveBlobsByRootRequest { + request: BlobsByRootSingleBlockRequest, + blobs: Vec>>, + resolved: bool, +} + +impl ActiveBlobsByRootRequest { + pub fn new(request: BlobsByRootSingleBlockRequest) -> Self { + Self { + request, + blobs: vec![], + resolved: false, + } + } + + /// Appends a chunk to this multi-item request. If all expected chunks are received, this + /// method returns `Some`, resolving the request before the stream terminator. + /// The active request SHOULD be dropped after `add_response` returns an error + pub fn add_response( + &mut self, + blob: Arc>, + ) -> Result>>>, RPCError> { + if self.resolved { + return Err(RPCError::InvalidData("too many responses".to_string())); + } + + let block_root = blob.block_root(); + if self.request.block_root != block_root { + return Err(RPCError::InvalidData(format!( + "un-requested block root {block_root:?}" + ))); + } + if !blob.verify_blob_sidecar_inclusion_proof().unwrap_or(false) { + return Err(RPCError::InvalidData("invalid inclusion proof".to_string())); + } + if !self.request.indices.contains(&blob.index) { + return Err(RPCError::InvalidData(format!( + "un-requested blob index {}", + blob.index + ))); + } + if self.blobs.iter().any(|b| b.index == blob.index) { + return Err(RPCError::InvalidData("duplicated data".to_string())); + } + + self.blobs.push(blob); + if self.blobs.len() >= self.request.indices.len() { + // All expected chunks received, return result early + self.resolved = true; + Ok(Some(std::mem::take(&mut self.blobs))) + } else { + Ok(None) + } + } + + pub fn terminate(self) -> Option>>> { + if self.resolved { + return None; + } else { + Some(self.blobs) + } + } +} From 9e7917a9c993f80bc0893ca693dd90424736b7ed Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Mon, 15 Apr 2024 23:43:28 +0900 Subject: [PATCH 3/7] Resolve merge conflicts --- .../network/src/sync/block_lookups/common.rs | 3 +- .../network/src/sync/block_lookups/mod.rs | 12 ++++--- .../sync/block_lookups/single_block_lookup.rs | 35 +++++++------------ 3 files changed, 21 insertions(+), 29 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/common.rs 
b/beacon_node/network/src/sync/block_lookups/common.rs index 038a216dcb9..3bd39301b21 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -1,6 +1,6 @@ use crate::sync::block_lookups::parent_lookup::PARENT_FAIL_TOLERANCE; use crate::sync::block_lookups::single_block_lookup::{ - LookupRequestError, SingleBlockLookup, SingleLookupRequestState, State, + LookupRequestError, SingleBlockLookup, SingleLookupRequestState, }; use crate::sync::block_lookups::{ BlobRequestState, BlockLookups, BlockRequestState, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, @@ -12,7 +12,6 @@ use crate::sync::network_context::{ use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::data_availability_checker::ChildComponents; use beacon_chain::BeaconChainTypes; -use rand::prelude::IteratorRandom; use std::sync::Arc; use std::time::Duration; use types::blob_sidecar::FixedBlobSidecarList; diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index d218eb69918..48ec00dffa5 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -388,6 +388,11 @@ impl BlockLookups { }; if !delay_send { + R::request_state_mut(lookup) + .get_state_mut() + .on_download_success() + .map_err(LookupRequestError::BadState)?; + self.send_block_for_processing( block_root, block, @@ -411,7 +416,7 @@ impl BlockLookups { CachedChild::NotRequired => { R::request_state_mut(lookup) .get_state_mut() - .into_processing() + .on_download_success() .map_err(LookupRequestError::BadState)?; R::send_reconstructed_for_processing( @@ -781,12 +786,11 @@ impl BlockLookups { cx: &mut SyncNetworkContext, lookup: &mut SingleBlockLookup, ) -> Result<(), LookupRequestError> { - let request_state = R::request_state_mut(lookup); - - request_state + R::request_state_mut(lookup) .get_state_mut() .on_processing_success() .map_err(LookupRequestError::BadState)?; + if lookup.both_components_processed() { lookup.penalize_blob_peer(cx); diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 8e879620783..a68d07dfc3c 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -29,7 +29,6 @@ pub enum LookupRequestError { cannot_process: bool, }, NoPeers, - BadState(String), SendFailed(&'static str), BadState(String), } @@ -406,11 +405,6 @@ impl SingleLookupRequestState { } } - // TODO: Should not leak the enum state - pub fn get_state(&self) -> &State { - &self.state - } - pub fn is_current_req_counter(&self, req_counter: u32) -> bool { self.req_counter == req_counter } @@ -450,8 +444,18 @@ impl SingleLookupRequestState { self.state = State::AwaitingDownload; } - pub fn on_download_success(&mut self, peer_id: PeerId) { - self.state = State::Processing { peer_id }; + pub fn on_download_success(&mut self) -> Result<(), String> { + match &self.state { + State::Downloading { peer_id } => { + self.state = State::Processing { peer_id: *peer_id }; + Ok(()) + } + other => { + return Err(format!( + "request bad state, expected downloading got {other}" + )) + } + } } /// Registers a failure in processing a block. 
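The hunk above replaces the old unconditional `on_download_success(peer_id)` setter with a checked transition: a lookup may only move into `Processing` from the `Downloading` state, and any other state is surfaced as a `BadState(String)` error instead of being silently overwritten. Below is a minimal, self-contained sketch of that guard; the small `RequestState` struct and the plain `u64` peer id are illustrative stand-ins, not Lighthouse's actual `SingleLookupRequestState` and `PeerId` types.

// Minimal sketch of the checked Downloading -> Processing transition.
// The real code stores a libp2p `PeerId`; a `u64` stands in here.
#[derive(Debug)]
enum State {
    AwaitingDownload,
    Downloading { peer_id: u64 },
    Processing { peer_id: u64 },
}

struct RequestState {
    state: State,
}

impl RequestState {
    /// Only a request that is currently downloading may be marked as
    /// successfully downloaded; any other state is reported as an error
    /// string, mirroring the `BadState(String)` variant used above.
    fn on_download_success(&mut self) -> Result<(), String> {
        match &self.state {
            State::Downloading { peer_id } => {
                self.state = State::Processing { peer_id: *peer_id };
                Ok(())
            }
            other => Err(format!(
                "request bad state, expected downloading got {other:?}"
            )),
        }
    }
}

fn main() {
    let mut downloading = RequestState {
        state: State::Downloading { peer_id: 7 },
    };
    assert!(downloading.on_download_success().is_ok());
    println!("after success: {:?}", downloading.state);

    let mut idle = RequestState {
        state: State::AwaitingDownload,
    };
    assert!(idle.on_download_success().is_err());
}
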
@@ -520,21 +524,6 @@ impl SingleLookupRequestState { self.used_peers.insert(peer_id); Some(peer_id) } - - pub fn into_processing(&mut self) -> Result<(), String> { - match self.state { - State::AwaitingDownload => { - return Err("request bad state, expected downloading got processing".to_owned()) - } - State::Downloading { peer_id } => { - self.state = State::Processing { peer_id }; - Ok(()) - } - State::Processing { .. } => { - return Err("request bad state, expected downloading got processing".to_owned()) - } - } - } } impl slog::Value for SingleBlockLookup { From 2ef22b3f63bf271ab6ebd373000bf7a99fb82b58 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Tue, 16 Apr 2024 00:25:53 +0900 Subject: [PATCH 4/7] Log report reason --- beacon_node/network/src/sync/network_context.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 086dd35d718..390843b319e 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -343,7 +343,7 @@ impl SyncNetworkContext { /// Reports to the scoring algorithm the behaviour of a peer. pub fn report_peer(&self, peer_id: PeerId, action: PeerAction, msg: &'static str) { - debug!(self.log, "Sync reporting peer"; "peer_id" => %peer_id, "action" => %action); + debug!(self.log, "Sync reporting peer"; "peer_id" => %peer_id, "action" => %action, "msg" => %msg); self.network_send .send(NetworkMessage::ReportPeer { peer_id, From 71cceee342092563e75dea9b2b5c5235f6ce56a7 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Tue, 16 Apr 2024 00:58:05 -0400 Subject: [PATCH 5/7] Some lints and bugfixes (#23) * fix lints * bug fixes --- beacon_node/network/src/sync/block_lookups/mod.rs | 5 ++++- .../src/sync/block_lookups/single_block_lookup.rs | 9 ++++----- beacon_node/network/src/sync/network_context.rs | 10 ++++++---- .../network/src/sync/network_context/requests.rs | 13 +++++++------ 4 files changed, 21 insertions(+), 16 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 48ec00dffa5..703c2af5d6d 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -392,7 +392,6 @@ impl BlockLookups { .get_state_mut() .on_download_success() .map_err(LookupRequestError::BadState)?; - self.send_block_for_processing( block_root, block, @@ -403,6 +402,10 @@ impl BlockLookups { } } CachedChild::DownloadIncomplete => { + R::request_state_mut(lookup) + .get_state_mut() + .on_download_success() + .map_err(LookupRequestError::BadState)?; // If this was the result of a block request, we can't determine if the block peer // did anything wrong. If we already had both a block and blobs response processed, // we should penalize the blobs peer because they did not provide all blobs on the diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index a68d07dfc3c..5b1edd71a29 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -75,6 +75,7 @@ impl SingleBlockLookup { /// the next parent. 
pub fn update_requested_parent_block(&mut self, block_root: Hash256) { self.block_request_state.requested_block_root = block_root; + self.blob_request_state.block_root = block_root; self.block_request_state.state.state = State::AwaitingDownload; self.blob_request_state.state.state = State::AwaitingDownload; self.child_components = Some(ChildComponents::empty(block_root)); @@ -450,11 +451,9 @@ impl SingleLookupRequestState { self.state = State::Processing { peer_id: *peer_id }; Ok(()) } - other => { - return Err(format!( - "request bad state, expected downloading got {other}" - )) - } + other => Err(format!( + "request bad state, expected downloading got {other}" + )), } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 390843b319e..779dde96ff1 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -51,6 +51,8 @@ pub enum RpcEvent { RPCError(RPCError), } +pub type RpcProcessingResult = Option>; + /// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id. pub struct SyncNetworkContext { /// The network channel to relay messages to the Network service. @@ -307,8 +309,8 @@ impl SyncNetworkContext { ); self.send_network_msg(NetworkMessage::SendRequest { - peer_id: peer_id, - request: Request::BlobsByRoot(request.into_request(&self.chain.spec)), + peer_id, + request: Request::BlobsByRoot(request.clone().into_request(&self.chain.spec)), request_id: RequestId::Sync(SyncRequestId::SingleBlob { id }), })?; @@ -426,7 +428,7 @@ impl SyncNetworkContext { &mut self, request_id: SingleLookupReqId, block: RpcEvent>>, - ) -> Option>, Duration), RPCError>> { + ) -> RpcProcessingResult>> { let Entry::Occupied(mut request) = self.blocks_by_root_requests.entry(request_id) else { return None; }; @@ -459,7 +461,7 @@ impl SyncNetworkContext { &mut self, request_id: SingleLookupReqId, blob: RpcEvent>>, - ) -> Option, Duration), RPCError>> { + ) -> RpcProcessingResult> { let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else { return None; }; diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 1cc0260e450..91876bf9c5d 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -50,28 +50,29 @@ impl ActiveBlocksByRootRequest { } } -#[derive(Debug)] +#[derive(Debug, Copy, Clone)] pub struct BlocksByRootSingleRequest(pub Hash256); impl BlocksByRootSingleRequest { - pub fn into_request(&self, spec: &ChainSpec) -> BlocksByRootRequest { + pub fn into_request(self, spec: &ChainSpec) -> BlocksByRootRequest { BlocksByRootRequest::new(vec![self.0], spec) } } +#[derive(Debug, Clone)] pub struct BlobsByRootSingleBlockRequest { pub block_root: Hash256, pub indices: Vec, } impl BlobsByRootSingleBlockRequest { - pub fn into_request(&self, spec: &ChainSpec) -> BlobsByRootRequest { + pub fn into_request(self, spec: &ChainSpec) -> BlobsByRootRequest { BlobsByRootRequest::new( self.indices - .iter() + .into_iter() .map(|index| BlobIdentifier { block_root: self.block_root, - index: *index, + index, }) .collect(), spec, @@ -136,7 +137,7 @@ impl ActiveBlobsByRootRequest { pub fn terminate(self) -> Option>>> { if self.resolved { - return None; + None } else { Some(self.blobs) } From 5dfbb3125dbea6fbd8911934a6fe6f71f0fcb4aa Mon 
Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Tue, 16 Apr 2024 14:23:57 +0900 Subject: [PATCH 6/7] Fix tests --- beacon_node/network/src/sync/block_lookups/mod.rs | 8 +++----- .../network/src/sync/block_lookups/single_block_lookup.rs | 4 ++-- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 703c2af5d6d..86109a38ac2 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -606,12 +606,9 @@ impl BlockLookups { pub fn peer_disconnected(&mut self, peer_id: &PeerId, cx: &mut SyncNetworkContext) { /* Check disconnection for single lookups */ - self.single_block_lookups.retain(|id, req| { + self.single_block_lookups.retain(|_, req| { let should_drop_lookup = req.should_drop_lookup_on_disconnected_peer(peer_id, cx, &self.log); - if should_drop_lookup { - debug!(self.log, "Dropping lookup after peer disconnected"; "id" => id, "block_root" => %req.block_root()); - } !should_drop_lookup }); @@ -695,7 +692,8 @@ impl BlockLookups { "error" => ?e, "block_root" => ?block_root, ); - self.single_block_lookups.remove(&id); + } else { + self.single_block_lookups.insert(id, lookup); } metrics::set_gauge( diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 5b1edd71a29..5bb663967d7 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -11,7 +11,7 @@ use beacon_chain::BeaconChainTypes; use itertools::Itertools; use lighthouse_network::PeerAction; use rand::seq::IteratorRandom; -use slog::{trace, Logger}; +use slog::{debug, Logger}; use std::collections::HashSet; use std::fmt::Debug; use std::marker::PhantomData; @@ -221,7 +221,7 @@ impl SingleBlockLookup { if block_peer_disconnected || blob_peer_disconnected { if let Err(e) = self.request_block_and_blobs(cx) { - trace!(log, "Single lookup failed on peer disconnection"; "block_root" => ?block_root, "error" => ?e); + debug!(log, "Single lookup failed on peer disconnection"; "block_root" => ?block_root, "error" => ?e); return true; } } From 2d4712b9feddc900bab015af0603ca4c0c41a004 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Mon, 22 Apr 2024 09:24:15 -0400 Subject: [PATCH 7/7] Pr 5583 review (#24) * add bad state warn log * add rust docs to new fields in `SyncNetworkContext` * remove timestamp todo * add back lookup verify error * remove TODOs --- .../network/src/sync/block_lookups/mod.rs | 50 +++++++--------- .../network/src/sync/block_lookups/tests.rs | 2 +- .../network/src/sync/network_context.rs | 57 +++++++++++++------ .../src/sync/network_context/requests.rs | 42 +++++++------- 4 files changed, 84 insertions(+), 67 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 998bcc855a5..fa2683fb0f0 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -1,5 +1,6 @@ use self::single_block_lookup::SingleBlockLookup; use super::manager::BlockProcessingResult; +use super::network_context::{LookupFailure, LookupVerifyError}; use super::BatchProcessResult; use super::{manager::BlockProcessType, network_context::SyncNetworkContext}; use crate::metrics; @@ -20,7 +21,6 @@ pub use common::Lookup; pub use 
common::Parent; pub use common::RequestState; use fnv::FnvHashMap; -use lighthouse_network::rpc::RPCError; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; pub use single_block_lookup::{BlobRequestState, BlockRequestState}; @@ -604,7 +604,7 @@ impl BlockLookups { // processed. Drop the request without extra penalty } RequestError::BadState(..) => { - // Internal error should never happen + warn!(self.log, "Failed to request parent"; "error" => e.as_static()); } } } @@ -638,10 +638,13 @@ impl BlockLookups { id: SingleLookupReqId, peer_id: &PeerId, cx: &mut SyncNetworkContext, - error: RPCError, + error: LookupFailure, ) { - // Downscore peer even if lookup is not known - self.downscore_on_rpc_error(peer_id, &error, cx); + // Only downscore lookup verify errors. RPC errors are downscored in the network handler. + if let LookupFailure::LookupVerifyError(e) = &error { + // Downscore peer even if lookup is not known + self.downscore_on_rpc_error(peer_id, e, cx); + } let Some(mut parent_lookup) = self.get_parent_lookup::(id) else { debug!(self.log, @@ -671,14 +674,17 @@ impl BlockLookups { id: SingleLookupReqId, peer_id: &PeerId, cx: &mut SyncNetworkContext, - error: RPCError, + error: LookupFailure, ) { - // Downscore peer even if lookup is not known - self.downscore_on_rpc_error(peer_id, &error, cx); + // Only downscore lookup verify errors. RPC errors are downscored in the network handler. + if let LookupFailure::LookupVerifyError(e) = &error { + // Downscore peer even if lookup is not known + self.downscore_on_rpc_error(peer_id, e, cx); + } let log = self.log.clone(); let Some(mut lookup) = self.get_single_lookup::(id) else { - debug!(log, "Error response to dropped lookup"; "error" => ?error); + debug!(log, "Error response to dropped lookup"; "error" => %error); return; }; let block_root = lookup.block_root(); @@ -1322,31 +1328,15 @@ impl BlockLookups { pub fn downscore_on_rpc_error( &self, peer_id: &PeerId, - error: &RPCError, + error: &LookupVerifyError, cx: &SyncNetworkContext, ) { // Note: logging the report event here with the full error display. The log inside // `report_peer` only includes a smaller string, like "invalid_data" - debug!(self.log, "reporting peer for sync lookup error"; "error" => %error); - if let Some(action) = match error { - // Protocol errors are heavily penalized - RPCError::SSZDecodeError(..) - | RPCError::IoError(..) - | RPCError::ErrorResponse(..) - | RPCError::InvalidData(..) - | RPCError::HandlerRejected => Some(PeerAction::LowToleranceError), - // Timing / network errors are less penalized - // TODO: Is IoError a protocol error or network error? - RPCError::StreamTimeout | RPCError::IncompleteStream | RPCError::NegotiationTimeout => { - Some(PeerAction::MidToleranceError) - } - // Not supporting a specific protocol is tolerated. TODO: Are you sure? - RPCError::UnsupportedProtocol => None, - // Our fault, don't penalize peer - RPCError::InternalError(..) 
| RPCError::Disconnected => None, - } { - cx.report_peer(*peer_id, action, error.into()); - } + let error_str: &'static str = error.into(); + + debug!(self.log, "reporting peer for sync lookup error"; "error" => error_str); + cx.report_peer(*peer_id, PeerAction::LowToleranceError, error_str); } pub fn update_metrics(&self) { diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index fc6ac28bdc7..8e3b35ee5d3 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -15,7 +15,7 @@ use beacon_chain::test_utils::{ build_log, generate_rand_block_and_blobs, BeaconChainHarness, EphemeralHarnessType, NumBlobs, }; use beacon_processor::WorkEvent; -use lighthouse_network::rpc::RPCResponseErrorCode; +use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode}; use lighthouse_network::types::SyncState; use lighthouse_network::{NetworkGlobals, Request}; use slog::info; diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 779dde96ff1..fc91270c1dc 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -17,6 +17,7 @@ use fnv::FnvHashMap; use lighthouse_network::rpc::methods::BlobsByRangeRequest; use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError}; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request}; +pub use requests::LookupVerifyError; use slog::{debug, trace, warn}; use std::collections::hash_map::Entry; use std::sync::Arc; @@ -51,7 +52,33 @@ pub enum RpcEvent { RPCError(RPCError), } -pub type RpcProcessingResult = Option>; +pub type RpcProcessingResult = Option>; + +pub enum LookupFailure { + RpcError(RPCError), + LookupVerifyError(LookupVerifyError), +} + +impl std::fmt::Display for LookupFailure { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + LookupFailure::RpcError(e) => write!(f, "RPC Error: {:?}", e), + LookupFailure::LookupVerifyError(e) => write!(f, "Lookup Verify Error: {:?}", e), + } + } +} + +impl From for LookupFailure { + fn from(e: RPCError) -> Self { + LookupFailure::RpcError(e) + } +} + +impl From for LookupFailure { + fn from(e: LookupVerifyError) -> Self { + LookupFailure::LookupVerifyError(e) + } +} /// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id. pub struct SyncNetworkContext { @@ -61,7 +88,10 @@ pub struct SyncNetworkContext { /// A sequential ID for all RPC requests. request_id: Id, + /// A mapping of active BlocksByRoot requests, including both current slot and parent lookups. blocks_by_root_requests: FnvHashMap, + + /// A mapping of active BlobsByRoot requests, including both current slot and parent lookups. blobs_by_root_requests: FnvHashMap>, /// BlocksByRange requests paired with BlobsByRange @@ -439,20 +469,18 @@ impl SyncNetworkContext { Ok(block) => Ok((block, seen_timestamp)), Err(e) => { // The request must be dropped after receiving an error. - // TODO: We could NOT drop the request here, and penalize the peer again if - // sends multiple penalizable chunks after the first invalid. 
request.remove(); - Err(e) + Err(e.into()) } } } RpcEvent::StreamTermination => match request.remove().terminate() { Ok(_) => return None, - Err(e) => Err(e), + Err(e) => Err(e.into()), }, RpcEvent::RPCError(e) => { request.remove(); - Err(e) + Err(e.into()) } }) } @@ -468,32 +496,27 @@ impl SyncNetworkContext { Some(match blob { RpcEvent::Response(blob, _) => match request.get_mut().add_response(blob) { - // TODO: Should deal only with Vec> Ok(Some(blobs)) => to_fixed_blob_sidecar_list(blobs) .map(|blobs| (blobs, timestamp_now())) - .map_err(RPCError::InvalidData), + .map_err(Into::into), Ok(None) => return None, Err(e) => { request.remove(); - Err(e) + Err(e.into()) } }, RpcEvent::StreamTermination => { // Stream terminator match request.remove().terminate() { - // TODO: Should deal only with Vec> Some(blobs) => to_fixed_blob_sidecar_list(blobs) - // TODO: a seen_timestamp for an array of blobs doesn't make much sense - // since each is received at different times. Should we track first, last or - // average? .map(|blobs| (blobs, timestamp_now())) - .map_err(RPCError::InvalidData), + .map_err(Into::into), None => return None, } } RpcEvent::RPCError(e) => { request.remove(); - Err(e) + Err(e.into()) } }) } @@ -501,13 +524,13 @@ impl SyncNetworkContext { fn to_fixed_blob_sidecar_list( blobs: Vec>>, -) -> Result, String> { +) -> Result, LookupVerifyError> { let mut fixed_list = FixedBlobSidecarList::default(); for blob in blobs.into_iter() { let index = blob.index as usize; *fixed_list .get_mut(index) - .ok_or("invalid index".to_string())? = Some(blob) + .ok_or(LookupVerifyError::UnrequestedBlobIndex(index as u64))? = Some(blob) } Ok(fixed_list) } diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 91876bf9c5d..0522b7fa384 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -1,10 +1,21 @@ use beacon_chain::get_block_root; -use lighthouse_network::rpc::{methods::BlobsByRootRequest, BlocksByRootRequest, RPCError}; +use lighthouse_network::rpc::{methods::BlobsByRootRequest, BlocksByRootRequest}; use std::sync::Arc; +use strum::IntoStaticStr; use types::{ blob_sidecar::BlobIdentifier, BlobSidecar, ChainSpec, EthSpec, Hash256, SignedBeaconBlock, }; +#[derive(Debug, PartialEq, Eq, IntoStaticStr)] +pub enum LookupVerifyError { + NoResponseReturned, + TooManyResponses, + UnrequestedBlockRoot(Hash256), + UnrequestedBlobIndex(u64), + InvalidInclusionProof, + DuplicateData, +} + pub struct ActiveBlocksByRootRequest { request: BlocksByRootSingleRequest, resolved: bool, @@ -24,16 +35,14 @@ impl ActiveBlocksByRootRequest { pub fn add_response( &mut self, block: Arc>, - ) -> Result>, RPCError> { + ) -> Result>, LookupVerifyError> { if self.resolved { - return Err(RPCError::InvalidData("too many responses".to_string())); + return Err(LookupVerifyError::TooManyResponses); } let block_root = get_block_root(&block); if self.request.0 != block_root { - return Err(RPCError::InvalidData(format!( - "un-requested block root {block_root:?}" - ))); + return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); } // Valid data, blocks by root expects a single response @@ -41,11 +50,11 @@ impl ActiveBlocksByRootRequest { Ok(block) } - pub fn terminate(self) -> Result<(), RPCError> { + pub fn terminate(self) -> Result<(), LookupVerifyError> { if self.resolved { Ok(()) } else { - Err(RPCError::InvalidData("no response returned".to_string())) + 
Err(LookupVerifyError::NoResponseReturned) } } } @@ -101,28 +110,23 @@ impl ActiveBlobsByRootRequest { pub fn add_response( &mut self, blob: Arc>, - ) -> Result>>>, RPCError> { + ) -> Result>>>, LookupVerifyError> { if self.resolved { - return Err(RPCError::InvalidData("too many responses".to_string())); + return Err(LookupVerifyError::TooManyResponses); } let block_root = blob.block_root(); if self.request.block_root != block_root { - return Err(RPCError::InvalidData(format!( - "un-requested block root {block_root:?}" - ))); + return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); } if !blob.verify_blob_sidecar_inclusion_proof().unwrap_or(false) { - return Err(RPCError::InvalidData("invalid inclusion proof".to_string())); + return Err(LookupVerifyError::InvalidInclusionProof); } if !self.request.indices.contains(&blob.index) { - return Err(RPCError::InvalidData(format!( - "un-requested blob index {}", - blob.index - ))); + return Err(LookupVerifyError::UnrequestedBlobIndex(blob.index)); } if self.blobs.iter().any(|b| b.index == blob.index) { - return Err(RPCError::InvalidData("duplicated data".to_string())); + return Err(LookupVerifyError::DuplicateData); } self.blobs.push(blob);