From fee2ee9c08c15d98f5a96892b44f833977cd0f24 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Mon, 15 Apr 2024 22:47:08 +0900 Subject: [PATCH 1/6] Make SingleLookupRequestState fields private (#5563) * Make SingleLookupRequestState fields private * Fix tests --- .../network/src/sync/block_lookups/common.rs | 98 +++++------ .../network/src/sync/block_lookups/mod.rs | 74 ++++----- .../src/sync/block_lookups/parent_lookup.rs | 28 ++-- .../sync/block_lookups/single_block_lookup.rs | 157 +++++++++++++----- beacon_node/network/src/sync/manager.rs | 1 + 5 files changed, 200 insertions(+), 158 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index a46e7d09376..8f7881eea8a 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -12,7 +12,6 @@ use beacon_chain::data_availability_checker::ChildComponents; use beacon_chain::{get_block_root, BeaconChainTypes}; use lighthouse_network::rpc::methods::BlobsByRootRequest; use lighthouse_network::rpc::BlocksByRootRequest; -use rand::prelude::IteratorRandom; use std::ops::IndexMut; use std::sync::Arc; use std::time::Duration; @@ -104,7 +103,7 @@ pub trait RequestState { cx: &SyncNetworkContext, ) -> Result<(), LookupRequestError> { // Check if request is necessary. - if !matches!(self.get_state().state, State::AwaitingDownload) { + if !self.get_state().is_awaiting_download() { return Ok(()); } @@ -112,13 +111,12 @@ pub trait RequestState { let (peer_id, request) = self.build_request(&cx.chain.spec)?; // Update request state. - self.get_state_mut().state = State::Downloading { peer_id }; - self.get_state_mut().req_counter += 1; + let req_counter = self.get_state_mut().on_download_start(peer_id); // Make request let id = SingleLookupReqId { id, - req_counter: self.get_state().req_counter, + req_counter, lookup_type: L::lookup_type(), }; Self::make_request(id, peer_id, request, cx) @@ -130,8 +128,7 @@ pub trait RequestState { let request_state = self.get_state(); if request_state.failed_attempts() >= max_attempts { - let cannot_process = - request_state.failed_processing >= request_state.failed_downloading; + let cannot_process = request_state.more_failed_processing_attempts(); Err(LookupRequestError::TooManyAttempts { cannot_process }) } else { Ok(()) @@ -141,15 +138,9 @@ pub trait RequestState { /// Get the next peer to request. Draws from the set of peers we think should have both the /// block and blob first. If that fails, we draw from the set of peers that may have either. fn get_peer(&mut self) -> Result { - let request_state = self.get_state_mut(); - let peer_id = request_state - .available_peers - .iter() - .choose(&mut rand::thread_rng()) - .copied() - .ok_or(LookupRequestError::NoPeers)?; - request_state.used_peers.insert(peer_id); - Ok(peer_id) + self.get_state_mut() + .use_rand_available_peer() + .ok_or(LookupRequestError::NoPeers) } /// Initialize `Self::RequestType`. 
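// Editor's note: an illustrative sketch, not part of this patch. The
// `req_counter` returned by `on_download_start` above is what lets a lookup
// tell a live response from a stale one: each retry bumps the counter, the
// counter travels inside `SingleLookupReqId`, and any response carrying an
// old value is discarded. A minimal stand-in with hypothetical names:
struct RetryTag {
    req_counter: u32,
}

impl RetryTag {
    /// Start a new attempt; the returned counter tags the outgoing request.
    fn on_start(&mut self) -> u32 {
        self.req_counter += 1;
        self.req_counter
    }

    /// A response is current only if it carries the latest counter.
    fn is_current(&self, response_counter: u32) -> bool {
        self.req_counter == response_counter
    }
}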
@@ -169,29 +160,35 @@ pub trait RequestState {
    fn verify_response(
        &mut self,
        expected_block_root: Hash256,
+       peer_id: PeerId,
        response: Option,
    ) -> Result, LookupVerifyError> {
-       let request_state = self.get_state_mut();
-       match request_state.state {
-           State::AwaitingDownload => {
-               request_state.register_failure_downloading();
-               Err(LookupVerifyError::ExtraBlocksReturned)
-           }
-           State::Downloading { peer_id } => {
-               self.verify_response_inner(expected_block_root, response, peer_id)
+       let result = match *self.get_state().get_state() {
+           State::AwaitingDownload => Err(LookupVerifyError::ExtraBlocksReturned),
+           State::Downloading { peer_id: _ } => {
+               // TODO: We requested a download from Downloading { peer_id }, but the network
+               // injects a response from a different peer_id. What should we do? The peer_id to
+               // track for scoring is the one that actually sent the response, not the state's
                self.verify_response_inner(expected_block_root, response)
            }
-           State::Processing { peer_id: _ } => match response {
-               Some(_) => {
-                   // We sent the block for processing and received an extra block.
-                   request_state.register_failure_downloading();
-                   Err(LookupVerifyError::ExtraBlocksReturned)
-               }
-               None => {
-                   // This is simply the stream termination and we are already processing the
-                   // block
-                   Ok(None)
-               }
+           State::Processing { .. } | State::Processed { .. } => match response {
+               // We sent the block for processing and received an extra block.
+               Some(_) => Err(LookupVerifyError::ExtraBlocksReturned),
+               // This is simply the stream termination and we are already processing the block
+               None => Ok(None),
            },
+       };
+
+       match result {
+           Ok(Some(response)) => {
+               self.get_state_mut().on_download_success(peer_id);
+               Ok(Some(response))
+           }
+           Ok(None) => Ok(None),
+           Err(e) => {
+               self.get_state_mut().on_download_failure();
+               Err(e)
+           }
        }
    }
@@ -200,7 +197,6 @@ pub trait RequestState {
        &mut self,
        expected_block_root: Hash256,
        response: Option,
-       peer_id: PeerId,
    ) -> Result, LookupVerifyError>;
    /// A getter for the parent root of the response. Returns an `Option` because we won't know
@@ -232,7 +228,7 @@ pub trait RequestState {
    /// Register a failure to download the block or blob.
    fn register_failure_downloading(&mut self) {
        self.get_state_mut().on_download_failure()
    }
    /* Utility methods */
@@ -274,7 +270,6 @@ impl RequestState for BlockRequestState
        &mut self,
        expected_block_root: Hash256,
        response: Option,
-       peer_id: PeerId,
    ) -> Result>>, LookupVerifyError> {
        match response {
            Some(block) => {
@@ -285,18 +280,13 @@ impl RequestState for BlockRequestState
                    // return an error and drop the block
                    // NOTE: we take this as a download failure to prevent counting the
                    // attempt as a chain failure, but simply a peer failure.
-                   self.state.register_failure_downloading();
                    Err(LookupVerifyError::RootMismatch)
                } else {
                    // Return the block for processing.
- self.state.state = State::Processing { peer_id }; Ok(Some(block)) } } - None => { - self.state.register_failure_downloading(); - Err(LookupVerifyError::NoBlockReturned) - } + None => Err(LookupVerifyError::NoBlockReturned), } } @@ -374,25 +364,20 @@ impl RequestState for BlobRequestState, - peer_id: PeerId, ) -> Result>, LookupVerifyError> { match blob { Some(blob) => { let received_id = blob.id(); if !self.requested_ids.contains(&received_id) { - Err(LookupVerifyError::UnrequestedBlobId(received_id)) - } else if !blob.verify_blob_sidecar_inclusion_proof().unwrap_or(false) { - Err(LookupVerifyError::InvalidInclusionProof) - } else if blob.block_root() != expected_block_root { - Err(LookupVerifyError::UnrequestedHeader) - } else { - Ok(()) + return Err(LookupVerifyError::UnrequestedBlobId(received_id)); + } + if !blob.verify_blob_sidecar_inclusion_proof().unwrap_or(false) { + return Err(LookupVerifyError::InvalidInclusionProof); + } + if blob.block_root() != expected_block_root { + return Err(LookupVerifyError::UnrequestedHeader); } - .map_err(|e| { - self.state.register_failure_downloading(); - e - })?; // State should remain downloading until we receive the stream terminator. self.requested_ids.remove(&received_id); @@ -403,7 +388,6 @@ impl RequestState for BlobRequestState { - self.state.state = State::Processing { peer_id }; let blobs = std::mem::take(&mut self.blob_download_queue); Ok(Some(blobs)) } diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 949e1762ac7..4e1d02d38f4 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -285,12 +285,16 @@ impl BlockLookups { let mut lookup = self.single_block_lookups.remove(&id.id)?; let request_state = R::request_state_mut(&mut lookup); - if id.req_counter != request_state.get_state().req_counter { + if request_state + .get_state() + .is_current_req_counter(id.req_counter) + { + Some(lookup) + } else { // We don't want to drop the lookup, just ignore the old response. self.single_block_lookups.insert(id.id, lookup); - return None; + None } - Some(lookup) } /// Checks whether a single block lookup is waiting for a parent lookup to complete. This is @@ -374,7 +378,7 @@ impl BlockLookups { let expected_block_root = lookup.block_root(); let request_state = R::request_state_mut(&mut lookup); - match request_state.verify_response(expected_block_root, response) { + match request_state.verify_response(expected_block_root, peer_id, response) { Ok(Some(verified_response)) => { self.handle_verified_response::( seen_timestamp, @@ -415,10 +419,6 @@ impl BlockLookups { let id = lookup.id; let block_root = lookup.block_root(); - R::request_state_mut(lookup) - .get_state_mut() - .component_downloaded = true; - let cached_child = lookup.add_response::(verified_response.clone()); match cached_child { CachedChild::Ok(block) => { @@ -447,10 +447,7 @@ impl BlockLookups { // initial request. 
if lookup.both_components_downloaded() { lookup.penalize_blob_peer(cx); - lookup - .blob_request_state - .state - .register_failure_downloading(); + lookup.blob_request_state.state.on_download_failure(); } lookup.request_block_and_blobs(cx)?; } @@ -493,13 +490,13 @@ impl BlockLookups { if R::request_state_mut(&mut parent_lookup.current_parent_request) .get_state() - .req_counter - != id.req_counter + .is_current_req_counter(id.req_counter) { + Some(parent_lookup) + } else { self.parent_lookups.push(parent_lookup); - return None; + None } - Some(parent_lookup) } /// Process a response received from a parent lookup request. @@ -559,7 +556,7 @@ impl BlockLookups { cx: &SyncNetworkContext, parent_lookup: &mut ParentLookup, ) -> Result<(), RequestError> { - match parent_lookup.verify_response::(response, &mut self.failed_chains) { + match parent_lookup.verify_response::(peer_id, response, &mut self.failed_chains) { Ok(Some(verified_response)) => { self.handle_verified_response::( seen_timestamp, @@ -640,7 +637,7 @@ impl BlockLookups { RequestError::ChainTooLong => { self.failed_chains.insert(parent_lookup.chain_hash()); // This indicates faulty peers. - for &peer_id in parent_lookup.used_peers() { + for &peer_id in parent_lookup.all_used_peers() { cx.report_peer(peer_id, PeerAction::LowToleranceError, e.as_static()) } } @@ -653,7 +650,7 @@ impl BlockLookups { self.failed_chains.insert(parent_lookup.chain_hash()); } // This indicates faulty peers. - for &peer_id in parent_lookup.used_peers() { + for &peer_id in parent_lookup.all_used_peers() { cx.report_peer(peer_id, PeerAction::LowToleranceError, e.as_static()) } } @@ -661,6 +658,9 @@ impl BlockLookups { // This happens if the peer disconnects while the block is being // processed. Drop the request without extra penalty } + RequestError::BadState(_) => { + // Should never happen + } } } @@ -668,9 +668,12 @@ impl BlockLookups { pub fn peer_disconnected(&mut self, peer_id: &PeerId, cx: &mut SyncNetworkContext) { /* Check disconnection for single lookups */ - self.single_block_lookups.retain(|_, req| { + self.single_block_lookups.retain(|id, req| { let should_drop_lookup = req.should_drop_lookup_on_disconnected_peer(peer_id, cx, &self.log); + if should_drop_lookup { + debug!(self.log, "Dropping lookup after peer disconnected"; "id" => id, "block_root" => %req.block_root()); + } !should_drop_lookup }); @@ -844,15 +847,15 @@ impl BlockLookups { ) -> Result<(), LookupRequestError> { let request_state = R::request_state_mut(lookup); - request_state.get_state_mut().component_processed = true; + request_state + .get_state_mut() + .on_processing_success() + .map_err(LookupRequestError::BadState)?; if lookup.both_components_processed() { lookup.penalize_blob_peer(cx); // Try it again if possible. - lookup - .blob_request_state - .state - .register_failure_processing(); + lookup.blob_request_state.state.on_processing_failure(); lookup.request_block_and_blobs(cx)?; } Ok(()) @@ -900,14 +903,8 @@ impl BlockLookups { BlockError::AvailabilityCheck(e) => match e.category() { AvailabilityCheckErrorCategory::Internal => { warn!(self.log, "Internal availability check failure"; "root" => %root, "peer_id" => %peer_id, "error" => ?e); - lookup - .block_request_state - .state - .register_failure_downloading(); - lookup - .blob_request_state - .state - .register_failure_downloading(); + lookup.block_request_state.state.on_download_failure(); + lookup.blob_request_state.state.on_download_failure(); lookup.request_block_and_blobs(cx)? 
} AvailabilityCheckErrorCategory::Malicious => { @@ -926,10 +923,7 @@ impl BlockLookups { ); // Try it again if possible. - lookup - .block_request_state - .state - .register_failure_processing(); + lookup.block_request_state.state.on_processing_failure(); lookup.request_block_and_blobs(cx)? } } @@ -999,7 +993,7 @@ impl BlockLookups { .current_parent_request .blob_request_state .state - .register_failure_processing(); + .on_processing_failure(); match parent_lookup .current_parent_request .request_block_and_blobs(cx) @@ -1255,8 +1249,8 @@ impl BlockLookups { penalty, } => { self.failed_chains.insert(chain_hash); - for peer_source in request.all_peers() { - cx.report_peer(peer_source, penalty, "parent_chain_failure") + for peer_source in request.all_used_peers() { + cx.report_peer(*peer_source, penalty, "parent_chain_failure") } } BatchProcessResult::NonFaultyFailure => { diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index a3cdfd7b00b..55dd26b661e 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -7,7 +7,6 @@ use beacon_chain::block_verification_types::AsBlock; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::data_availability_checker::{ChildComponents, DataAvailabilityChecker}; use beacon_chain::BeaconChainTypes; -use itertools::Itertools; use std::collections::VecDeque; use std::sync::Arc; use store::Hash256; @@ -56,6 +55,7 @@ pub enum RequestError { cannot_process: bool, }, NoPeers, + BadState(String), } impl ParentLookup { @@ -175,11 +175,11 @@ impl ParentLookup { self.current_parent_request .block_request_state .state - .register_failure_processing(); + .on_processing_failure(); self.current_parent_request .blob_request_state .state - .register_failure_processing(); + .on_processing_failure(); if let Some(components) = self.current_parent_request.child_components.as_mut() { components.downloaded_block = None; components.downloaded_blobs = <_>::default(); @@ -190,12 +190,14 @@ impl ParentLookup { /// the processing result of the block. pub fn verify_response>( &mut self, + peer_id: PeerId, block: Option, failed_chains: &mut lru_cache::LRUTimeCache, ) -> Result, ParentVerifyError> { let expected_block_root = self.current_parent_request.block_root(); let request_state = R::request_state_mut(&mut self.current_parent_request); - let root_and_verified = request_state.verify_response(expected_block_root, block)?; + let root_and_verified = + request_state.verify_response(expected_block_root, peer_id, block)?; // check if the parent of this block isn't in the failed cache. If it is, this chain should // be dropped and the peer downscored. 
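// Editor's note: an illustrative sketch, not part of this patch. The refactor
// funnels every transition of the now-private `State` enum through the `on_*`
// methods, so callers can no longer put a lookup into an inconsistent state by
// writing fields directly. A simplified stand-in for that state machine
// (hypothetical names, peer ids omitted):
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LookupState {
    AwaitingDownload,
    Downloading,
    Processing,
    Processed,
}

impl LookupState {
    /// A successful download hands the component off to processing.
    fn on_download_success(self) -> Result<Self, &'static str> {
        match self {
            LookupState::Downloading => Ok(LookupState::Processing),
            _ => Err("download success while not downloading"),
        }
    }

    /// Any failure sends the request back to AwaitingDownload for a retry.
    fn on_failure(self) -> Self {
        LookupState::AwaitingDownload
    }
}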
@@ -221,20 +223,8 @@ impl ParentLookup { self.current_parent_request.add_peers(peers) } - pub fn used_peers(&self) -> impl Iterator + '_ { - self.current_parent_request - .block_request_state - .state - .used_peers - .iter() - .chain( - self.current_parent_request - .blob_request_state - .state - .used_peers - .iter(), - ) - .unique() + pub fn all_used_peers(&self) -> impl Iterator + '_ { + self.current_parent_request.all_used_peers() } } @@ -264,6 +254,7 @@ impl From for RequestError { } E::NoPeers => RequestError::NoPeers, E::SendFailed(msg) => RequestError::SendFailed(msg), + E::BadState(msg) => RequestError::BadState(msg), } } } @@ -291,6 +282,7 @@ impl RequestError { } RequestError::TooManyAttempts { cannot_process: _ } => "too_many_downloading_attempts", RequestError::NoPeers => "no_peers", + RequestError::BadState(_) => "bad_state", } } } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index a312f6e970a..15d10c77c24 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -8,7 +8,9 @@ use beacon_chain::data_availability_checker::{ AvailabilityCheckError, DataAvailabilityChecker, MissingBlobs, }; use beacon_chain::BeaconChainTypes; +use itertools::Itertools; use lighthouse_network::PeerAction; +use rand::seq::IteratorRandom; use slog::{trace, Logger}; use std::collections::HashSet; use std::fmt::Debug; @@ -19,13 +21,6 @@ use strum::IntoStaticStr; use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList}; use types::EthSpec; -#[derive(Debug, PartialEq, Eq)] -pub enum State { - AwaitingDownload, - Downloading { peer_id: PeerId }, - Processing { peer_id: PeerId }, -} - #[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum LookupVerifyError { RootMismatch, @@ -48,6 +43,7 @@ pub enum LookupRequestError { }, NoPeers, SendFailed(&'static str), + BadState(String), } pub struct SingleBlockLookup { @@ -94,18 +90,16 @@ impl SingleBlockLookup { self.block_request_state.requested_block_root = block_root; self.block_request_state.state.state = State::AwaitingDownload; self.blob_request_state.state.state = State::AwaitingDownload; - self.block_request_state.state.component_downloaded = false; - self.blob_request_state.state.component_downloaded = false; - self.block_request_state.state.component_processed = false; - self.blob_request_state.state.component_processed = false; self.child_components = Some(ChildComponents::empty(block_root)); } - /// Get all unique peers across block and blob requests. - pub fn all_peers(&self) -> HashSet { - let mut all_peers = self.block_request_state.state.used_peers.clone(); - all_peers.extend(self.blob_request_state.state.used_peers.clone()); - all_peers + /// Get all unique used peers across block and blob requests. + pub fn all_used_peers(&self) -> impl Iterator + '_ { + self.block_request_state + .state + .get_used_peers() + .chain(self.blob_request_state.state.get_used_peers()) + .unique() } /// Send the necessary requests for blocks and/or blobs. This will check whether we have @@ -206,14 +200,14 @@ impl SingleBlockLookup { /// Returns true if the block has already been downloaded. 
pub fn both_components_downloaded(&self) -> bool {
-       self.block_request_state.state.component_downloaded
-           && self.blob_request_state.state.component_downloaded
+       self.block_request_state.state.is_downloaded()
+           && self.blob_request_state.state.is_downloaded()
    }

    /// Returns true if the block has already been processed.
    pub fn both_components_processed(&self) -> bool {
-       self.block_request_state.state.component_processed
-           && self.blob_request_state.state.component_processed
+       self.block_request_state.state.is_processed()
+           && self.blob_request_state.state.is_processed()
    }

    /// Checks both the block and blob request states to see if the peer is disconnected.
@@ -304,7 +298,7 @@ impl SingleBlockLookup {
        if let Some(cached_child) = self.child_components.as_mut() {
            cached_child.clear_blobs();
        }
-       self.blob_request_state.state.register_failure_downloading()
+       self.blob_request_state.state.on_download_failure()
    }

    /// This failure occurs after processing, so register a processing failure, penalize the peer
@@ -314,7 +308,7 @@ impl SingleBlockLookup {
        if let Some(cached_child) = self.child_components.as_mut() {
            cached_child.clear_blobs();
        }
-       self.blob_request_state.state.register_failure_processing()
+       self.blob_request_state.state.on_processing_failure()
    }
}

@@ -375,29 +369,34 @@ pub enum CachedChild {
    /// There was an error during consistency checks between block and blobs.
    Err(AvailabilityCheckError),
}
+
+#[derive(Debug, PartialEq, Eq)]
+pub enum State {
+    AwaitingDownload,
+    Downloading { peer_id: PeerId },
+    Processing { peer_id: PeerId },
+    Processed { peer_id: PeerId },
+}
+
/// Object representing the state of a single block or blob lookup request.
#[derive(PartialEq, Eq, Debug)]
pub struct SingleLookupRequestState {
    /// State of this request.
-   pub state: State,
+   state: State,
    /// Peers that should have this block or blob.
-   pub available_peers: HashSet,
+   available_peers: HashSet,
    /// Peers from which we have requested this block.
-   pub used_peers: HashSet,
+   used_peers: HashSet,
    /// How many times have we attempted to process this block or blob.
-   pub failed_processing: u8,
+   failed_processing: u8,
    /// How many times have we attempted to download this block or blob.
-   pub failed_downloading: u8,
-   /// Whether or not we have downloaded this block or blob.
-   pub component_downloaded: bool,
-   /// Whether or not we have processed this block or blob.
-   pub component_processed: bool,
    /// Should be incremented every time this request is retried. The purpose of this is to
    /// differentiate retries of the same block/blob request within a lookup. We currently penalize
    /// peers and retry requests prior to receiving the stream terminator. This means responses
    /// from a prior request may arrive after a new request has been sent; this counter allows
    /// us to differentiate these two responses.
-   pub req_counter: u32,
+   req_counter: u32,
}

impl SingleLookupRequestState {
@@ -413,30 +412,83 @@ impl SingleLookupRequestState {
            used_peers: HashSet::default(),
            failed_processing: 0,
            failed_downloading: 0,
-           component_downloaded: false,
-           component_processed: false,
            req_counter: 0,
        }
    }

-   /// Registers a failure in processing a block.
-   pub fn register_failure_processing(&mut self) {
-       self.failed_processing = self.failed_processing.saturating_add(1);
-       self.state = State::AwaitingDownload;
+   // TODO: Should not leak the enum state
+   pub fn get_state(&self) -> &State {
+       &self.state
+   }
+
+   pub fn is_current_req_counter(&self, req_counter: u32) -> bool {
+       self.req_counter == req_counter
+   }
+
+   pub fn is_awaiting_download(&self) -> bool {
+       matches!(self.state, State::AwaitingDownload)
+   }
+
+   pub fn is_downloaded(&self) -> bool {
+       match self.state {
+           State::AwaitingDownload => false,
+           State::Downloading { .. } => false,
+           State::Processing { .. } => true,
+           State::Processed { .. } => true,
+       }
+   }
+
+   pub fn is_processed(&self) -> bool {
+       match self.state {
+           State::AwaitingDownload => false,
+           State::Downloading { .. } => false,
+           State::Processing { .. } => false,
+           State::Processed { .. } => true,
+       }
+   }
+
+   pub fn on_download_start(&mut self, peer_id: PeerId) -> u32 {
+       self.state = State::Downloading { peer_id };
+       self.req_counter += 1;
+       self.req_counter
    }

    /// Registers a failure in downloading a block. This might be a peer disconnection or a wrong
    /// block.
-   pub fn register_failure_downloading(&mut self) {
+   pub fn on_download_failure(&mut self) {
        self.failed_downloading = self.failed_downloading.saturating_add(1);
        self.state = State::AwaitingDownload;
    }

+   pub fn on_download_success(&mut self, peer_id: PeerId) {
+       self.state = State::Processing { peer_id };
+   }
+
+   /// Registers a failure in processing a block.
+   pub fn on_processing_failure(&mut self) {
+       self.failed_processing = self.failed_processing.saturating_add(1);
+       self.state = State::AwaitingDownload;
+   }
+
+   pub fn on_processing_success(&mut self) -> Result<(), String> {
+       match &self.state {
+           State::Processing { peer_id } => {
+               self.state = State::Processed { peer_id: *peer_id };
+               Ok(())
+           }
+           other => Err(format!("not in processing state: {}", other)),
+       }
+   }
+
    /// The total number of failures, whether it be processing or downloading.
    pub fn failed_attempts(&self) -> u8 {
        self.failed_processing + self.failed_downloading
    }

+   pub fn more_failed_processing_attempts(&self) -> bool {
+       self.failed_processing >= self.failed_downloading
+   }
+
    /// This method should be used for peers wrapped in `PeerId::BlockAndBlobs`.
    pub fn add_peer(&mut self, peer_id: &PeerId) {
        self.available_peers.insert(*peer_id);
@@ -448,7 +500,7 @@ impl SingleLookupRequestState {
        if let State::Downloading { peer_id } = &self.state {
            if peer_id == dc_peer_id {
                // Peer disconnected before providing a block
-               self.register_failure_downloading();
+               self.on_download_failure();
                return Err(());
            }
        }
@@ -459,10 +511,25 @@ impl SingleLookupRequestState {
    /// returns an error.
    pub fn processing_peer(&self) -> Result {
        match &self.state {
-           State::Processing { peer_id } => Ok(*peer_id),
+           State::Processing { peer_id } | State::Processed { peer_id } => Ok(*peer_id),
            other => Err(format!("not in processing state: {}", other)),
        }
    }
+
+   pub fn get_used_peers(&self) -> impl Iterator {
+       self.used_peers.iter()
+   }
+
+   /// Selects a random peer from the available peers, if any, inserts it into `used_peers`, and returns it.
+ pub fn use_rand_available_peer(&mut self) -> Option { + let peer_id = self + .available_peers + .iter() + .choose(&mut rand::thread_rng()) + .copied()?; + self.used_peers.insert(peer_id); + Some(peer_id) + } } impl slog::Value for SingleBlockLookup { @@ -509,6 +576,7 @@ impl slog::Value for SingleLookupRequestState { State::Processing { peer_id } => { serializer.emit_arguments("processing_peer", &format_args!("{}", peer_id))? } + State::Processed { .. } => "processed".serialize(record, "state", serializer)?, } serializer.emit_u8("failed_downloads", self.failed_downloading)?; serializer.emit_u8("failed_processing", self.failed_processing)?; @@ -522,6 +590,7 @@ impl std::fmt::Display for State { State::AwaitingDownload => write!(f, "AwaitingDownload"), State::Downloading { .. } => write!(f, "Downloading"), State::Processing { .. } => write!(f, "Processing"), + State::Processed { .. } => write!(f, "Processed"), } } } @@ -608,6 +677,7 @@ mod tests { as RequestState>::verify_response( &mut sl.block_request_state, block.canonical_root(), + peer_id, Some(block.into()), ) .unwrap() @@ -647,7 +717,7 @@ mod tests { &spec, ) .unwrap(); - sl.block_request_state.state.register_failure_downloading(); + sl.block_request_state.state.on_download_failure(); } // Now we receive the block and send it for processing @@ -661,13 +731,14 @@ mod tests { as RequestState>::verify_response( &mut sl.block_request_state, block.canonical_root(), + peer_id, Some(block.into()), ) .unwrap() .unwrap(); // One processing failure maxes the available attempts - sl.block_request_state.state.register_failure_processing(); + sl.block_request_state.state.on_processing_failure(); assert_eq!( as RequestState>::build_request( &mut sl.block_request_state, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 15595ecde5c..a868a092d3d 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -637,6 +637,7 @@ impl SyncManager { self.handle_unknown_block_root(peer_id, block_root); } SyncMessage::Disconnect(peer_id) => { + debug!(self.log, "Received disconnected message"; "peer_id" => %peer_id); self.peer_disconnect(&peer_id); } SyncMessage::RpcError { From f5e0404fb8c5ad924b5d0f0f4e953f7a3778e327 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Tue, 16 Apr 2024 06:20:50 +0900 Subject: [PATCH 2/6] Ensure proper ReqResp blocks_by_* response stream termination (#5582) * Ensure proper ReqResp blocks_by_* response stream termination * retrigger CI --- .../network_beacon_processor/rpc_methods.rs | 119 ++++++++++-------- 1 file changed, 66 insertions(+), 53 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 81c8f662ee9..fb813723920 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -134,13 +134,37 @@ impl NetworkBeaconProcessor { request_id: PeerRequestId, request: BlocksByRootRequest, ) { + self.terminate_response_stream( + peer_id, + request_id, + self.clone() + .handle_blocks_by_root_request_inner(executor, peer_id, request_id, request) + .await, + Response::BlocksByRoot, + ); + } + + /// Handle a `BlocksByRoot` request from the peer. 
+ pub async fn handle_blocks_by_root_request_inner( + self: Arc, + executor: TaskExecutor, + peer_id: PeerId, + request_id: PeerRequestId, + request: BlocksByRootRequest, + ) -> Result<(), (RPCResponseErrorCode, &'static str)> { let requested_blocks = request.block_roots().len(); let mut block_stream = match self .chain .get_blocks_checking_caches(request.block_roots().to_vec(), &executor) { Ok(block_stream) => block_stream, - Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e), + Err(e) => { + error!(self.log, "Error getting block stream"; "error" => ?e); + return Err(( + RPCResponseErrorCode::ServerError, + "Error getting block stream", + )); + } }; // Fetching blocks is async because it may have to hit the execution layer for payloads. let mut send_block_count = 0; @@ -169,13 +193,10 @@ impl NetworkBeaconProcessor { "block_root" => ?root, "reason" => "execution layer not synced", ); - // send the stream terminator - return self.send_error_response( - peer_id, + return Err(( RPCResponseErrorCode::ResourceUnavailable, - "Execution layer not synced".into(), - request_id, - ); + "Execution layer not synced", + )); } Err(e) => { debug!( @@ -196,8 +217,7 @@ impl NetworkBeaconProcessor { "returned" => %send_block_count ); - // send stream termination - self.send_response(peer_id, Response::BlocksByRoot(None), request_id); + Ok(()) } /// Handle a `BlobsByRoot` request from the peer. @@ -380,6 +400,24 @@ impl NetworkBeaconProcessor { request_id: PeerRequestId, req: BlocksByRangeRequest, ) { + self.terminate_response_stream( + peer_id, + request_id, + self.clone() + .handle_blocks_by_range_request_inner(executor, peer_id, request_id, req) + .await, + Response::BlocksByRange, + ); + } + + /// Handle a `BlocksByRange` request from the peer. 
+ pub async fn handle_blocks_by_range_request_inner( + self: Arc, + executor: TaskExecutor, + peer_id: PeerId, + request_id: PeerRequestId, + req: BlocksByRangeRequest, + ) -> Result<(), (RPCResponseErrorCode, &'static str)> { debug!(self.log, "Received BlocksByRange Request"; "peer_id" => %peer_id, "count" => req.count(), @@ -401,12 +439,10 @@ impl NetworkBeaconProcessor { } }); if *req.count() > max_request_size { - return self.send_error_response( - peer_id, + return Err(( RPCResponseErrorCode::InvalidRequest, - format!("Request exceeded max size {max_request_size}"), - request_id, - ); + "Request exceeded max size", + )); } let forwards_block_root_iter = match self @@ -424,25 +460,15 @@ impl NetworkBeaconProcessor { "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot ); - return self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Backfilling".into(), - request_id, - ); + return Err((RPCResponseErrorCode::ResourceUnavailable, "Backfilling")); } Err(e) => { - self.send_error_response( - peer_id, - RPCResponseErrorCode::ServerError, - "Database error".into(), - request_id, - ); - return error!(self.log, "Unable to obtain root iter"; + error!(self.log, "Unable to obtain root iter"; "request" => ?req, "peer" => %peer_id, "error" => ?e ); + return Err((RPCResponseErrorCode::ServerError, "Database error")); } }; @@ -468,11 +494,12 @@ impl NetworkBeaconProcessor { let block_roots = match maybe_block_roots { Ok(block_roots) => block_roots, Err(e) => { - return error!(self.log, "Error during iteration over blocks"; + error!(self.log, "Error during iteration over blocks"; "request" => ?req, "peer" => %peer_id, "error" => ?e - ) + ); + return Err((RPCResponseErrorCode::ServerError, "Iteration error")); } }; @@ -481,7 +508,10 @@ impl NetworkBeaconProcessor { let mut block_stream = match self.chain.get_blocks(block_roots, &executor) { Ok(block_stream) => block_stream, - Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e), + Err(e) => { + error!(self.log, "Error getting block stream"; "error" => ?e); + return Err((RPCResponseErrorCode::ServerError, "Iterator error")); + } }; // Fetching blocks is async because it may have to hit the execution layer for payloads. 
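// Editor's note: an illustrative sketch, not part of this patch. The *_inner
// handlers above return Result<(), (RPCResponseErrorCode, &str)> so that one
// wrapper can close the response stream on every path. The real
// terminate_response_stream body is not shown in this diff, so the shape
// below is an assumption:
fn terminate_response_stream<R, E>(
    result: Result<(), E>,
    send_terminator: impl FnOnce() -> R,
    send_error: impl FnOnce(E) -> R,
) -> R {
    match result {
        // Success: end the response stream with its terminator (a None response).
        Ok(()) => send_terminator(),
        // Failure: send one RPC error instead of leaving a dangling stream.
        Err(e) => send_error(e),
    }
}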
@@ -511,12 +541,7 @@ impl NetworkBeaconProcessor { "peer" => %peer_id, "request_root" => ?root ); - return self.send_error_response( - peer_id, - RPCResponseErrorCode::ServerError, - "Database inconsistency".into(), - request_id, - ); + return Err((RPCResponseErrorCode::ServerError, "Database inconsistency")); } Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { debug!( @@ -526,12 +551,10 @@ impl NetworkBeaconProcessor { "reason" => "execution layer not synced", ); // send the stream terminator - return self.send_error_response( - peer_id, + return Err(( RPCResponseErrorCode::ResourceUnavailable, - "Execution layer not synced".into(), - request_id, - ); + "Execution layer not synced", + )); } Err(e) => { if matches!( @@ -556,12 +579,7 @@ impl NetworkBeaconProcessor { } // send the stream terminator - return self.send_error_response( - peer_id, - RPCResponseErrorCode::ServerError, - "Failed fetching blocks".into(), - request_id, - ); + return Err((RPCResponseErrorCode::ServerError, "Failed fetching blocks")); } } } @@ -594,12 +612,7 @@ impl NetworkBeaconProcessor { ); } - // send the stream terminator - self.send_network_message(NetworkMessage::SendResponse { - peer_id, - response: Response::BlocksByRange(None), - id: request_id, - }); + Ok(()) } /// Handle a `BlobsByRange` request from the peer. From e5b8d1237d9eb65d56c53fd827e049b1704bbc87 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Tue, 16 Apr 2024 21:59:53 +0900 Subject: [PATCH 3/6] Use Action in single_block_component_processed (#5564) * Use Action in single_block_component_processed * Merge branch 'unstable' of https://github.com/sigp/lighthouse into single_block_component_processed-action * fix compile after merge * add continue action for when we are awaiting other parts of missing components --- .../network/src/sync/block_lookups/mod.rs | 242 +++++++++--------- 1 file changed, 126 insertions(+), 116 deletions(-) diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 4e1d02d38f4..a5826bcb3d8 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -45,6 +45,13 @@ pub type DownloadedBlock = (Hash256, RpcBlock); const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60; pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3; +enum Action { + Retry, + ParentUnknown { parent_root: Hash256, slot: Slot }, + Drop, + Continue, +} + pub struct BlockLookups { /// Parent chain lookups being downloaded. 
parent_lookups: SmallVec<[ParentLookup; 3]>,
@@ -773,7 +780,7 @@ impl BlockLookups {
            return;
        };
-       let root = lookup.block_root();
+       let block_root = lookup.block_root();
        let request_state = R::request_state_mut(&mut lookup);

        let peer_id = match request_state.get_state().processing_peer() {
@@ -787,28 +794,49 @@ impl BlockLookups {
            self.log,
            "Block component processed for lookup";
            "response_type" => ?R::response_type(),
-           "block_root" => ?root,
+           "block_root" => ?block_root,
            "result" => ?result,
            "id" => target_id,
        );
-       match result {
-           BlockProcessingResult::Ok(status) => match status {
-               AvailabilityProcessingStatus::Imported(root) => {
-                   trace!(self.log, "Single block processing succeeded"; "block" => %root);
-               }
-               AvailabilityProcessingStatus::MissingComponents(_, _block_root) => {
-                   match self.handle_missing_components::(cx, &mut lookup) {
-                       Ok(()) => {
-                           self.single_block_lookups.insert(target_id, lookup);
-                       }
-                       Err(e) => {
-                           // Drop with an additional error.
-                           warn!(self.log, "Single block lookup failed"; "block" => %root, "error" => ?e);
-                       }
-                   }
+       let action = match result {
+           BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_))
+           | BlockProcessingResult::Err(BlockError::BlockIsAlreadyKnown { .. }) => {
+               // Successfully imported
+               trace!(self.log, "Single block processing succeeded"; "block" => %block_root);
+               Action::Drop
+           }
+
+           BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents(
+               _,
+               _block_root,
+           )) => {
+               // `on_processing_success` is called here to ensure the request state is updated prior to checking
+               // if both components have been processed.
+               if R::request_state_mut(&mut lookup)
+                   .get_state_mut()
+                   .on_processing_success()
+                   .is_err()
+               {
+                   warn!(
+                       self.log,
+                       "Single block processing state incorrect";
+                       "action" => "dropping single block request"
+                   );
+                   Action::Drop
+               // If this was the result of a block request, we can't determine if the block peer did anything
+               // wrong. If we already had both a block and blobs response processed, we should penalize the
+               // blobs peer because they did not provide all blobs on the initial request.
+               } else if lookup.both_components_processed() {
+                   lookup.penalize_blob_peer(cx);
+
+                   // Try it again if possible.
+                   lookup.blob_request_state.state.on_processing_failure();
+                   Action::Retry
+               } else {
+                   Action::Continue
                }
-           },
+           }
            BlockProcessingResult::Ignored => {
                // Beacon processor signalled to ignore the block processing result.
                // This implies that the cpu is overloaded. Drop the request.
@@ -817,118 +845,88 @@ impl BlockLookups {
                    "Single block processing was ignored, cpu might be overloaded";
                    "action" => "dropping single block request"
                );
+               Action::Drop
            }
            BlockProcessingResult::Err(e) => {
-               match self.handle_single_lookup_block_error(cx, lookup, peer_id, e) {
-                   Ok(Some(lookup)) => {
-                       self.single_block_lookups.insert(target_id, lookup);
+               let root = lookup.block_root();
+               trace!(self.log, "Single block processing failed"; "block" => %root, "error" => %e);
+               match e {
+                   BlockError::BeaconChainError(e) => {
+                       // Internal error
+                       error!(self.log, "Beacon chain error processing single block"; "block_root" => %root, "error" => ?e);
+                       Action::Drop
                    }
-                   Ok(None) => {
-                       // Drop without an additional error.
+                   BlockError::ParentUnknown(block) => {
+                       let slot = block.slot();
+                       let parent_root = block.parent_root();
+                       lookup.add_child_components(block.into());
+                       Action::ParentUnknown { parent_root, slot }
                    }
-                   Err(e) => {
-                       // Drop with an additional error.
- warn!(self.log, "Single block lookup failed"; "block" => %root, "error" => ?e); + ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => { + // These errors indicate that the execution layer is offline + // and failed to validate the execution payload. Do not downscore peer. + debug!( + self.log, + "Single block lookup failed. Execution layer is offline / unsynced / misconfigured"; + "root" => %root, + "error" => ?e + ); + Action::Drop + } + BlockError::AvailabilityCheck(e) => match e.category() { + AvailabilityCheckErrorCategory::Internal => { + warn!(self.log, "Internal availability check failure"; "root" => %root, "peer_id" => %peer_id, "error" => ?e); + lookup.block_request_state.state.on_download_failure(); + lookup.blob_request_state.state.on_download_failure(); + Action::Retry + } + AvailabilityCheckErrorCategory::Malicious => { + warn!(self.log, "Availability check failure"; "root" => %root, "peer_id" => %peer_id, "error" => ?e); + lookup.handle_availability_check_failure(cx); + Action::Retry + } + }, + other => { + warn!(self.log, "Peer sent invalid block in single block lookup"; "root" => %root, "error" => ?other, "peer_id" => %peer_id); + if let Ok(block_peer) = lookup.block_request_state.state.processing_peer() { + cx.report_peer( + block_peer, + PeerAction::MidToleranceError, + "single_block_failure", + ); + + lookup.block_request_state.state.on_processing_failure(); + } + Action::Retry } } } }; - } - - /// Handles a `MissingComponents` block processing error. Handles peer scoring and retries. - /// - /// If this was the result of a block request, we can't determined if the block peer did anything - /// wrong. If we already had both a block and blobs response processed, we should penalize the - /// blobs peer because they did not provide all blobs on the initial request. - fn handle_missing_components>( - &self, - cx: &SyncNetworkContext, - lookup: &mut SingleBlockLookup, - ) -> Result<(), LookupRequestError> { - let request_state = R::request_state_mut(lookup); - - request_state - .get_state_mut() - .on_processing_success() - .map_err(LookupRequestError::BadState)?; - if lookup.both_components_processed() { - lookup.penalize_blob_peer(cx); - - // Try it again if possible. - lookup.blob_request_state.state.on_processing_failure(); - lookup.request_block_and_blobs(cx)?; - } - Ok(()) - } - /// Handles peer scoring and retries related to a `BlockError` in response to a single block - /// or blob lookup processing result. 
- fn handle_single_lookup_block_error( - &mut self, - cx: &mut SyncNetworkContext, - mut lookup: SingleBlockLookup, - peer_id: PeerId, - e: BlockError, - ) -> Result>, LookupRequestError> { - let root = lookup.block_root(); - trace!(self.log, "Single block processing failed"; "block" => %root, "error" => %e); - match e { - BlockError::BlockIsAlreadyKnown(_) => { - // No error here - return Ok(None); - } - BlockError::BeaconChainError(e) => { - // Internal error - error!(self.log, "Beacon chain error processing single block"; "block_root" => %root, "error" => ?e); - return Ok(None); + match action { + Action::Retry => { + if let Err(e) = lookup.request_block_and_blobs(cx) { + warn!(self.log, "Single block lookup failed"; "block_root" => %block_root, "error" => ?e); + // Failed with too many retries, drop with noop + self.update_metrics(); + } else { + self.single_block_lookups.insert(target_id, lookup); + } } - BlockError::ParentUnknown(block) => { - let slot = block.slot(); - let parent_root = block.parent_root(); - lookup.add_child_components(block.into()); - lookup.request_block_and_blobs(cx)?; - self.search_parent(slot, root, parent_root, peer_id, cx); + Action::ParentUnknown { parent_root, slot } => { + // TODO: Consider including all peers from the lookup, claiming to know this block, not + // just the one that sent this specific block + self.search_parent(slot, block_root, parent_root, peer_id, cx); + self.single_block_lookups.insert(target_id, lookup); } - ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => { - // These errors indicate that the execution layer is offline - // and failed to validate the execution payload. Do not downscore peer. - debug!( - self.log, - "Single block lookup failed. Execution layer is offline / unsynced / misconfigured"; - "root" => %root, - "error" => ?e - ); - return Ok(None); + Action::Drop => { + // drop with noop + self.update_metrics(); } - BlockError::AvailabilityCheck(e) => match e.category() { - AvailabilityCheckErrorCategory::Internal => { - warn!(self.log, "Internal availability check failure"; "root" => %root, "peer_id" => %peer_id, "error" => ?e); - lookup.block_request_state.state.on_download_failure(); - lookup.blob_request_state.state.on_download_failure(); - lookup.request_block_and_blobs(cx)? - } - AvailabilityCheckErrorCategory::Malicious => { - warn!(self.log, "Availability check failure"; "root" => %root, "peer_id" => %peer_id, "error" => ?e); - lookup.handle_availability_check_failure(cx); - lookup.request_block_and_blobs(cx)? - } - }, - other => { - warn!(self.log, "Peer sent invalid block in single block lookup"; "root" => %root, "error" => ?other, "peer_id" => %peer_id); - if let Ok(block_peer) = lookup.block_request_state.state.processing_peer() { - cx.report_peer( - block_peer, - PeerAction::MidToleranceError, - "single_block_failure", - ); - - // Try it again if possible. - lookup.block_request_state.state.on_processing_failure(); - lookup.request_block_and_blobs(cx)? 
- } + Action::Continue => { + self.single_block_lookups.insert(target_id, lookup); } } - Ok(Some(lookup)) } pub fn parent_block_processed( @@ -1369,4 +1367,16 @@ impl BlockLookups { pub fn drop_parent_chain_requests(&mut self) -> usize { self.parent_lookups.drain(..).len() } + + pub fn update_metrics(&self) { + metrics::set_gauge( + &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, + self.single_block_lookups.len() as i64, + ); + + metrics::set_gauge( + &metrics::SYNC_PARENT_BLOCK_LOOKUPS, + self.parent_lookups.len() as i64, + ); + } } From f68989815c9b66b33d98a593cb331c0c2b291b92 Mon Sep 17 00:00:00 2001 From: ethDreamer <37123614+ethDreamer@users.noreply.github.com> Date: Tue, 16 Apr 2024 09:56:00 -0500 Subject: [PATCH 4/6] Restore Log on Error & Spawn Blocking in Streamer (#5585) * Restore Logging in Error Cases * Use Spawn Blocking for Loading Blocks in Streamer * Merge remote-tracking branch 'upstream/unstable' into request_logging_spawn_blocking * Address Sean's Comments * save a clone --- .../beacon_chain/src/beacon_block_streamer.rs | 89 +++++++----- beacon_node/beacon_chain/src/beacon_chain.rs | 8 +- .../src/network_beacon_processor/mod.rs | 6 +- .../network_beacon_processor/rpc_methods.rs | 134 ++++++++++-------- 4 files changed, 131 insertions(+), 106 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_block_streamer.rs b/beacon_node/beacon_chain/src/beacon_block_streamer.rs index bbd5bfcac9a..4f413ce2a86 100644 --- a/beacon_node/beacon_chain/src/beacon_block_streamer.rs +++ b/beacon_node/beacon_chain/src/beacon_block_streamer.rs @@ -1,10 +1,9 @@ use crate::{metrics, BeaconChain, BeaconChainError, BeaconChainTypes}; use execution_layer::{ExecutionLayer, ExecutionPayloadBodyV1}; -use slog::{crit, debug, Logger}; +use slog::{crit, debug, error, Logger}; use std::collections::HashMap; use std::sync::Arc; use store::{DatabaseBlock, ExecutionPayloadDeneb}; -use task_executor::TaskExecutor; use tokio::sync::{ mpsc::{self, UnboundedSender}, RwLock, @@ -395,18 +394,18 @@ impl BeaconBlockStreamer { pub fn new( beacon_chain: &Arc>, check_caches: CheckCaches, - ) -> Result { + ) -> Result, BeaconChainError> { let execution_layer = beacon_chain .execution_layer .as_ref() .ok_or(BeaconChainError::ExecutionLayerMissing)? 
.clone(); - Ok(Self { + Ok(Arc::new(Self { execution_layer, check_caches, beacon_chain: beacon_chain.clone(), - }) + })) } fn check_caches(&self, root: Hash256) -> Option>> { @@ -425,30 +424,44 @@ impl BeaconBlockStreamer { } } - fn load_payloads(&self, block_roots: Vec) -> Vec<(Hash256, LoadResult)> { - let mut db_blocks = Vec::new(); - - for root in block_roots { - if let Some(cached_block) = self.check_caches(root).map(LoadedBeaconBlock::Full) { - db_blocks.push((root, Ok(Some(cached_block)))); - continue; - } - - match self.beacon_chain.store.try_get_full_block(&root) { - Err(e) => db_blocks.push((root, Err(e.into()))), - Ok(opt_block) => db_blocks.push(( - root, - Ok(opt_block.map(|db_block| match db_block { - DatabaseBlock::Full(block) => LoadedBeaconBlock::Full(Arc::new(block)), - DatabaseBlock::Blinded(block) => { - LoadedBeaconBlock::Blinded(Box::new(block)) + async fn load_payloads( + self: &Arc, + block_roots: Vec, + ) -> Result)>, BeaconChainError> { + let streamer = self.clone(); + // Loading from the DB is slow -> spawn a blocking task + self.beacon_chain + .spawn_blocking_handle( + move || { + let mut db_blocks = Vec::new(); + for root in block_roots { + if let Some(cached_block) = + streamer.check_caches(root).map(LoadedBeaconBlock::Full) + { + db_blocks.push((root, Ok(Some(cached_block)))); + continue; } - })), - )), - } - } - db_blocks + match streamer.beacon_chain.store.try_get_full_block(&root) { + Err(e) => db_blocks.push((root, Err(e.into()))), + Ok(opt_block) => db_blocks.push(( + root, + Ok(opt_block.map(|db_block| match db_block { + DatabaseBlock::Full(block) => { + LoadedBeaconBlock::Full(Arc::new(block)) + } + DatabaseBlock::Blinded(block) => { + LoadedBeaconBlock::Blinded(Box::new(block)) + } + })), + )), + } + } + db_blocks + }, + "load_beacon_blocks", + ) + .await } /// Pre-process the loaded blocks into execution engine requests. 
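// Editor's note: an illustrative sketch, not part of this patch. It shows the
// general pattern behind the `load_payloads` change above: slow, synchronous
// database reads are moved onto a blocking thread so they cannot stall the
// async executor. The patch itself goes through the beacon chain's
// `spawn_blocking_handle` wrapper; this stand-in uses plain tokio.
async fn load_blocking<T: Send + 'static>(
    read_from_db: impl FnOnce() -> T + Send + 'static,
) -> Result<T, tokio::task::JoinError> {
    // The closure runs on tokio's blocking thread pool; awaiting the handle
    // yields instead of blocking other async tasks.
    tokio::task::spawn_blocking(read_from_db).await
}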
@@ -549,7 +562,7 @@ impl BeaconBlockStreamer { // used when the execution engine doesn't support the payload bodies methods async fn stream_blocks_fallback( - &self, + self: Arc, block_roots: Vec, sender: UnboundedSender<(Hash256, Arc>)>, ) { @@ -575,7 +588,7 @@ impl BeaconBlockStreamer { } async fn stream_blocks( - &self, + self: Arc, block_roots: Vec, sender: UnboundedSender<(Hash256, Arc>)>, ) { @@ -584,7 +597,17 @@ impl BeaconBlockStreamer { let mut n_sent = 0usize; let mut engine_requests = 0usize; - let payloads = self.load_payloads(block_roots); + let payloads = match self.load_payloads(block_roots).await { + Ok(payloads) => payloads, + Err(e) => { + error!( + self.beacon_chain.log, + "BeaconBlockStreamer: Failed to load payloads"; + "error" => ?e + ); + return; + } + }; let requests = self.get_requests(payloads).await; for (root, request) in requests { @@ -624,7 +647,7 @@ impl BeaconBlockStreamer { } pub async fn stream( - self, + self: Arc, block_roots: Vec, sender: UnboundedSender<(Hash256, Arc>)>, ) { @@ -650,9 +673,8 @@ impl BeaconBlockStreamer { } pub fn launch_stream( - self, + self: Arc, block_roots: Vec, - executor: &TaskExecutor, ) -> impl Stream>)> { let (block_tx, block_rx) = mpsc::unbounded_channel(); debug!( @@ -660,6 +682,7 @@ impl BeaconBlockStreamer { "Launching a BeaconBlockStreamer"; "blocks" => block_roots.len(), ); + let executor = self.beacon_chain.task_executor.clone(); executor.spawn(self.stream(block_roots, block_tx), "get_blocks_sender"); UnboundedReceiverStream::new(block_rx) } diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 7c497e74584..b3790024f81 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1139,7 +1139,6 @@ impl BeaconChain { pub fn get_blocks_checking_caches( self: &Arc, block_roots: Vec, - executor: &TaskExecutor, ) -> Result< impl Stream< Item = ( @@ -1149,14 +1148,12 @@ impl BeaconChain { >, Error, > { - Ok(BeaconBlockStreamer::::new(self, CheckCaches::Yes)? - .launch_stream(block_roots, executor)) + Ok(BeaconBlockStreamer::::new(self, CheckCaches::Yes)?.launch_stream(block_roots)) } pub fn get_blocks( self: &Arc, block_roots: Vec, - executor: &TaskExecutor, ) -> Result< impl Stream< Item = ( @@ -1166,8 +1163,7 @@ impl BeaconChain { >, Error, > { - Ok(BeaconBlockStreamer::::new(self, CheckCaches::No)? 
- .launch_stream(block_roots, executor)) + Ok(BeaconBlockStreamer::::new(self, CheckCaches::No)?.launch_stream(block_roots)) } pub fn get_blobs_checking_early_attester_cache( diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 27b9e676da6..f10646c7414 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -509,9 +509,8 @@ impl NetworkBeaconProcessor { ) -> Result<(), Error> { let processor = self.clone(); let process_fn = async move { - let executor = processor.executor.clone(); processor - .handle_blocks_by_range_request(executor, peer_id, request_id, request) + .handle_blocks_by_range_request(peer_id, request_id, request) .await; }; @@ -530,9 +529,8 @@ impl NetworkBeaconProcessor { ) -> Result<(), Error> { let processor = self.clone(); let process_fn = async move { - let executor = processor.executor.clone(); processor - .handle_blocks_by_root_request(executor, peer_id, request_id, request) + .handle_blocks_by_root_request(peer_id, request_id, request) .await; }; diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index fb813723920..1e72dc42578 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -11,7 +11,6 @@ use slog::{debug, error, warn}; use slot_clock::SlotClock; use std::collections::{hash_map::Entry, HashMap}; use std::sync::Arc; -use task_executor::TaskExecutor; use tokio_stream::StreamExt; use types::blob_sidecar::BlobIdentifier; use types::{Epoch, EthSpec, ForkName, Hash256, Slot}; @@ -129,7 +128,6 @@ impl NetworkBeaconProcessor { /// Handle a `BlocksByRoot` request from the peer. pub async fn handle_blocks_by_root_request( self: Arc, - executor: TaskExecutor, peer_id: PeerId, request_id: PeerRequestId, request: BlocksByRootRequest, @@ -138,7 +136,7 @@ impl NetworkBeaconProcessor { peer_id, request_id, self.clone() - .handle_blocks_by_root_request_inner(executor, peer_id, request_id, request) + .handle_blocks_by_root_request_inner(peer_id, request_id, request) .await, Response::BlocksByRoot, ); @@ -147,15 +145,24 @@ impl NetworkBeaconProcessor { /// Handle a `BlocksByRoot` request from the peer. 
pub async fn handle_blocks_by_root_request_inner( self: Arc, - executor: TaskExecutor, peer_id: PeerId, request_id: PeerRequestId, request: BlocksByRootRequest, ) -> Result<(), (RPCResponseErrorCode, &'static str)> { + let log_results = |peer_id, requested_blocks, send_block_count| { + debug!( + self.log, + "BlocksByRoot outgoing response processed"; + "peer" => %peer_id, + "requested" => requested_blocks, + "returned" => %send_block_count + ); + }; + let requested_blocks = request.block_roots().len(); let mut block_stream = match self .chain - .get_blocks_checking_caches(request.block_roots().to_vec(), &executor) + .get_blocks_checking_caches(request.block_roots().to_vec()) { Ok(block_stream) => block_stream, Err(e) => { @@ -193,6 +200,7 @@ impl NetworkBeaconProcessor { "block_root" => ?root, "reason" => "execution layer not synced", ); + log_results(peer_id, requested_blocks, send_block_count); return Err(( RPCResponseErrorCode::ResourceUnavailable, "Execution layer not synced", @@ -209,13 +217,7 @@ impl NetworkBeaconProcessor { } } } - debug!( - self.log, - "Received BlocksByRoot Request"; - "peer" => %peer_id, - "requested" => requested_blocks, - "returned" => %send_block_count - ); + log_results(peer_id, requested_blocks, send_block_count); Ok(()) } @@ -302,7 +304,7 @@ impl NetworkBeaconProcessor { } debug!( self.log, - "Received BlobsByRoot Request"; + "BlobsByRoot outgoing response processed"; "peer" => %peer_id, "request_root" => %requested_root, "request_indices" => ?requested_indices, @@ -395,7 +397,6 @@ impl NetworkBeaconProcessor { /// Handle a `BlocksByRange` request from the peer. pub async fn handle_blocks_by_range_request( self: Arc, - executor: TaskExecutor, peer_id: PeerId, request_id: PeerRequestId, req: BlocksByRangeRequest, @@ -404,7 +405,7 @@ impl NetworkBeaconProcessor { peer_id, request_id, self.clone() - .handle_blocks_by_range_request_inner(executor, peer_id, request_id, req) + .handle_blocks_by_range_request_inner(peer_id, request_id, req) .await, Response::BlocksByRange, ); @@ -413,7 +414,6 @@ impl NetworkBeaconProcessor { /// Handle a `BlocksByRange` request from the peer. 
pub async fn handle_blocks_by_range_request_inner( self: Arc, - executor: TaskExecutor, peer_id: PeerId, request_id: PeerRequestId, req: BlocksByRangeRequest, @@ -506,7 +506,37 @@ impl NetworkBeaconProcessor { // remove all skip slots let block_roots = block_roots.into_iter().flatten().collect::>(); - let mut block_stream = match self.chain.get_blocks(block_roots, &executor) { + let current_slot = self + .chain + .slot() + .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); + + let log_results = |req: BlocksByRangeRequest, peer_id, blocks_sent| { + if blocks_sent < (*req.count() as usize) { + debug!( + self.log, + "BlocksByRange outgoing response processed"; + "peer" => %peer_id, + "msg" => "Failed to return all requested blocks", + "start_slot" => req.start_slot(), + "current_slot" => current_slot, + "requested" => req.count(), + "returned" => blocks_sent + ); + } else { + debug!( + self.log, + "BlocksByRange outgoing response processed"; + "peer" => %peer_id, + "start_slot" => req.start_slot(), + "current_slot" => current_slot, + "requested" => req.count(), + "returned" => blocks_sent + ); + } + }; + + let mut block_stream = match self.chain.get_blocks(block_roots) { Ok(block_stream) => block_stream, Err(e) => { error!(self.log, "Error getting block stream"; "error" => ?e); @@ -516,7 +546,6 @@ impl NetworkBeaconProcessor { // Fetching blocks is async because it may have to hit the execution layer for payloads. let mut blocks_sent = 0; - while let Some((root, result)) = block_stream.next().await { match result.as_ref() { Ok(Some(block)) => { @@ -541,6 +570,7 @@ impl NetworkBeaconProcessor { "peer" => %peer_id, "request_root" => ?root ); + log_results(req, peer_id, blocks_sent); return Err((RPCResponseErrorCode::ServerError, "Database inconsistency")); } Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { @@ -550,6 +580,7 @@ impl NetworkBeaconProcessor { "block_root" => ?root, "reason" => "execution layer not synced", ); + log_results(req, peer_id, blocks_sent); // send the stream terminator return Err(( RPCResponseErrorCode::ResourceUnavailable, @@ -577,41 +608,14 @@ impl NetworkBeaconProcessor { "error" => ?e ); } - + log_results(req, peer_id, blocks_sent); // send the stream terminator return Err((RPCResponseErrorCode::ServerError, "Failed fetching blocks")); } } } - let current_slot = self - .chain - .slot() - .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); - - if blocks_sent < (*req.count() as usize) { - debug!( - self.log, - "BlocksByRange outgoing response processed"; - "peer" => %peer_id, - "msg" => "Failed to return all requested blocks", - "start_slot" => req.start_slot(), - "current_slot" => current_slot, - "requested" => req.count(), - "returned" => blocks_sent - ); - } else { - debug!( - self.log, - "BlocksByRange outgoing response processed"; - "peer" => %peer_id, - "start_slot" => req.start_slot(), - "current_slot" => current_slot, - "requested" => req.count(), - "returned" => blocks_sent - ); - } - + log_results(req, peer_id, blocks_sent); Ok(()) } @@ -754,9 +758,25 @@ impl NetworkBeaconProcessor { } }; + let current_slot = self + .chain + .slot() + .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); + + let log_results = |peer_id, req: BlobsByRangeRequest, blobs_sent| { + debug!( + self.log, + "BlobsByRange outgoing response processed"; + "peer" => %peer_id, + "start_slot" => req.start_slot, + "current_slot" => current_slot, + "requested" => req.count, + "returned" => blobs_sent + ); + }; + // remove all skip slots let block_roots = 
-
         let mut blobs_sent = 0;
 
         for root in block_roots {
@@ -780,6 +800,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
                         "block_root" => ?root,
                         "error" => ?e
                     );
+                    log_results(peer_id, req, blobs_sent);
+
                     return Err((
                         RPCResponseErrorCode::ServerError,
                         "No blobs and failed fetching corresponding block",
@@ -787,21 +809,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
                 }
             }
         }
-
-        let current_slot = self
-            .chain
-            .slot()
-            .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot());
-
-        debug!(
-            self.log,
-            "BlobsByRange Response processed";
-            "peer" => %peer_id,
-            "start_slot" => req.start_slot,
-            "current_slot" => current_slot,
-            "requested" => req.count,
-            "returned" => blobs_sent
-        );
+        log_results(peer_id, req, blobs_sent);
 
         Ok(())
     }

From cda926ce1b335055024bd705374f7a4e27f01dda Mon Sep 17 00:00:00 2001
From: ethDreamer <37123614+ethDreamer@users.noreply.github.com>
Date: Tue, 16 Apr 2024 14:03:08 -0500
Subject: [PATCH 5/6] Rename Functions to More Closely Match Spec (#5591)

* Rename Functions to More Closely Match Spec
---
 beacon_node/genesis/src/eth1_genesis_service.rs             | 4 ++--
 .../state_processing/src/common/initiate_validator_exit.rs  | 2 +-
 consensus/state_processing/src/genesis.rs                   | 4 ++--
 .../src/per_block_processing/process_operations.rs          | 4 ++--
 .../src/per_epoch_processing/single_pass.rs                 | 2 +-
 consensus/types/src/beacon_state.rs                         | 6 +++---
 6 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/beacon_node/genesis/src/eth1_genesis_service.rs b/beacon_node/genesis/src/eth1_genesis_service.rs
index fdba9f4741c..0ede74ba754 100644
--- a/beacon_node/genesis/src/eth1_genesis_service.rs
+++ b/beacon_node/genesis/src/eth1_genesis_service.rs
@@ -5,7 +5,7 @@ use eth1::{DepositLog, Eth1Block, Service as Eth1Service};
 use slog::{debug, error, info, trace, Logger};
 use state_processing::{
     eth2_genesis_time, initialize_beacon_state_from_eth1, is_valid_genesis_state,
-    per_block_processing::process_operations::process_deposit, process_activations,
+    per_block_processing::process_operations::apply_deposit, process_activations,
 };
 use std::sync::{
     atomic::{AtomicU64, AtomicUsize, Ordering},
@@ -433,7 +433,7 @@ impl Eth1GenesisService {
                 // is reached _prior_ to `MIN_ACTIVE_VALIDATOR_COUNT`. I suspect this won't be the
                 // case for mainnet, so we defer this optimization.
 
-                process_deposit(&mut state, &deposit, spec, PROOF_VERIFICATION)
+                apply_deposit(&mut state, &deposit, spec, PROOF_VERIFICATION)
                     .map_err(|e| format!("Error whilst processing deposit: {:?}", e))
             })?;
diff --git a/consensus/state_processing/src/common/initiate_validator_exit.rs b/consensus/state_processing/src/common/initiate_validator_exit.rs
index c527807df89..84656d9c890 100644
--- a/consensus/state_processing/src/common/initiate_validator_exit.rs
+++ b/consensus/state_processing/src/common/initiate_validator_exit.rs
@@ -26,7 +26,7 @@ pub fn initiate_validator_exit<E: EthSpec>(
         .map_or(delayed_epoch, |epoch| max(epoch, delayed_epoch));
     let exit_queue_churn = state.exit_cache().get_churn_at(exit_queue_epoch)?;
 
-    if exit_queue_churn >= state.get_churn_limit(spec)? {
+    if exit_queue_churn >= state.get_validator_churn_limit(spec)? {
         exit_queue_epoch.safe_add_assign(1)?;
     }
diff --git a/consensus/state_processing/src/genesis.rs b/consensus/state_processing/src/genesis.rs
index b225923b418..036ab23498c 100644
--- a/consensus/state_processing/src/genesis.rs
+++ b/consensus/state_processing/src/genesis.rs
@@ -1,5 +1,5 @@
 use super::per_block_processing::{
-    errors::BlockProcessingError, process_operations::process_deposit,
+    errors::BlockProcessingError, process_operations::apply_deposit,
 };
 use crate::common::DepositDataTree;
 use crate::upgrade::{
@@ -37,7 +37,7 @@ pub fn initialize_beacon_state_from_eth1<E: EthSpec>(
             .push_leaf(deposit.data.tree_hash_root())
             .map_err(BlockProcessingError::MerkleTreeError)?;
         state.eth1_data_mut().deposit_root = deposit_tree.root();
-        process_deposit(&mut state, deposit, spec, true)?;
+        apply_deposit(&mut state, deposit, spec, true)?;
     }
 
     process_activations(&mut state, spec)?;
diff --git a/consensus/state_processing/src/per_block_processing/process_operations.rs b/consensus/state_processing/src/per_block_processing/process_operations.rs
index af9b7938132..63b7c9e01fb 100644
--- a/consensus/state_processing/src/per_block_processing/process_operations.rs
+++ b/consensus/state_processing/src/per_block_processing/process_operations.rs
@@ -371,14 +371,14 @@ pub fn process_deposits<E: EthSpec>(
 
     // Update the state in series.
     for deposit in deposits {
-        process_deposit(state, deposit, spec, false)?;
+        apply_deposit(state, deposit, spec, false)?;
     }
 
     Ok(())
 }
 
 /// Process a single deposit, optionally verifying its merkle proof.
-pub fn process_deposit<E: EthSpec>(
+pub fn apply_deposit<E: EthSpec>(
     state: &mut BeaconState<E>,
     deposit: &Deposit,
     spec: &ChainSpec,
diff --git a/consensus/state_processing/src/per_epoch_processing/single_pass.rs b/consensus/state_processing/src/per_epoch_processing/single_pass.rs
index 9319d2941b5..380484046c3 100644
--- a/consensus/state_processing/src/per_epoch_processing/single_pass.rs
+++ b/consensus/state_processing/src/per_epoch_processing/single_pass.rs
@@ -120,7 +120,7 @@ pub fn process_epoch_single_pass<E: EthSpec>(
     let next_epoch = state.next_epoch()?;
     let is_in_inactivity_leak = state.is_in_inactivity_leak(previous_epoch, spec)?;
     let total_active_balance = state.get_total_active_balance()?;
-    let churn_limit = state.get_churn_limit(spec)?;
+    let churn_limit = state.get_validator_churn_limit(spec)?;
     let activation_churn_limit = state.get_activation_churn_limit(spec)?;
     let finalized_checkpoint = state.finalized_checkpoint();
     let fork_name = state.fork_name_unchecked();
diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs
index ba11c9c4cce..02572b0efbd 100644
--- a/consensus/types/src/beacon_state.rs
+++ b/consensus/types/src/beacon_state.rs
@@ -1444,7 +1444,7 @@ impl<E: EthSpec> BeaconState<E> {
     /// Return the churn limit for the current epoch (number of validators who can leave per epoch).
     ///
     /// Uses the current epoch committee cache, and will error if it isn't initialized.
-    pub fn get_churn_limit(&self, spec: &ChainSpec) -> Result<u64, Error> {
+    pub fn get_validator_churn_limit(&self, spec: &ChainSpec) -> Result<u64, Error> {
         Ok(std::cmp::max(
             spec.min_per_epoch_churn_limit,
             (self
@@ -1462,10 +1462,10 @@ impl<E: EthSpec> BeaconState<E> {
             BeaconState::Base(_)
             | BeaconState::Altair(_)
             | BeaconState::Merge(_)
-            | BeaconState::Capella(_) => self.get_churn_limit(spec)?,
+            | BeaconState::Capella(_) => self.get_validator_churn_limit(spec)?,
             BeaconState::Deneb(_) | BeaconState::Electra(_) => std::cmp::min(
                 spec.max_per_epoch_activation_churn_limit,
-                self.get_churn_limit(spec)?,
+                self.get_validator_churn_limit(spec)?,
             ),
         })
     }

From 49617f3e8280a947e8ad2a3809e6b71da2165461 Mon Sep 17 00:00:00 2001
From: Michael Sproul <michael@sigmaprime.io>
Date: Thu, 18 Apr 2024 01:09:09 +1000
Subject: [PATCH 6/6] Set web3signer keep-alive to 20s by default (#5587)

* Set web3signer keep-alive to 20s by default

* add tests
---
 book/src/help_vc.md                  |  2 +-
 lighthouse/tests/validator_client.rs | 26 +++++++++++++++++++++++++-
 validator_client/src/cli.rs          |  2 +-
 validator_client/src/config.rs       |  3 ++-
 validator_client/src/lib.rs          |  2 +-
 5 files changed, 30 insertions(+), 5 deletions(-)

diff --git a/book/src/help_vc.md b/book/src/help_vc.md
index fb963f87cc5..1b7e7f2b0af 100644
--- a/book/src/help_vc.md
+++ b/book/src/help_vc.md
@@ -218,7 +218,7 @@ OPTIONS:
         The directory which contains the validator keystores, deposit data for each validator along with the common
         slashing protection database and the validator_definitions.yml
     --web3-signer-keep-alive-timeout <MILLIS>
-        Keep-alive timeout for each web3signer connection. Set to 'null' to never timeout [default: 90000]
+        Keep-alive timeout for each web3signer connection. Set to 'null' to never timeout [default: 20000]
     --web3-signer-max-idle-connections <COUNT>
         Maximum number of idle connections to maintain per web3signer host. Default is unlimited.
diff --git a/lighthouse/tests/validator_client.rs b/lighthouse/tests/validator_client.rs
index 764fd87ccdf..cdf8fa15aaa 100644
--- a/lighthouse/tests/validator_client.rs
+++ b/lighthouse/tests/validator_client.rs
@@ -1,4 +1,4 @@
-use validator_client::{ApiTopic, Config};
+use validator_client::{config::DEFAULT_WEB3SIGNER_KEEP_ALIVE, ApiTopic, Config};
 
 use crate::exec::CommandLineTestExec;
 use bls::{Keypair, PublicKeyBytes};
@@ -9,6 +9,7 @@ use std::path::PathBuf;
 use std::process::Command;
 use std::str::FromStr;
 use std::string::ToString;
+use std::time::Duration;
 use tempfile::TempDir;
 use types::Address;
 
@@ -653,3 +654,26 @@ fn validator_disable_web3_signer_slashing_protection() {
         assert!(!config.enable_web3signer_slashing_protection);
     });
 }
+
+#[test]
+fn validator_web3_signer_keep_alive_default() {
+    CommandLineTest::new().run().with_config(|config| {
+        assert_eq!(
+            config.web3_signer_keep_alive_timeout,
+            DEFAULT_WEB3SIGNER_KEEP_ALIVE
+        );
+    });
+}
+
+#[test]
+fn validator_web3_signer_keep_alive_override() {
+    CommandLineTest::new()
+        .flag("web3-signer-keep-alive-timeout", Some("1000"))
+        .run()
+        .with_config(|config| {
+            assert_eq!(
+                config.web3_signer_keep_alive_timeout,
+                Some(Duration::from_secs(1))
+            );
+        });
+}
diff --git a/validator_client/src/cli.rs b/validator_client/src/cli.rs
index 16a265212e5..f91efbdfbc5 100644
--- a/validator_client/src/cli.rs
+++ b/validator_client/src/cli.rs
@@ -391,7 +391,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
         Arg::with_name("web3-signer-keep-alive-timeout")
             .long("web3-signer-keep-alive-timeout")
             .value_name("MILLIS")
-            .default_value("90000")
+            .default_value("20000")
             .help("Keep-alive timeout for each web3signer connection. Set to 'null' to never \
                 timeout")
             .takes_value(true),
diff --git a/validator_client/src/config.rs b/validator_client/src/config.rs
index ae59829a3e6..5bd32fced2a 100644
--- a/validator_client/src/config.rs
+++ b/validator_client/src/config.rs
@@ -18,6 +18,7 @@ use std::time::Duration;
 use types::{Address, GRAFFITI_BYTES_LEN};
 
 pub const DEFAULT_BEACON_NODE: &str = "http://localhost:5052/";
+pub const DEFAULT_WEB3SIGNER_KEEP_ALIVE: Option<Duration> = Some(Duration::from_secs(20));
 
 /// Stores the core configuration for this validator instance.
 #[derive(Clone, Serialize, Deserialize)]
@@ -133,7 +134,7 @@ impl Default for Config {
             builder_boost_factor: None,
             prefer_builder_proposals: false,
             distributed: false,
-            web3_signer_keep_alive_timeout: Some(Duration::from_secs(90)),
+            web3_signer_keep_alive_timeout: DEFAULT_WEB3SIGNER_KEEP_ALIVE,
             web3_signer_max_idle_connections: None,
         }
     }
diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs
index 32a0eadbef4..268c25cdf7d 100644
--- a/validator_client/src/lib.rs
+++ b/validator_client/src/lib.rs
@@ -3,7 +3,6 @@ mod beacon_node_fallback;
 mod block_service;
 mod check_synced;
 mod cli;
-mod config;
 mod duties_service;
 mod graffiti_file;
 mod http_metrics;
@@ -14,6 +13,7 @@ mod preparation_service;
 mod signing_method;
 mod sync_committee_service;
 
+pub mod config;
 mod doppelganger_service;
 pub mod http_api;
 pub mod initialized_validators;
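
A note on the logging refactor in the BlocksByRoot/BlocksByRange/BlobsByRange handlers above: hoisting the summary
`debug!` into a `log_results` closure declared before the streaming loop guarantees that every early-return error path
emits the same "outgoing response processed" line instead of silently dropping it. Below is a minimal, self-contained
sketch of the pattern; `println!` stands in for Lighthouse's slog `debug!`, and `handle_request`/`fetch_block` are
illustrative names, not Lighthouse APIs.

    use std::collections::HashMap;

    // Stand-in for a database / execution-layer lookup.
    fn fetch_block(db: &HashMap<u64, &'static str>, root: u64) -> Result<Option<&'static str>, &'static str> {
        if root == 0 {
            Err("database inconsistency")
        } else {
            Ok(db.get(&root).copied())
        }
    }

    fn handle_request(db: &HashMap<u64, &'static str>, roots: &[u64]) -> Result<(), &'static str> {
        let requested = roots.len();
        // Define the summary log once, up front, so every exit path reuses it.
        let log_results = |returned: usize| {
            println!("outgoing response processed: requested={requested} returned={returned}");
        };

        let mut sent = 0;
        for &root in roots {
            match fetch_block(db, root) {
                Ok(Some(_block)) => sent += 1,
                Ok(None) => {} // skip slot / unknown root: nothing to send
                Err(e) => {
                    // The early-return path still logs the summary before bailing,
                    // mirroring the `log_results(...)` calls the patch adds before
                    // each `return Err(...)`.
                    log_results(sent);
                    return Err(e);
                }
            }
        }
        log_results(sent);
        Ok(())
    }

    fn main() {
        let db = HashMap::from([(1, "block-1"), (2, "block-2")]);
        assert!(handle_request(&db, &[1, 2]).is_ok()); // logs requested=2 returned=2
        assert!(handle_request(&db, &[1, 0]).is_err()); // logs requested=2 returned=1
    }

Because the closure captures the request metadata once, the success path and every failure path stay consistent if the
log fields ever change.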
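
For context on the `get_churn_limit` -> `get_validator_churn_limit` rename: the consensus spec computes the exit churn
as max(MIN_PER_EPOCH_CHURN_LIMIT, active_validator_count // CHURN_LIMIT_QUOTIENT), and since Deneb the activation side
is additionally capped. A standalone sketch of both formulas follows, with mainnet constants hard-coded; the real
implementation reads them from `ChainSpec` and counts active validators via the committee cache.

    // Mainnet constants; Lighthouse reads these from `ChainSpec`.
    const MIN_PER_EPOCH_CHURN_LIMIT: u64 = 4;
    const CHURN_LIMIT_QUOTIENT: u64 = 65_536;
    const MAX_PER_EPOCH_ACTIVATION_CHURN_LIMIT: u64 = 8;

    /// Spec `get_validator_churn_limit`: how many validators may churn per epoch.
    fn get_validator_churn_limit(active_validator_count: u64) -> u64 {
        std::cmp::max(
            MIN_PER_EPOCH_CHURN_LIMIT,
            active_validator_count / CHURN_LIMIT_QUOTIENT,
        )
    }

    /// Deneb onwards: activations are further capped by a fixed ceiling,
    /// matching the `Deneb | Electra` arm of `get_activation_churn_limit` above.
    fn get_activation_churn_limit(active_validator_count: u64) -> u64 {
        std::cmp::min(
            MAX_PER_EPOCH_ACTIVATION_CHURN_LIMIT,
            get_validator_churn_limit(active_validator_count),
        )
    }

    fn main() {
        // With ~1M active validators: 1_048_576 / 65_536 = 16 exits per epoch,
        // but at most 8 activations per epoch after Deneb.
        assert_eq!(get_validator_churn_limit(1_048_576), 16);
        assert_eq!(get_activation_churn_limit(1_048_576), 8);
    }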
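
On the new keep-alive default: the flag takes milliseconds, and the literal string 'null' disables the timeout
entirely, which is why the override test expects `--web3-signer-keep-alive-timeout 1000` to produce
`Some(Duration::from_secs(1))`. A hedged sketch of that parse rule; `parse_keep_alive` is an illustrative helper,
not Lighthouse's actual code.

    use std::time::Duration;

    // "null" => no timeout at all; anything else => milliseconds.
    fn parse_keep_alive(raw: &str) -> Result<Option<Duration>, String> {
        if raw == "null" {
            Ok(None)
        } else {
            raw.parse::<u64>()
                .map(|millis| Some(Duration::from_millis(millis)))
                .map_err(|e| format!("invalid keep-alive timeout: {e}"))
        }
    }

    fn main() {
        // The new default: 20_000 ms == 20 s.
        assert_eq!(parse_keep_alive("20000"), Ok(Some(Duration::from_secs(20))));
        // The CLI test's override: 1000 ms == 1 s.
        assert_eq!(parse_keep_alive("1000"), Ok(Some(Duration::from_secs(1))));
        // Never time out.
        assert_eq!(parse_keep_alive("null"), Ok(None));
    }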