diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index 8f7881eea8a..3bd39301b21 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -1,22 +1,21 @@ use crate::sync::block_lookups::parent_lookup::PARENT_FAIL_TOLERANCE; use crate::sync::block_lookups::single_block_lookup::{ - LookupRequestError, LookupVerifyError, SingleBlockLookup, SingleLookupRequestState, State, + LookupRequestError, SingleBlockLookup, SingleLookupRequestState, }; use crate::sync::block_lookups::{ BlobRequestState, BlockLookups, BlockRequestState, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS, }; use crate::sync::manager::{BlockProcessType, Id, SingleLookupReqId}; -use crate::sync::network_context::SyncNetworkContext; +use crate::sync::network_context::{ + BlobsByRootSingleBlockRequest, BlocksByRootSingleRequest, SyncNetworkContext, +}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::data_availability_checker::ChildComponents; -use beacon_chain::{get_block_root, BeaconChainTypes}; -use lighthouse_network::rpc::methods::BlobsByRootRequest; -use lighthouse_network::rpc::BlocksByRootRequest; -use std::ops::IndexMut; +use beacon_chain::BeaconChainTypes; use std::sync::Arc; use std::time::Duration; -use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList}; -use types::{BlobSidecar, ChainSpec, Hash256, SignedBeaconBlock}; +use types::blob_sidecar::FixedBlobSidecarList; +use types::{Hash256, SignedBeaconBlock}; #[derive(Debug, Copy, Clone)] pub enum ResponseType { @@ -73,9 +72,6 @@ pub trait RequestState { /// The type of the request . type RequestType; - /// A block or blob response. - type ResponseType; - /// The type created after validation. type VerifiedResponseType: Clone; @@ -85,14 +81,11 @@ pub trait RequestState { /* Request building methods */ /// Construct a new request. - fn build_request( - &mut self, - spec: &ChainSpec, - ) -> Result<(PeerId, Self::RequestType), LookupRequestError> { + fn build_request(&mut self) -> Result<(PeerId, Self::RequestType), LookupRequestError> { // Verify and construct request. self.too_many_attempts()?; let peer = self.get_peer()?; - let request = self.new_request(spec); + let request = self.new_request(); Ok((peer, request)) } @@ -100,7 +93,7 @@ pub trait RequestState { fn build_request_and_send( &mut self, id: Id, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) -> Result<(), LookupRequestError> { // Check if request is necessary. if !self.get_state().is_awaiting_download() { @@ -108,7 +101,7 @@ pub trait RequestState { } // Construct request. - let (peer_id, request) = self.build_request(&cx.chain.spec)?; + let (peer_id, request) = self.build_request()?; // Update request state. let req_counter = self.get_state_mut().on_download_start(peer_id); @@ -144,61 +137,18 @@ pub trait RequestState { } /// Initialize `Self::RequestType`. - fn new_request(&self, spec: &ChainSpec) -> Self::RequestType; + fn new_request(&self) -> Self::RequestType; /// Send the request to the network service. fn make_request( id: SingleLookupReqId, peer_id: PeerId, request: Self::RequestType, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) -> Result<(), LookupRequestError>; /* Response handling methods */ - /// Verify the response is valid based on what we requested. - fn verify_response( - &mut self, - expected_block_root: Hash256, - peer_id: PeerId, - response: Option, - ) -> Result, LookupVerifyError> { - let result = match *self.get_state().get_state() { - State::AwaitingDownload => Err(LookupVerifyError::ExtraBlocksReturned), - State::Downloading { peer_id: _ } => { - // TODO: We requested a download from Downloading { peer_id }, but the network - // injects a response from a different peer_id. What should we do? The peer_id to - // track for scoring is the one that actually sent the response, not the state's - self.verify_response_inner(expected_block_root, response) - } - State::Processing { .. } | State::Processed { .. } => match response { - // We sent the block for processing and received an extra block. - Some(_) => Err(LookupVerifyError::ExtraBlocksReturned), - // This is simply the stream termination and we are already processing the block - None => Ok(None), - }, - }; - - match result { - Ok(Some(response)) => { - self.get_state_mut().on_download_success(peer_id); - Ok(Some(response)) - } - Ok(None) => Ok(None), - Err(e) => { - self.get_state_mut().on_download_failure(); - Err(e) - } - } - } - - /// The response verification unique to block or blobs. - fn verify_response_inner( - &mut self, - expected_block_root: Hash256, - response: Option, - ) -> Result, LookupVerifyError>; - /// A getter for the parent root of the response. Returns an `Option` because we won't know /// the blob parent if we don't end up getting any blobs in the response. fn get_parent_root(verified_response: &Self::VerifiedResponseType) -> Option; @@ -247,49 +197,24 @@ pub trait RequestState { } impl RequestState for BlockRequestState { - type RequestType = BlocksByRootRequest; - type ResponseType = Arc>; + type RequestType = BlocksByRootSingleRequest; type VerifiedResponseType = Arc>; type ReconstructedResponseType = RpcBlock; - fn new_request(&self, spec: &ChainSpec) -> BlocksByRootRequest { - BlocksByRootRequest::new(vec![self.requested_block_root], spec) + fn new_request(&self) -> Self::RequestType { + BlocksByRootSingleRequest(self.requested_block_root) } fn make_request( id: SingleLookupReqId, peer_id: PeerId, request: Self::RequestType, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) -> Result<(), LookupRequestError> { cx.block_lookup_request(id, peer_id, request) .map_err(LookupRequestError::SendFailed) } - fn verify_response_inner( - &mut self, - expected_block_root: Hash256, - response: Option, - ) -> Result>>, LookupVerifyError> { - match response { - Some(block) => { - // Compute the block root using this specific function so that we can get timing - // metrics. - let block_root = get_block_root(&block); - if block_root != expected_block_root { - // return an error and drop the block - // NOTE: we take this is as a download failure to prevent counting the - // attempt as a chain failure, but simply a peer failure. - Err(LookupVerifyError::RootMismatch) - } else { - // Return the block for processing. - Ok(Some(block)) - } - } - None => Err(LookupVerifyError::NoBlockReturned), - } - } - fn get_parent_root(verified_response: &Arc>) -> Option { Some(verified_response.parent_root()) } @@ -340,60 +265,27 @@ impl RequestState for BlockRequestState } impl RequestState for BlobRequestState { - type RequestType = BlobsByRootRequest; - type ResponseType = Arc>; + type RequestType = BlobsByRootSingleBlockRequest; type VerifiedResponseType = FixedBlobSidecarList; type ReconstructedResponseType = FixedBlobSidecarList; - fn new_request(&self, spec: &ChainSpec) -> BlobsByRootRequest { - let blob_id_vec: Vec = self.requested_ids.clone().into(); - BlobsByRootRequest::new(blob_id_vec, spec) + fn new_request(&self) -> Self::RequestType { + BlobsByRootSingleBlockRequest { + block_root: self.block_root, + indices: self.requested_ids.indices(), + } } fn make_request( id: SingleLookupReqId, peer_id: PeerId, request: Self::RequestType, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) -> Result<(), LookupRequestError> { cx.blob_lookup_request(id, peer_id, request) .map_err(LookupRequestError::SendFailed) } - fn verify_response_inner( - &mut self, - expected_block_root: Hash256, - blob: Option, - ) -> Result>, LookupVerifyError> { - match blob { - Some(blob) => { - let received_id = blob.id(); - - if !self.requested_ids.contains(&received_id) { - return Err(LookupVerifyError::UnrequestedBlobId(received_id)); - } - if !blob.verify_blob_sidecar_inclusion_proof().unwrap_or(false) { - return Err(LookupVerifyError::InvalidInclusionProof); - } - if blob.block_root() != expected_block_root { - return Err(LookupVerifyError::UnrequestedHeader); - } - - // State should remain downloading until we receive the stream terminator. - self.requested_ids.remove(&received_id); - - // The inclusion proof check above ensures `blob.index` is < MAX_BLOBS_PER_BLOCK - let blob_index = blob.index; - *self.blob_download_queue.index_mut(blob_index as usize) = Some(blob); - Ok(None) - } - None => { - let blobs = std::mem::take(&mut self.blob_download_queue); - Ok(Some(blobs)) - } - } - } - fn get_parent_root(verified_response: &FixedBlobSidecarList) -> Option { verified_response .into_iter() diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index a5826bcb3d8..fa2683fb0f0 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -1,6 +1,6 @@ -use self::parent_lookup::ParentVerifyError; use self::single_block_lookup::SingleBlockLookup; use super::manager::BlockProcessingResult; +use super::network_context::{LookupFailure, LookupVerifyError}; use super::BatchProcessResult; use super::{manager::BlockProcessType, network_context::SyncNetworkContext}; use crate::metrics; @@ -21,7 +21,6 @@ pub use common::Lookup; pub use common::Parent; pub use common::RequestState; use fnv::FnvHashMap; -use lighthouse_network::rpc::RPCError; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; pub use single_block_lookup::{BlobRequestState, BlockRequestState}; @@ -133,7 +132,7 @@ impl BlockLookups { pub fn trigger_single_lookup( &mut self, mut single_block_lookup: SingleBlockLookup, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) { let block_root = single_block_lookup.block_root(); match single_block_lookup.request_block_and_blobs(cx) { @@ -319,40 +318,41 @@ impl BlockLookups { &mut self, lookup_id: SingleLookupReqId, peer_id: PeerId, - response: Option, + response: R::VerifiedResponseType, seen_timestamp: Duration, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) { let id = lookup_id.id; let response_type = R::response_type(); - let Some(lookup) = self.get_single_lookup::(lookup_id) else { - if response.is_some() { - // We don't have the ability to cancel in-flight RPC requests. So this can happen - // if we started this RPC request, and later saw the block/blobs via gossip. - debug!( - self.log, - "Block returned for single block lookup not present"; - "response_type" => ?response_type, - ); - } + let Some(mut lookup) = self.get_single_lookup::(lookup_id) else { + // We don't have the ability to cancel in-flight RPC requests. So this can happen + // if we started this RPC request, and later saw the block/blobs via gossip. + debug!( + self.log, + "Block returned for single block lookup not present"; + "response_type" => ?response_type, + ); return; }; let expected_block_root = lookup.block_root(); - if response.is_some() { - debug!(self.log, - "Peer returned response for single lookup"; - "peer_id" => %peer_id , - "id" => ?id, - "block_root" => ?expected_block_root, - "response_type" => ?response_type, - ); - } + debug!(self.log, + "Peer returned response for single lookup"; + "peer_id" => %peer_id , + "id" => ?id, + "block_root" => ?expected_block_root, + "response_type" => ?response_type, + ); - match self.single_lookup_response_inner::(peer_id, response, seen_timestamp, cx, lookup) - { - Ok(lookup) => { + match self.handle_verified_response::( + seen_timestamp, + cx, + BlockProcessType::SingleBlock { id: lookup.id }, + response, + &mut lookup, + ) { + Ok(_) => { self.single_block_lookups.insert(id, lookup); } Err(e) => { @@ -372,53 +372,10 @@ impl BlockLookups { /// Consolidates error handling for `single_lookup_response`. An `Err` here should always mean /// the lookup is dropped. - fn single_lookup_response_inner>( - &self, - peer_id: PeerId, - response: Option, - seen_timestamp: Duration, - cx: &SyncNetworkContext, - mut lookup: SingleBlockLookup, - ) -> Result, LookupRequestError> { - let response_type = R::response_type(); - let log = self.log.clone(); - let expected_block_root = lookup.block_root(); - let request_state = R::request_state_mut(&mut lookup); - - match request_state.verify_response(expected_block_root, peer_id, response) { - Ok(Some(verified_response)) => { - self.handle_verified_response::( - seen_timestamp, - cx, - BlockProcessType::SingleBlock { id: lookup.id }, - verified_response, - &mut lookup, - )?; - } - Ok(None) => {} - Err(e) => { - debug!( - log, - "Single lookup response verification failed, retrying"; - "block_root" => ?expected_block_root, - "peer_id" => %peer_id, - "response_type" => ?response_type, - "error" => ?e - ); - let msg = e.into(); - cx.report_peer(peer_id, PeerAction::LowToleranceError, msg); - - request_state.register_failure_downloading(); - lookup.request_block_and_blobs(cx)?; - } - } - Ok(lookup) - } - fn handle_verified_response>( &self, seen_timestamp: Duration, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, process_type: BlockProcessType, verified_response: R::VerifiedResponseType, lookup: &mut SingleBlockLookup, @@ -438,6 +395,10 @@ impl BlockLookups { }; if !delay_send { + R::request_state_mut(lookup) + .get_state_mut() + .on_download_success() + .map_err(LookupRequestError::BadState)?; self.send_block_for_processing( block_root, block, @@ -448,6 +409,10 @@ impl BlockLookups { } } CachedChild::DownloadIncomplete => { + R::request_state_mut(lookup) + .get_state_mut() + .on_download_success() + .map_err(LookupRequestError::BadState)?; // If this was the result of a block request, we can't determine if the block peer // did anything wrong. If we already had both a block and blobs response processed, // we should penalize the blobs peer because they did not provide all blobs on the @@ -458,14 +423,21 @@ impl BlockLookups { } lookup.request_block_and_blobs(cx)?; } - CachedChild::NotRequired => R::send_reconstructed_for_processing( - id, - self, - block_root, - R::verified_to_reconstructed(block_root, verified_response), - seen_timestamp, - cx, - )?, + CachedChild::NotRequired => { + R::request_state_mut(lookup) + .get_state_mut() + .on_download_success() + .map_err(LookupRequestError::BadState)?; + + R::send_reconstructed_for_processing( + id, + self, + block_root, + R::verified_to_reconstructed(block_root, verified_response), + seen_timestamp, + cx, + )? + } CachedChild::Err(e) => { warn!(self.log, "Consistency error in cached block"; "error" => ?e, @@ -511,26 +483,22 @@ impl BlockLookups { &mut self, id: SingleLookupReqId, peer_id: PeerId, - response: Option, + response: R::VerifiedResponseType, seen_timestamp: Duration, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) { let Some(mut parent_lookup) = self.get_parent_lookup::(id) else { - if response.is_some() { - debug!(self.log, "Response for a parent lookup request that was not found"; "peer_id" => %peer_id); - } + debug!(self.log, "Response for a parent lookup request that was not found"; "peer_id" => %peer_id); return; }; - if response.is_some() { - debug!(self.log, - "Peer returned response for parent lookup"; - "peer_id" => %peer_id , - "id" => ?id, - "block_root" => ?parent_lookup.current_parent_request.block_request_state.requested_block_root, - "response_type" => ?R::response_type(), - ); - } + debug!(self.log, + "Peer returned response for parent lookup"; + "peer_id" => %peer_id , + "id" => ?id, + "block_root" => ?parent_lookup.current_parent_request.block_request_state.requested_block_root, + "response_type" => ?R::response_type(), + ); match self.parent_lookup_response_inner::( peer_id, @@ -558,59 +526,17 @@ impl BlockLookups { fn parent_lookup_response_inner>( &mut self, peer_id: PeerId, - response: Option, + response: R::VerifiedResponseType, seen_timestamp: Duration, - cx: &SyncNetworkContext, - parent_lookup: &mut ParentLookup, - ) -> Result<(), RequestError> { - match parent_lookup.verify_response::(peer_id, response, &mut self.failed_chains) { - Ok(Some(verified_response)) => { - self.handle_verified_response::( - seen_timestamp, - cx, - BlockProcessType::ParentLookup { - chain_hash: parent_lookup.chain_hash(), - }, - verified_response, - &mut parent_lookup.current_parent_request, - )?; - } - Ok(None) => {} - Err(e) => self.handle_parent_verify_error::(peer_id, parent_lookup, e, cx)?, - }; - Ok(()) - } - - /// Handle logging and peer scoring for `ParentVerifyError`s during parent lookup requests. - fn handle_parent_verify_error>( - &mut self, - peer_id: PeerId, + cx: &mut SyncNetworkContext, parent_lookup: &mut ParentLookup, - e: ParentVerifyError, - cx: &SyncNetworkContext, ) -> Result<(), RequestError> { - match e { - ParentVerifyError::RootMismatch - | ParentVerifyError::NoBlockReturned - | ParentVerifyError::NotEnoughBlobsReturned - | ParentVerifyError::ExtraBlocksReturned - | ParentVerifyError::UnrequestedBlobId(_) - | ParentVerifyError::InvalidInclusionProof - | ParentVerifyError::UnrequestedHeader - | ParentVerifyError::ExtraBlobsReturned - | ParentVerifyError::InvalidIndex(_) => { - let e = e.into(); - warn!(self.log, "Peer sent invalid response to parent request"; - "peer_id" => %peer_id, "reason" => %e); - - // We do not tolerate these kinds of errors. We will accept a few but these are signs - // of a faulty peer. - cx.report_peer(peer_id, PeerAction::LowToleranceError, e); - - // We try again if possible. - parent_lookup.request_parent(cx)?; - } - ParentVerifyError::PreviousFailure { parent_root } => { + // check if the parent of this block isn't in the failed cache. If it is, this chain should + // be dropped and the peer downscored. + if let Some(parent_root) = R::get_parent_root(&response) { + if self.failed_chains.contains(&parent_root) { + let request_state = R::request_state_mut(&mut parent_lookup.current_parent_request); + request_state.register_failure_downloading(); debug!( self.log, "Parent chain ignored due to past failure"; @@ -624,8 +550,20 @@ impl BlockLookups { PeerAction::MidToleranceError, "bbroot_failed_chains", ); + return Ok(()); } } + + self.handle_verified_response::( + seen_timestamp, + cx, + BlockProcessType::ParentLookup { + chain_hash: parent_lookup.chain_hash(), + }, + response, + &mut parent_lookup.current_parent_request, + )?; + Ok(()) } @@ -665,8 +603,8 @@ impl BlockLookups { // This happens if the peer disconnects while the block is being // processed. Drop the request without extra penalty } - RequestError::BadState(_) => { - // Should never happen + RequestError::BadState(..) => { + warn!(self.log, "Failed to request parent"; "error" => e.as_static()); } } } @@ -675,12 +613,9 @@ impl BlockLookups { pub fn peer_disconnected(&mut self, peer_id: &PeerId, cx: &mut SyncNetworkContext) { /* Check disconnection for single lookups */ - self.single_block_lookups.retain(|id, req| { + self.single_block_lookups.retain(|_, req| { let should_drop_lookup = req.should_drop_lookup_on_disconnected_peer(peer_id, cx, &self.log); - if should_drop_lookup { - debug!(self.log, "Dropping lookup after peer disconnected"; "id" => id, "block_root" => %req.block_root()); - } !should_drop_lookup }); @@ -702,21 +637,28 @@ impl BlockLookups { &mut self, id: SingleLookupReqId, peer_id: &PeerId, - cx: &SyncNetworkContext, - error: RPCError, + cx: &mut SyncNetworkContext, + error: LookupFailure, ) { - let msg = error.as_static_str(); + // Only downscore lookup verify errors. RPC errors are downscored in the network handler. + if let LookupFailure::LookupVerifyError(e) = &error { + // Downscore peer even if lookup is not known + self.downscore_on_rpc_error(peer_id, e, cx); + } + let Some(mut parent_lookup) = self.get_parent_lookup::(id) else { debug!(self.log, "RPC failure for a block parent lookup request that was not found"; "peer_id" => %peer_id, - "error" => msg + "error" => %error ); return; }; R::request_state_mut(&mut parent_lookup.current_parent_request) .register_failure_downloading(); - trace!(self.log, "Parent lookup block request failed"; &parent_lookup, "error" => msg); + debug!(self.log, "Parent lookup block request failed"; + "chain_hash" => %parent_lookup.chain_hash(), "id" => ?id, "error" => %error + ); self.request_parent(parent_lookup, cx); @@ -731,13 +673,18 @@ impl BlockLookups { &mut self, id: SingleLookupReqId, peer_id: &PeerId, - cx: &SyncNetworkContext, - error: RPCError, + cx: &mut SyncNetworkContext, + error: LookupFailure, ) { - let msg = error.as_static_str(); + // Only downscore lookup verify errors. RPC errors are downscored in the network handler. + if let LookupFailure::LookupVerifyError(e) = &error { + // Downscore peer even if lookup is not known + self.downscore_on_rpc_error(peer_id, e, cx); + } + let log = self.log.clone(); let Some(mut lookup) = self.get_single_lookup::(id) else { - debug!(log, "Error response to dropped lookup"; "error" => ?error); + debug!(log, "Error response to dropped lookup"; "error" => %error); return; }; let block_root = lookup.block_root(); @@ -746,7 +693,7 @@ impl BlockLookups { trace!(log, "Single lookup failed"; "block_root" => ?block_root, - "error" => msg, + "error" => %error, "peer_id" => %peer_id, "response_type" => ?response_type ); @@ -758,7 +705,8 @@ impl BlockLookups { "error" => ?e, "block_root" => ?block_root, ); - self.single_block_lookups.remove(&id); + } else { + self.single_block_lookups.insert(id, lookup); } metrics::set_gauge( @@ -1006,20 +954,21 @@ impl BlockLookups { } BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_)) | BlockProcessingResult::Err(BlockError::BlockIsAlreadyKnown(_)) => { + let (chain_hash, blocks, hashes, block_request) = + parent_lookup.parts_for_processing(); + + let blocks = self.add_child_block_to_chain(chain_hash, blocks, cx).into(); + + let process_id = ChainSegmentProcessId::ParentLookup(chain_hash); + // Check if the beacon processor is available let Some(beacon_processor) = cx.beacon_processor_if_enabled() else { return trace!( self.log, "Dropping parent chain segment that was ready for processing."; - parent_lookup + "chain_hash" => %chain_hash, ); }; - let (chain_hash, blocks, hashes, block_request) = - parent_lookup.parts_for_processing(); - - let blocks = self.add_child_block_to_chain(chain_hash, blocks, cx).into(); - - let process_id = ChainSegmentProcessId::ParentLookup(chain_hash); match beacon_processor.send_chain_segment(process_id, blocks) { Ok(_) => { @@ -1073,7 +1022,7 @@ impl BlockLookups { &mut self, chain_hash: Hash256, mut blocks: VecDeque>, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) -> VecDeque> { // Find the child block that spawned the parent lookup request and add it to the chain // to send for processing. @@ -1126,12 +1075,16 @@ impl BlockLookups { fn handle_parent_block_error( &mut self, outcome: BlockError<::EthSpec>, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, mut parent_lookup: ParentLookup, ) { // We should always have a block peer. - let Ok(block_peer_id) = parent_lookup.block_processing_peer() else { - return; + let block_peer_id = match parent_lookup.block_processing_peer() { + Ok(peer_id) => peer_id, + Err(e) => { + warn!(self.log, "Parent lookup in bad state"; "chain_hash" => %parent_lookup.chain_hash(), "error" => e); + return; + } }; // We may not have a blob peer, if there were no blobs required for this block. @@ -1178,7 +1131,7 @@ impl BlockLookups { &mut self, chain_hash: Hash256, result: BatchProcessResult, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) { let Some((_hashes, request)) = self.processing_parent_lookups.remove(&chain_hash) else { return debug!(self.log, "Chain process response for a parent lookup request that was not found"; "chain_hash" => %chain_hash, "result" => ?result); @@ -1339,7 +1292,11 @@ impl BlockLookups { /// Attempts to request the next unknown parent. This method handles peer scoring and dropping /// the lookup in the event of failure. - fn request_parent(&mut self, mut parent_lookup: ParentLookup, cx: &SyncNetworkContext) { + fn request_parent( + &mut self, + mut parent_lookup: ParentLookup, + cx: &mut SyncNetworkContext, + ) { let response = parent_lookup.request_parent(cx); match response { @@ -1368,6 +1325,20 @@ impl BlockLookups { self.parent_lookups.drain(..).len() } + pub fn downscore_on_rpc_error( + &self, + peer_id: &PeerId, + error: &LookupVerifyError, + cx: &SyncNetworkContext, + ) { + // Note: logging the report event here with the full error display. The log inside + // `report_peer` only includes a smaller string, like "invalid_data" + let error_str: &'static str = error.into(); + + debug!(self.log, "reporting peer for sync lookup error"; "error" => error_str); + cx.report_peer(*peer_id, PeerAction::LowToleranceError, error_str); + } + pub fn update_metrics(&self) { metrics::set_gauge( &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index 55dd26b661e..b7a71860bff 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -1,7 +1,6 @@ -use super::single_block_lookup::{LookupRequestError, LookupVerifyError, SingleBlockLookup}; +use super::single_block_lookup::{LookupRequestError, SingleBlockLookup}; use super::{DownloadedBlock, PeerId}; use crate::sync::block_lookups::common::Parent; -use crate::sync::block_lookups::common::RequestState; use crate::sync::{manager::SLOT_IMPORT_TOLERANCE, network_context::SyncNetworkContext}; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::block_verification_types::RpcBlock; @@ -10,8 +9,6 @@ use beacon_chain::BeaconChainTypes; use std::collections::VecDeque; use std::sync::Arc; use store::Hash256; -use strum::IntoStaticStr; -use types::blob_sidecar::BlobIdentifier; /// How many attempts we try to find a parent of a block before we give up trying. pub(crate) const PARENT_FAIL_TOLERANCE: u8 = 5; @@ -30,22 +27,8 @@ pub(crate) struct ParentLookup { pub current_parent_request: SingleBlockLookup, } -#[derive(Debug, PartialEq, Eq, IntoStaticStr)] -pub enum ParentVerifyError { - RootMismatch, - NoBlockReturned, - NotEnoughBlobsReturned, - ExtraBlocksReturned, - UnrequestedBlobId(BlobIdentifier), - InvalidInclusionProof, - UnrequestedHeader, - ExtraBlobsReturned, - InvalidIndex(u64), - PreviousFailure { parent_root: Hash256 }, -} - #[derive(Debug, PartialEq, Eq)] -pub enum RequestError { +pub(crate) enum RequestError { SendFailed(&'static str), ChainTooLong, /// We witnessed too many failures trying to complete this parent lookup. @@ -92,7 +75,7 @@ impl ParentLookup { } /// Attempts to request the next unknown parent. If the request fails, it should be removed. - pub fn request_parent(&mut self, cx: &SyncNetworkContext) -> Result<(), RequestError> { + pub fn request_parent(&mut self, cx: &mut SyncNetworkContext) -> Result<(), RequestError> { // check to make sure this request hasn't failed if self.downloaded_blocks.len() + 1 >= PARENT_DEPTH_TOLERANCE { return Err(RequestError::ChainTooLong); @@ -186,34 +169,6 @@ impl ParentLookup { } } - /// Verifies that the received block is what we requested. If so, parent lookup now waits for - /// the processing result of the block. - pub fn verify_response>( - &mut self, - peer_id: PeerId, - block: Option, - failed_chains: &mut lru_cache::LRUTimeCache, - ) -> Result, ParentVerifyError> { - let expected_block_root = self.current_parent_request.block_root(); - let request_state = R::request_state_mut(&mut self.current_parent_request); - let root_and_verified = - request_state.verify_response(expected_block_root, peer_id, block)?; - - // check if the parent of this block isn't in the failed cache. If it is, this chain should - // be dropped and the peer downscored. - if let Some(parent_root) = root_and_verified - .as_ref() - .and_then(|block| R::get_parent_root(block)) - { - if failed_chains.contains(&parent_root) { - request_state.register_failure_downloading(); - return Err(ParentVerifyError::PreviousFailure { parent_root }); - } - } - - Ok(root_and_verified) - } - pub fn add_peer(&mut self, peer: PeerId) { self.current_parent_request.add_peer(peer) } @@ -228,23 +183,6 @@ impl ParentLookup { } } -impl From for ParentVerifyError { - fn from(e: LookupVerifyError) -> Self { - use LookupVerifyError as E; - match e { - E::RootMismatch => ParentVerifyError::RootMismatch, - E::NoBlockReturned => ParentVerifyError::NoBlockReturned, - E::ExtraBlocksReturned => ParentVerifyError::ExtraBlocksReturned, - E::UnrequestedBlobId(blob_id) => ParentVerifyError::UnrequestedBlobId(blob_id), - E::InvalidInclusionProof => ParentVerifyError::InvalidInclusionProof, - E::UnrequestedHeader => ParentVerifyError::UnrequestedHeader, - E::ExtraBlobsReturned => ParentVerifyError::ExtraBlobsReturned, - E::InvalidIndex(index) => ParentVerifyError::InvalidIndex(index), - E::NotEnoughBlobsReturned => ParentVerifyError::NotEnoughBlobsReturned, - } - } -} - impl From for RequestError { fn from(e: LookupRequestError) -> Self { use LookupRequestError as E; @@ -282,7 +220,7 @@ impl RequestError { } RequestError::TooManyAttempts { cannot_process: _ } => "too_many_downloading_attempts", RequestError::NoPeers => "no_peers", - RequestError::BadState(_) => "bad_state", + RequestError::BadState(..) => "bad_state", } } } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 15d10c77c24..5bb663967d7 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -11,29 +11,16 @@ use beacon_chain::BeaconChainTypes; use itertools::Itertools; use lighthouse_network::PeerAction; use rand::seq::IteratorRandom; -use slog::{trace, Logger}; +use slog::{debug, Logger}; use std::collections::HashSet; use std::fmt::Debug; use std::marker::PhantomData; use std::sync::Arc; use store::Hash256; use strum::IntoStaticStr; -use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList}; +use types::blob_sidecar::FixedBlobSidecarList; use types::EthSpec; -#[derive(Debug, PartialEq, Eq, IntoStaticStr)] -pub enum LookupVerifyError { - RootMismatch, - NoBlockReturned, - ExtraBlocksReturned, - UnrequestedBlobId(BlobIdentifier), - InvalidInclusionProof, - UnrequestedHeader, - ExtraBlobsReturned, - NotEnoughBlobsReturned, - InvalidIndex(u64), -} - #[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum LookupRequestError { /// Too many failed attempts @@ -88,6 +75,7 @@ impl SingleBlockLookup { /// the next parent. pub fn update_requested_parent_block(&mut self, block_root: Hash256) { self.block_request_state.requested_block_root = block_root; + self.blob_request_state.block_root = block_root; self.block_request_state.state.state = State::AwaitingDownload; self.blob_request_state.state.state = State::AwaitingDownload; self.child_components = Some(ChildComponents::empty(block_root)); @@ -108,7 +96,7 @@ impl SingleBlockLookup { /// downloading the block and/or blobs. pub fn request_block_and_blobs( &mut self, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, ) -> Result<(), LookupRequestError> { let block_already_downloaded = self.block_already_downloaded(); let blobs_already_downloaded = self.blobs_already_downloaded(); @@ -216,7 +204,7 @@ impl SingleBlockLookup { pub fn should_drop_lookup_on_disconnected_peer( &mut self, peer_id: &PeerId, - cx: &SyncNetworkContext, + cx: &mut SyncNetworkContext, log: &Logger, ) -> bool { let block_root = self.block_root(); @@ -233,7 +221,7 @@ impl SingleBlockLookup { if block_peer_disconnected || blob_peer_disconnected { if let Err(e) = self.request_block_and_blobs(cx) { - trace!(log, "Single lookup failed on peer disconnection"; "block_root" => ?block_root, "error" => ?e); + debug!(log, "Single lookup failed on peer disconnection"; "block_root" => ?block_root, "error" => ?e); return true; } } @@ -318,6 +306,7 @@ pub struct BlobRequestState { /// from both block/blobs downloaded in the network layer and any blocks/blobs that exist in /// the data availability checker. pub requested_ids: MissingBlobs, + pub block_root: Hash256, /// Where we store blobs until we receive the stream terminator. pub blob_download_queue: FixedBlobSidecarList, pub state: SingleLookupRequestState, @@ -328,6 +317,7 @@ impl BlobRequestState { pub fn new(block_root: Hash256, peer_source: &[PeerId], is_deneb: bool) -> Self { let default_ids = MissingBlobs::new_without_block(block_root, is_deneb); Self { + block_root, requested_ids: default_ids, blob_download_queue: <_>::default(), state: SingleLookupRequestState::new(peer_source), @@ -416,11 +406,6 @@ impl SingleLookupRequestState { } } - // TODO: Should not leak the enum state - pub fn get_state(&self) -> &State { - &self.state - } - pub fn is_current_req_counter(&self, req_counter: u32) -> bool { self.req_counter == req_counter } @@ -460,8 +445,16 @@ impl SingleLookupRequestState { self.state = State::AwaitingDownload; } - pub fn on_download_success(&mut self, peer_id: PeerId) { - self.state = State::Processing { peer_id }; + pub fn on_download_success(&mut self) -> Result<(), String> { + match &self.state { + State::Downloading { peer_id } => { + self.state = State::Processing { peer_id: *peer_id }; + Ok(()) + } + other => Err(format!( + "request bad state, expected downloading got {other}" + )), + } } /// Registers a failure in processing a block. @@ -669,19 +662,9 @@ mod tests { ); as RequestState>::build_request( &mut sl.block_request_state, - &spec, ) .unwrap(); sl.block_request_state.state.state = State::Downloading { peer_id }; - - as RequestState>::verify_response( - &mut sl.block_request_state, - block.canonical_root(), - peer_id, - Some(block.into()), - ) - .unwrap() - .unwrap(); } #[test] @@ -714,7 +697,6 @@ mod tests { for _ in 1..TestLookup2::MAX_ATTEMPTS { as RequestState>::build_request( &mut sl.block_request_state, - &spec, ) .unwrap(); sl.block_request_state.state.on_download_failure(); @@ -723,30 +705,20 @@ mod tests { // Now we receive the block and send it for processing as RequestState>::build_request( &mut sl.block_request_state, - &spec, ) .unwrap(); sl.block_request_state.state.state = State::Downloading { peer_id }; - as RequestState>::verify_response( - &mut sl.block_request_state, - block.canonical_root(), - peer_id, - Some(block.into()), - ) - .unwrap() - .unwrap(); - // One processing failure maxes the available attempts sl.block_request_state.state.on_processing_failure(); assert_eq!( as RequestState>::build_request( &mut sl.block_request_state, - &spec - ), - Err(LookupRequestError::TooManyAttempts { + ) + .unwrap_err(), + LookupRequestError::TooManyAttempts { cannot_process: false - }) + } ) } } diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 6d50fe63200..8e3b35ee5d3 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -15,9 +15,10 @@ use beacon_chain::test_utils::{ build_log, generate_rand_block_and_blobs, BeaconChainHarness, EphemeralHarnessType, NumBlobs, }; use beacon_processor::WorkEvent; -use lighthouse_network::rpc::RPCResponseErrorCode; +use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode}; use lighthouse_network::types::SyncState; use lighthouse_network::{NetworkGlobals, Request}; +use slog::info; use slot_clock::{ManualSlotClock, SlotClock, TestingSlotClock}; use store::MemoryStore; use tokio::sync::mpsc; @@ -67,6 +68,7 @@ struct TestRig { /// `rng` for generating test blocks and blobs. rng: XorShiftRng, fork_name: ForkName, + log: Logger, } const D: Duration = Duration::new(0, 0); @@ -124,6 +126,7 @@ impl TestRig { log.clone(), ), fork_name, + log, } } @@ -136,6 +139,10 @@ impl TestRig { } } + fn log(&self, msg: &str) { + info!(self.log, "TEST_RIG"; "msg" => msg); + } + fn after_deneb(&self) -> bool { matches!(self.fork_name, ForkName::Deneb | ForkName::Electra) } @@ -914,7 +921,7 @@ fn test_parent_lookup_too_many_processing_attempts_must_blacklist() { // Trigger the request rig.trigger_unknown_parent_block(peer_id, block.into()); - // Fail downloading the block + rig.log("Fail downloading the block"); for i in 0..(parent_lookup::PARENT_FAIL_TOLERANCE - PROCESSING_FAILURES) { let id = rig.expect_block_parent_request(parent_root); // Blobs are only requested in the first iteration as this test only retries blocks @@ -925,7 +932,7 @@ fn test_parent_lookup_too_many_processing_attempts_must_blacklist() { rig.parent_lookup_failed_unavailable(id, peer_id); } - // Now fail processing a block in the parent request + rig.log("Now fail processing a block in the parent request"); for i in 0..PROCESSING_FAILURES { let id = rig.expect_block_parent_request(parent_root); // Blobs are only requested in the first iteration as this test only retries blocks diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index a868a092d3d..23bd1010bfe 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -36,7 +36,7 @@ use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; use super::block_lookups::common::LookupType; use super::block_lookups::BlockLookups; -use super::network_context::{BlockOrBlob, RangeRequestId, SyncNetworkContext}; +use super::network_context::{BlockOrBlob, RangeRequestId, RpcEvent, SyncNetworkContext}; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor}; @@ -320,42 +320,12 @@ impl SyncManager { fn inject_error(&mut self, peer_id: PeerId, request_id: RequestId, error: RPCError) { trace!(self.log, "Sync manager received a failed RPC"); match request_id { - RequestId::SingleBlock { id } => match id.lookup_type { - LookupType::Current => self - .block_lookups - .single_block_lookup_failed::>( - id, - &peer_id, - &self.network, - error, - ), - LookupType::Parent => self - .block_lookups - .parent_lookup_failed::>( - id, - &peer_id, - &self.network, - error, - ), - }, - RequestId::SingleBlob { id } => match id.lookup_type { - LookupType::Current => self - .block_lookups - .single_block_lookup_failed::>( - id, - &peer_id, - &self.network, - error, - ), - LookupType::Parent => self - .block_lookups - .parent_lookup_failed::>( - id, - &peer_id, - &self.network, - error, - ), - }, + RequestId::SingleBlock { id } => { + self.on_single_block_response(id, peer_id, RpcEvent::RPCError(error)) + } + RequestId::SingleBlob { id } => { + self.on_single_blob_response(id, peer_id, RpcEvent::RPCError(error)) + } RequestId::RangeBlockAndBlobs { id } => { if let Some(sender_id) = self.network.range_request_failed(id) { match sender_id { @@ -694,7 +664,7 @@ impl SyncManager { } ChainSegmentProcessId::ParentLookup(chain_hash) => self .block_lookups - .parent_chain_processed(chain_hash, result, &self.network), + .parent_chain_processed(chain_hash, result, &mut self.network), }, } } @@ -836,26 +806,14 @@ impl SyncManager { seen_timestamp: Duration, ) { match request_id { - RequestId::SingleBlock { id } => match id.lookup_type { - LookupType::Current => self - .block_lookups - .single_lookup_response::>( - id, - peer_id, - block, - seen_timestamp, - &self.network, - ), - LookupType::Parent => self - .block_lookups - .parent_lookup_response::>( - id, - peer_id, - block, - seen_timestamp, - &self.network, - ), - }, + RequestId::SingleBlock { id } => self.on_single_block_response( + id, + peer_id, + match block { + Some(block) => RpcEvent::Response(block, seen_timestamp), + None => RpcEvent::StreamTermination, + }, + ), RequestId::SingleBlob { .. } => { crit!(self.log, "Block received during blob request"; "peer_id" => %peer_id ); } @@ -865,6 +823,56 @@ impl SyncManager { } } + fn on_single_block_response( + &mut self, + id: SingleLookupReqId, + peer_id: PeerId, + block: RpcEvent>>, + ) { + if let Some(resp) = self.network.on_single_block_response(id, block) { + match resp { + Ok((block, seen_timestamp)) => match id.lookup_type { + LookupType::Current => self + .block_lookups + .single_lookup_response::>( + id, + peer_id, + block, + seen_timestamp, + &mut self.network, + ), + LookupType::Parent => self + .block_lookups + .parent_lookup_response::>( + id, + peer_id, + block, + seen_timestamp, + &mut self.network, + ), + }, + Err(error) => match id.lookup_type { + LookupType::Current => self + .block_lookups + .single_block_lookup_failed::>( + id, + &peer_id, + &mut self.network, + error, + ), + LookupType::Parent => self + .block_lookups + .parent_lookup_failed::>( + id, + &peer_id, + &mut self.network, + error, + ), + }, + } + } + } + fn rpc_blob_received( &mut self, request_id: RequestId, @@ -876,32 +884,71 @@ impl SyncManager { RequestId::SingleBlock { .. } => { crit!(self.log, "Single blob received during block request"; "peer_id" => %peer_id ); } - RequestId::SingleBlob { id } => match id.lookup_type { - LookupType::Current => self - .block_lookups - .single_lookup_response::>( - id, - peer_id, - blob, - seen_timestamp, - &self.network, - ), - LookupType::Parent => self - .block_lookups - .parent_lookup_response::>( - id, - peer_id, - blob, - seen_timestamp, - &self.network, - ), - }, + RequestId::SingleBlob { id } => self.on_single_blob_response( + id, + peer_id, + match blob { + Some(blob) => RpcEvent::Response(blob, seen_timestamp), + None => RpcEvent::StreamTermination, + }, + ), RequestId::RangeBlockAndBlobs { id } => { self.range_block_and_blobs_response(id, peer_id, blob.into()) } } } + fn on_single_blob_response( + &mut self, + id: SingleLookupReqId, + peer_id: PeerId, + blob: RpcEvent>>, + ) { + if let Some(resp) = self.network.on_single_blob_response(id, blob) { + match resp { + Ok((blobs, seen_timestamp)) => match id.lookup_type { + LookupType::Current => self + .block_lookups + .single_lookup_response::>( + id, + peer_id, + blobs, + seen_timestamp, + &mut self.network, + ), + LookupType::Parent => self + .block_lookups + .parent_lookup_response::>( + id, + peer_id, + blobs, + seen_timestamp, + &mut self.network, + ), + }, + + Err(error) => match id.lookup_type { + LookupType::Current => self + .block_lookups + .single_block_lookup_failed::>( + id, + &peer_id, + &mut self.network, + error, + ), + LookupType::Parent => self + .block_lookups + .parent_lookup_failed::>( + id, + &peer_id, + &mut self.network, + error, + ), + }, + } + } + } + /// Handles receiving a response for a range sync request that should have both blocks and /// blobs. fn range_block_and_blobs_response( diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 96f8de46fb7..fc91270c1dc 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -1,6 +1,8 @@ //! Provides network functionality for the Syncing thread. This fundamentally wraps a network //! channel and stores a global RPC ID to perform requests. +use self::requests::{ActiveBlobsByRootRequest, ActiveBlocksByRootRequest}; +pub use self::requests::{BlobsByRootSingleBlockRequest, BlocksByRootSingleRequest}; use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo; use super::manager::{Id, RequestId as SyncRequestId}; use super::range_sync::{BatchId, ByRangeRequestType, ChainId}; @@ -9,17 +11,23 @@ use crate::service::{NetworkMessage, RequestId}; use crate::status::ToStatusMessage; use crate::sync::manager::SingleLookupReqId; use beacon_chain::block_verification_types::RpcBlock; +use beacon_chain::validator_monitor::timestamp_now; use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState}; use fnv::FnvHashMap; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; -use lighthouse_network::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason}; +use lighthouse_network::rpc::methods::BlobsByRangeRequest; +use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError}; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request}; +pub use requests::LookupVerifyError; use slog::{debug, trace, warn}; use std::collections::hash_map::Entry; use std::sync::Arc; +use std::time::Duration; use tokio::sync::mpsc; +use types::blob_sidecar::FixedBlobSidecarList; use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; +mod requests; + pub struct BlocksAndBlobsByRangeResponse { pub sender_id: RangeRequestId, pub responses: Result>, String>, @@ -37,6 +45,41 @@ pub enum RangeRequestId { }, } +#[derive(Debug)] +pub enum RpcEvent { + StreamTermination, + Response(T, Duration), + RPCError(RPCError), +} + +pub type RpcProcessingResult = Option>; + +pub enum LookupFailure { + RpcError(RPCError), + LookupVerifyError(LookupVerifyError), +} + +impl std::fmt::Display for LookupFailure { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + LookupFailure::RpcError(e) => write!(f, "RPC Error: {:?}", e), + LookupFailure::LookupVerifyError(e) => write!(f, "Lookup Verify Error: {:?}", e), + } + } +} + +impl From for LookupFailure { + fn from(e: RPCError) -> Self { + LookupFailure::RpcError(e) + } +} + +impl From for LookupFailure { + fn from(e: LookupVerifyError) -> Self { + LookupFailure::LookupVerifyError(e) + } +} + /// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id. pub struct SyncNetworkContext { /// The network channel to relay messages to the Network service. @@ -45,6 +88,12 @@ pub struct SyncNetworkContext { /// A sequential ID for all RPC requests. request_id: Id, + /// A mapping of active BlocksByRoot requests, including both current slot and parent lookups. + blocks_by_root_requests: FnvHashMap, + + /// A mapping of active BlobsByRoot requests, including both current slot and parent lookups. + blobs_by_root_requests: FnvHashMap>, + /// BlocksByRange requests paired with BlobsByRange range_blocks_and_blobs_requests: FnvHashMap)>, @@ -91,6 +140,8 @@ impl SyncNetworkContext { network_send, execution_engine_state: EngineState::Online, // always assume `Online` at the start request_id: 1, + blocks_by_root_requests: <_>::default(), + blobs_by_root_requests: <_>::default(), range_blocks_and_blobs_requests: FnvHashMap::default(), network_beacon_processor, chain, @@ -245,62 +296,57 @@ impl SyncNetworkContext { } pub fn block_lookup_request( - &self, + &mut self, id: SingleLookupReqId, peer_id: PeerId, - request: BlocksByRootRequest, + request: BlocksByRootSingleRequest, ) -> Result<(), &'static str> { debug!( self.log, "Sending BlocksByRoot Request"; "method" => "BlocksByRoot", - "block_roots" => ?request.block_roots().to_vec(), + "block_root" => ?request.0, "peer" => %peer_id, "id" => ?id ); self.send_network_msg(NetworkMessage::SendRequest { peer_id, - request: Request::BlocksByRoot(request), + request: Request::BlocksByRoot(request.into_request(&self.chain.spec)), request_id: RequestId::Sync(SyncRequestId::SingleBlock { id }), })?; + + self.blocks_by_root_requests + .insert(id, ActiveBlocksByRootRequest::new(request)); + Ok(()) } pub fn blob_lookup_request( - &self, + &mut self, id: SingleLookupReqId, - blob_peer_id: PeerId, - blob_request: BlobsByRootRequest, + peer_id: PeerId, + request: BlobsByRootSingleBlockRequest, ) -> Result<(), &'static str> { - if let Some(block_root) = blob_request - .blob_ids - .as_slice() - .first() - .map(|id| id.block_root) - { - let indices = blob_request - .blob_ids - .as_slice() - .iter() - .map(|id| id.index) - .collect::>(); - debug!( - self.log, - "Sending BlobsByRoot Request"; - "method" => "BlobsByRoot", - "block_root" => ?block_root, - "blob_indices" => ?indices, - "peer" => %blob_peer_id, - "id" => ?id - ); + debug!( + self.log, + "Sending BlobsByRoot Request"; + "method" => "BlobsByRoot", + "block_root" => ?request.block_root, + "blob_indices" => ?request.indices, + "peer" => %peer_id, + "id" => ?id + ); + + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: Request::BlobsByRoot(request.clone().into_request(&self.chain.spec)), + request_id: RequestId::Sync(SyncRequestId::SingleBlob { id }), + })?; + + self.blobs_by_root_requests + .insert(id, ActiveBlobsByRootRequest::new(request)); - self.send_network_msg(NetworkMessage::SendRequest { - peer_id: blob_peer_id, - request: Request::BlobsByRoot(blob_request), - request_id: RequestId::Sync(SyncRequestId::SingleBlob { id }), - })?; - } Ok(()) } @@ -329,7 +375,7 @@ impl SyncNetworkContext { /// Reports to the scoring algorithm the behaviour of a peer. pub fn report_peer(&self, peer_id: PeerId, action: PeerAction, msg: &'static str) { - debug!(self.log, "Sync reporting peer"; "peer_id" => %peer_id, "action" => %action); + debug!(self.log, "Sync reporting peer"; "peer_id" => %peer_id, "action" => %action, "msg" => %msg); self.network_send .send(NetworkMessage::ReportPeer { peer_id, @@ -405,4 +451,86 @@ impl SyncNetworkContext { self.range_blocks_and_blobs_requests .insert(id, (sender_id, info)); } + + // Request handlers + + pub fn on_single_block_response( + &mut self, + request_id: SingleLookupReqId, + block: RpcEvent>>, + ) -> RpcProcessingResult>> { + let Entry::Occupied(mut request) = self.blocks_by_root_requests.entry(request_id) else { + return None; + }; + + Some(match block { + RpcEvent::Response(block, seen_timestamp) => { + match request.get_mut().add_response(block) { + Ok(block) => Ok((block, seen_timestamp)), + Err(e) => { + // The request must be dropped after receiving an error. + request.remove(); + Err(e.into()) + } + } + } + RpcEvent::StreamTermination => match request.remove().terminate() { + Ok(_) => return None, + Err(e) => Err(e.into()), + }, + RpcEvent::RPCError(e) => { + request.remove(); + Err(e.into()) + } + }) + } + + pub fn on_single_blob_response( + &mut self, + request_id: SingleLookupReqId, + blob: RpcEvent>>, + ) -> RpcProcessingResult> { + let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else { + return None; + }; + + Some(match blob { + RpcEvent::Response(blob, _) => match request.get_mut().add_response(blob) { + Ok(Some(blobs)) => to_fixed_blob_sidecar_list(blobs) + .map(|blobs| (blobs, timestamp_now())) + .map_err(Into::into), + Ok(None) => return None, + Err(e) => { + request.remove(); + Err(e.into()) + } + }, + RpcEvent::StreamTermination => { + // Stream terminator + match request.remove().terminate() { + Some(blobs) => to_fixed_blob_sidecar_list(blobs) + .map(|blobs| (blobs, timestamp_now())) + .map_err(Into::into), + None => return None, + } + } + RpcEvent::RPCError(e) => { + request.remove(); + Err(e.into()) + } + }) + } +} + +fn to_fixed_blob_sidecar_list( + blobs: Vec>>, +) -> Result, LookupVerifyError> { + let mut fixed_list = FixedBlobSidecarList::default(); + for blob in blobs.into_iter() { + let index = blob.index as usize; + *fixed_list + .get_mut(index) + .ok_or(LookupVerifyError::UnrequestedBlobIndex(index as u64))? = Some(blob) + } + Ok(fixed_list) } diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs new file mode 100644 index 00000000000..0522b7fa384 --- /dev/null +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -0,0 +1,149 @@ +use beacon_chain::get_block_root; +use lighthouse_network::rpc::{methods::BlobsByRootRequest, BlocksByRootRequest}; +use std::sync::Arc; +use strum::IntoStaticStr; +use types::{ + blob_sidecar::BlobIdentifier, BlobSidecar, ChainSpec, EthSpec, Hash256, SignedBeaconBlock, +}; + +#[derive(Debug, PartialEq, Eq, IntoStaticStr)] +pub enum LookupVerifyError { + NoResponseReturned, + TooManyResponses, + UnrequestedBlockRoot(Hash256), + UnrequestedBlobIndex(u64), + InvalidInclusionProof, + DuplicateData, +} + +pub struct ActiveBlocksByRootRequest { + request: BlocksByRootSingleRequest, + resolved: bool, +} + +impl ActiveBlocksByRootRequest { + pub fn new(request: BlocksByRootSingleRequest) -> Self { + Self { + request, + resolved: false, + } + } + + /// Append a response to the single chunk request. If the chunk is valid, the request is + /// resolved immediately. + /// The active request SHOULD be dropped after `add_response` returns an error + pub fn add_response( + &mut self, + block: Arc>, + ) -> Result>, LookupVerifyError> { + if self.resolved { + return Err(LookupVerifyError::TooManyResponses); + } + + let block_root = get_block_root(&block); + if self.request.0 != block_root { + return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); + } + + // Valid data, blocks by root expects a single response + self.resolved = true; + Ok(block) + } + + pub fn terminate(self) -> Result<(), LookupVerifyError> { + if self.resolved { + Ok(()) + } else { + Err(LookupVerifyError::NoResponseReturned) + } + } +} + +#[derive(Debug, Copy, Clone)] +pub struct BlocksByRootSingleRequest(pub Hash256); + +impl BlocksByRootSingleRequest { + pub fn into_request(self, spec: &ChainSpec) -> BlocksByRootRequest { + BlocksByRootRequest::new(vec![self.0], spec) + } +} + +#[derive(Debug, Clone)] +pub struct BlobsByRootSingleBlockRequest { + pub block_root: Hash256, + pub indices: Vec, +} + +impl BlobsByRootSingleBlockRequest { + pub fn into_request(self, spec: &ChainSpec) -> BlobsByRootRequest { + BlobsByRootRequest::new( + self.indices + .into_iter() + .map(|index| BlobIdentifier { + block_root: self.block_root, + index, + }) + .collect(), + spec, + ) + } +} + +pub struct ActiveBlobsByRootRequest { + request: BlobsByRootSingleBlockRequest, + blobs: Vec>>, + resolved: bool, +} + +impl ActiveBlobsByRootRequest { + pub fn new(request: BlobsByRootSingleBlockRequest) -> Self { + Self { + request, + blobs: vec![], + resolved: false, + } + } + + /// Appends a chunk to this multi-item request. If all expected chunks are received, this + /// method returns `Some`, resolving the request before the stream terminator. + /// The active request SHOULD be dropped after `add_response` returns an error + pub fn add_response( + &mut self, + blob: Arc>, + ) -> Result>>>, LookupVerifyError> { + if self.resolved { + return Err(LookupVerifyError::TooManyResponses); + } + + let block_root = blob.block_root(); + if self.request.block_root != block_root { + return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); + } + if !blob.verify_blob_sidecar_inclusion_proof().unwrap_or(false) { + return Err(LookupVerifyError::InvalidInclusionProof); + } + if !self.request.indices.contains(&blob.index) { + return Err(LookupVerifyError::UnrequestedBlobIndex(blob.index)); + } + if self.blobs.iter().any(|b| b.index == blob.index) { + return Err(LookupVerifyError::DuplicateData); + } + + self.blobs.push(blob); + if self.blobs.len() >= self.request.indices.len() { + // All expected chunks received, return result early + self.resolved = true; + Ok(Some(std::mem::take(&mut self.blobs))) + } else { + Ok(None) + } + } + + pub fn terminate(self) -> Option>>> { + if self.resolved { + None + } else { + Some(self.blobs) + } + } +}