diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index a46e7d09376..8f7881eea8a 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -12,7 +12,6 @@ use beacon_chain::data_availability_checker::ChildComponents; use beacon_chain::{get_block_root, BeaconChainTypes}; use lighthouse_network::rpc::methods::BlobsByRootRequest; use lighthouse_network::rpc::BlocksByRootRequest; -use rand::prelude::IteratorRandom; use std::ops::IndexMut; use std::sync::Arc; use std::time::Duration; @@ -104,7 +103,7 @@ pub trait RequestState { cx: &SyncNetworkContext, ) -> Result<(), LookupRequestError> { // Check if request is necessary. - if !matches!(self.get_state().state, State::AwaitingDownload) { + if !self.get_state().is_awaiting_download() { return Ok(()); } @@ -112,13 +111,12 @@ pub trait RequestState { let (peer_id, request) = self.build_request(&cx.chain.spec)?; // Update request state. - self.get_state_mut().state = State::Downloading { peer_id }; - self.get_state_mut().req_counter += 1; + let req_counter = self.get_state_mut().on_download_start(peer_id); // Make request let id = SingleLookupReqId { id, - req_counter: self.get_state().req_counter, + req_counter, lookup_type: L::lookup_type(), }; Self::make_request(id, peer_id, request, cx) @@ -130,8 +128,7 @@ pub trait RequestState { let request_state = self.get_state(); if request_state.failed_attempts() >= max_attempts { - let cannot_process = - request_state.failed_processing >= request_state.failed_downloading; + let cannot_process = request_state.more_failed_processing_attempts(); Err(LookupRequestError::TooManyAttempts { cannot_process }) } else { Ok(()) @@ -141,15 +138,9 @@ pub trait RequestState { /// Get the next peer to request. Draws from the set of peers we think should have both the /// block and blob first. If that fails, we draw from the set of peers that may have either. fn get_peer(&mut self) -> Result { - let request_state = self.get_state_mut(); - let peer_id = request_state - .available_peers - .iter() - .choose(&mut rand::thread_rng()) - .copied() - .ok_or(LookupRequestError::NoPeers)?; - request_state.used_peers.insert(peer_id); - Ok(peer_id) + self.get_state_mut() + .use_rand_available_peer() + .ok_or(LookupRequestError::NoPeers) } /// Initialize `Self::RequestType`. @@ -169,29 +160,35 @@ pub trait RequestState { fn verify_response( &mut self, expected_block_root: Hash256, + peer_id: PeerId, response: Option, ) -> Result, LookupVerifyError> { - let request_state = self.get_state_mut(); - match request_state.state { - State::AwaitingDownload => { - request_state.register_failure_downloading(); - Err(LookupVerifyError::ExtraBlocksReturned) - } - State::Downloading { peer_id } => { - self.verify_response_inner(expected_block_root, response, peer_id) + let result = match *self.get_state().get_state() { + State::AwaitingDownload => Err(LookupVerifyError::ExtraBlocksReturned), + State::Downloading { peer_id: _ } => { + // TODO: We requested a download from Downloading { peer_id }, but the network + // injects a response from a different peer_id. What should we do? The peer_id to + // track for scoring is the one that actually sent the response, not the state's + self.verify_response_inner(expected_block_root, response) } - State::Processing { peer_id: _ } => match response { - Some(_) => { - // We sent the block for processing and received an extra block. 
- request_state.register_failure_downloading(); - Err(LookupVerifyError::ExtraBlocksReturned) - } - None => { - // This is simply the stream termination and we are already processing the - // block - Ok(None) - } + State::Processing { .. } | State::Processed { .. } => match response { + // We sent the block for processing and received an extra block. + Some(_) => Err(LookupVerifyError::ExtraBlocksReturned), + // This is simply the stream termination and we are already processing the block + None => Ok(None), }, + }; + + match result { + Ok(Some(response)) => { + self.get_state_mut().on_download_success(peer_id); + Ok(Some(response)) + } + Ok(None) => Ok(None), + Err(e) => { + self.get_state_mut().on_download_failure(); + Err(e) + } } } @@ -200,7 +197,6 @@ pub trait RequestState { &mut self, expected_block_root: Hash256, response: Option, - peer_id: PeerId, ) -> Result, LookupVerifyError>; /// A getter for the parent root of the response. Returns an `Option` because we won't know @@ -232,7 +228,7 @@ pub trait RequestState { /// Register a failure to process the block or blob. fn register_failure_downloading(&mut self) { - self.get_state_mut().register_failure_downloading() + self.get_state_mut().on_download_failure() } /* Utility methods */ @@ -274,7 +270,6 @@ impl RequestState for BlockRequestState &mut self, expected_block_root: Hash256, response: Option, - peer_id: PeerId, ) -> Result>>, LookupVerifyError> { match response { Some(block) => { @@ -285,18 +280,13 @@ impl RequestState for BlockRequestState // return an error and drop the block // NOTE: we take this is as a download failure to prevent counting the // attempt as a chain failure, but simply a peer failure. - self.state.register_failure_downloading(); Err(LookupVerifyError::RootMismatch) } else { // Return the block for processing. - self.state.state = State::Processing { peer_id }; Ok(Some(block)) } } - None => { - self.state.register_failure_downloading(); - Err(LookupVerifyError::NoBlockReturned) - } + None => Err(LookupVerifyError::NoBlockReturned), } } @@ -374,25 +364,20 @@ impl RequestState for BlobRequestState, - peer_id: PeerId, ) -> Result>, LookupVerifyError> { match blob { Some(blob) => { let received_id = blob.id(); if !self.requested_ids.contains(&received_id) { - Err(LookupVerifyError::UnrequestedBlobId(received_id)) - } else if !blob.verify_blob_sidecar_inclusion_proof().unwrap_or(false) { - Err(LookupVerifyError::InvalidInclusionProof) - } else if blob.block_root() != expected_block_root { - Err(LookupVerifyError::UnrequestedHeader) - } else { - Ok(()) + return Err(LookupVerifyError::UnrequestedBlobId(received_id)); + } + if !blob.verify_blob_sidecar_inclusion_proof().unwrap_or(false) { + return Err(LookupVerifyError::InvalidInclusionProof); + } + if blob.block_root() != expected_block_root { + return Err(LookupVerifyError::UnrequestedHeader); } - .map_err(|e| { - self.state.register_failure_downloading(); - e - })?; // State should remain downloading until we receive the stream terminator. 
self.requested_ids.remove(&received_id); @@ -403,7 +388,6 @@ impl RequestState for BlobRequestState { - self.state.state = State::Processing { peer_id }; let blobs = std::mem::take(&mut self.blob_download_queue); Ok(Some(blobs)) } diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 949e1762ac7..4e1d02d38f4 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -285,12 +285,16 @@ impl BlockLookups { let mut lookup = self.single_block_lookups.remove(&id.id)?; let request_state = R::request_state_mut(&mut lookup); - if id.req_counter != request_state.get_state().req_counter { + if request_state + .get_state() + .is_current_req_counter(id.req_counter) + { + Some(lookup) + } else { // We don't want to drop the lookup, just ignore the old response. self.single_block_lookups.insert(id.id, lookup); - return None; + None } - Some(lookup) } /// Checks whether a single block lookup is waiting for a parent lookup to complete. This is @@ -374,7 +378,7 @@ impl BlockLookups { let expected_block_root = lookup.block_root(); let request_state = R::request_state_mut(&mut lookup); - match request_state.verify_response(expected_block_root, response) { + match request_state.verify_response(expected_block_root, peer_id, response) { Ok(Some(verified_response)) => { self.handle_verified_response::( seen_timestamp, @@ -415,10 +419,6 @@ impl BlockLookups { let id = lookup.id; let block_root = lookup.block_root(); - R::request_state_mut(lookup) - .get_state_mut() - .component_downloaded = true; - let cached_child = lookup.add_response::(verified_response.clone()); match cached_child { CachedChild::Ok(block) => { @@ -447,10 +447,7 @@ impl BlockLookups { // initial request. if lookup.both_components_downloaded() { lookup.penalize_blob_peer(cx); - lookup - .blob_request_state - .state - .register_failure_downloading(); + lookup.blob_request_state.state.on_download_failure(); } lookup.request_block_and_blobs(cx)?; } @@ -493,13 +490,13 @@ impl BlockLookups { if R::request_state_mut(&mut parent_lookup.current_parent_request) .get_state() - .req_counter - != id.req_counter + .is_current_req_counter(id.req_counter) { + Some(parent_lookup) + } else { self.parent_lookups.push(parent_lookup); - return None; + None } - Some(parent_lookup) } /// Process a response received from a parent lookup request. @@ -559,7 +556,7 @@ impl BlockLookups { cx: &SyncNetworkContext, parent_lookup: &mut ParentLookup, ) -> Result<(), RequestError> { - match parent_lookup.verify_response::(response, &mut self.failed_chains) { + match parent_lookup.verify_response::(peer_id, response, &mut self.failed_chains) { Ok(Some(verified_response)) => { self.handle_verified_response::( seen_timestamp, @@ -640,7 +637,7 @@ impl BlockLookups { RequestError::ChainTooLong => { self.failed_chains.insert(parent_lookup.chain_hash()); // This indicates faulty peers. - for &peer_id in parent_lookup.used_peers() { + for &peer_id in parent_lookup.all_used_peers() { cx.report_peer(peer_id, PeerAction::LowToleranceError, e.as_static()) } } @@ -653,7 +650,7 @@ impl BlockLookups { self.failed_chains.insert(parent_lookup.chain_hash()); } // This indicates faulty peers. 
- for &peer_id in parent_lookup.used_peers() { + for &peer_id in parent_lookup.all_used_peers() { cx.report_peer(peer_id, PeerAction::LowToleranceError, e.as_static()) } } @@ -661,6 +658,9 @@ impl BlockLookups { // This happens if the peer disconnects while the block is being // processed. Drop the request without extra penalty } + RequestError::BadState(_) => { + // Should never happen + } } } @@ -668,9 +668,12 @@ impl BlockLookups { pub fn peer_disconnected(&mut self, peer_id: &PeerId, cx: &mut SyncNetworkContext) { /* Check disconnection for single lookups */ - self.single_block_lookups.retain(|_, req| { + self.single_block_lookups.retain(|id, req| { let should_drop_lookup = req.should_drop_lookup_on_disconnected_peer(peer_id, cx, &self.log); + if should_drop_lookup { + debug!(self.log, "Dropping lookup after peer disconnected"; "id" => id, "block_root" => %req.block_root()); + } !should_drop_lookup }); @@ -844,15 +847,15 @@ impl BlockLookups { ) -> Result<(), LookupRequestError> { let request_state = R::request_state_mut(lookup); - request_state.get_state_mut().component_processed = true; + request_state + .get_state_mut() + .on_processing_success() + .map_err(LookupRequestError::BadState)?; if lookup.both_components_processed() { lookup.penalize_blob_peer(cx); // Try it again if possible. - lookup - .blob_request_state - .state - .register_failure_processing(); + lookup.blob_request_state.state.on_processing_failure(); lookup.request_block_and_blobs(cx)?; } Ok(()) @@ -900,14 +903,8 @@ impl BlockLookups { BlockError::AvailabilityCheck(e) => match e.category() { AvailabilityCheckErrorCategory::Internal => { warn!(self.log, "Internal availability check failure"; "root" => %root, "peer_id" => %peer_id, "error" => ?e); - lookup - .block_request_state - .state - .register_failure_downloading(); - lookup - .blob_request_state - .state - .register_failure_downloading(); + lookup.block_request_state.state.on_download_failure(); + lookup.blob_request_state.state.on_download_failure(); lookup.request_block_and_blobs(cx)? } AvailabilityCheckErrorCategory::Malicious => { @@ -926,10 +923,7 @@ impl BlockLookups { ); // Try it again if possible. - lookup - .block_request_state - .state - .register_failure_processing(); + lookup.block_request_state.state.on_processing_failure(); lookup.request_block_and_blobs(cx)? 
} } @@ -999,7 +993,7 @@ impl BlockLookups { .current_parent_request .blob_request_state .state - .register_failure_processing(); + .on_processing_failure(); match parent_lookup .current_parent_request .request_block_and_blobs(cx) @@ -1255,8 +1249,8 @@ impl BlockLookups { penalty, } => { self.failed_chains.insert(chain_hash); - for peer_source in request.all_peers() { - cx.report_peer(peer_source, penalty, "parent_chain_failure") + for peer_source in request.all_used_peers() { + cx.report_peer(*peer_source, penalty, "parent_chain_failure") } } BatchProcessResult::NonFaultyFailure => { diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index a3cdfd7b00b..55dd26b661e 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -7,7 +7,6 @@ use beacon_chain::block_verification_types::AsBlock; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::data_availability_checker::{ChildComponents, DataAvailabilityChecker}; use beacon_chain::BeaconChainTypes; -use itertools::Itertools; use std::collections::VecDeque; use std::sync::Arc; use store::Hash256; @@ -56,6 +55,7 @@ pub enum RequestError { cannot_process: bool, }, NoPeers, + BadState(String), } impl ParentLookup { @@ -175,11 +175,11 @@ impl ParentLookup { self.current_parent_request .block_request_state .state - .register_failure_processing(); + .on_processing_failure(); self.current_parent_request .blob_request_state .state - .register_failure_processing(); + .on_processing_failure(); if let Some(components) = self.current_parent_request.child_components.as_mut() { components.downloaded_block = None; components.downloaded_blobs = <_>::default(); @@ -190,12 +190,14 @@ impl ParentLookup { /// the processing result of the block. pub fn verify_response>( &mut self, + peer_id: PeerId, block: Option, failed_chains: &mut lru_cache::LRUTimeCache, ) -> Result, ParentVerifyError> { let expected_block_root = self.current_parent_request.block_root(); let request_state = R::request_state_mut(&mut self.current_parent_request); - let root_and_verified = request_state.verify_response(expected_block_root, block)?; + let root_and_verified = + request_state.verify_response(expected_block_root, peer_id, block)?; // check if the parent of this block isn't in the failed cache. If it is, this chain should // be dropped and the peer downscored. 
@@ -221,20 +223,8 @@ impl ParentLookup { self.current_parent_request.add_peers(peers) } - pub fn used_peers(&self) -> impl Iterator + '_ { - self.current_parent_request - .block_request_state - .state - .used_peers - .iter() - .chain( - self.current_parent_request - .blob_request_state - .state - .used_peers - .iter(), - ) - .unique() + pub fn all_used_peers(&self) -> impl Iterator + '_ { + self.current_parent_request.all_used_peers() } } @@ -264,6 +254,7 @@ impl From for RequestError { } E::NoPeers => RequestError::NoPeers, E::SendFailed(msg) => RequestError::SendFailed(msg), + E::BadState(msg) => RequestError::BadState(msg), } } } @@ -291,6 +282,7 @@ impl RequestError { } RequestError::TooManyAttempts { cannot_process: _ } => "too_many_downloading_attempts", RequestError::NoPeers => "no_peers", + RequestError::BadState(_) => "bad_state", } } } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index a312f6e970a..15d10c77c24 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -8,7 +8,9 @@ use beacon_chain::data_availability_checker::{ AvailabilityCheckError, DataAvailabilityChecker, MissingBlobs, }; use beacon_chain::BeaconChainTypes; +use itertools::Itertools; use lighthouse_network::PeerAction; +use rand::seq::IteratorRandom; use slog::{trace, Logger}; use std::collections::HashSet; use std::fmt::Debug; @@ -19,13 +21,6 @@ use strum::IntoStaticStr; use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList}; use types::EthSpec; -#[derive(Debug, PartialEq, Eq)] -pub enum State { - AwaitingDownload, - Downloading { peer_id: PeerId }, - Processing { peer_id: PeerId }, -} - #[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum LookupVerifyError { RootMismatch, @@ -48,6 +43,7 @@ pub enum LookupRequestError { }, NoPeers, SendFailed(&'static str), + BadState(String), } pub struct SingleBlockLookup { @@ -94,18 +90,16 @@ impl SingleBlockLookup { self.block_request_state.requested_block_root = block_root; self.block_request_state.state.state = State::AwaitingDownload; self.blob_request_state.state.state = State::AwaitingDownload; - self.block_request_state.state.component_downloaded = false; - self.blob_request_state.state.component_downloaded = false; - self.block_request_state.state.component_processed = false; - self.blob_request_state.state.component_processed = false; self.child_components = Some(ChildComponents::empty(block_root)); } - /// Get all unique peers across block and blob requests. - pub fn all_peers(&self) -> HashSet { - let mut all_peers = self.block_request_state.state.used_peers.clone(); - all_peers.extend(self.blob_request_state.state.used_peers.clone()); - all_peers + /// Get all unique used peers across block and blob requests. + pub fn all_used_peers(&self) -> impl Iterator + '_ { + self.block_request_state + .state + .get_used_peers() + .chain(self.blob_request_state.state.get_used_peers()) + .unique() } /// Send the necessary requests for blocks and/or blobs. This will check whether we have @@ -206,14 +200,14 @@ impl SingleBlockLookup { /// Returns true if the block has already been downloaded. 
pub fn both_components_downloaded(&self) -> bool { - self.block_request_state.state.component_downloaded - && self.blob_request_state.state.component_downloaded + self.block_request_state.state.is_downloaded() + && self.blob_request_state.state.is_downloaded() } /// Returns true if the block has already been downloaded. pub fn both_components_processed(&self) -> bool { - self.block_request_state.state.component_processed - && self.blob_request_state.state.component_processed + self.block_request_state.state.is_processed() + && self.blob_request_state.state.is_processed() } /// Checks both the block and blob request states to see if the peer is disconnected. @@ -304,7 +298,7 @@ impl SingleBlockLookup { if let Some(cached_child) = self.child_components.as_mut() { cached_child.clear_blobs(); } - self.blob_request_state.state.register_failure_downloading() + self.blob_request_state.state.on_download_failure() } /// This failure occurs after processing, so register a failure processing, penalize the peer @@ -314,7 +308,7 @@ impl SingleBlockLookup { if let Some(cached_child) = self.child_components.as_mut() { cached_child.clear_blobs(); } - self.blob_request_state.state.register_failure_processing() + self.blob_request_state.state.on_processing_failure() } } @@ -375,29 +369,34 @@ pub enum CachedChild { /// There was an error during consistency checks between block and blobs. Err(AvailabilityCheckError), } + +#[derive(Debug, PartialEq, Eq)] +pub enum State { + AwaitingDownload, + Downloading { peer_id: PeerId }, + Processing { peer_id: PeerId }, + Processed { peer_id: PeerId }, +} + /// Object representing the state of a single block or blob lookup request. #[derive(PartialEq, Eq, Debug)] pub struct SingleLookupRequestState { /// State of this request. - pub state: State, + state: State, /// Peers that should have this block or blob. - pub available_peers: HashSet, + available_peers: HashSet, /// Peers from which we have requested this block. - pub used_peers: HashSet, + used_peers: HashSet, /// How many times have we attempted to process this block or blob. - pub failed_processing: u8, + failed_processing: u8, /// How many times have we attempted to download this block or blob. - pub failed_downloading: u8, - /// Whether or not we have downloaded this block or blob. - pub component_downloaded: bool, - /// Whether or not we have processed this block or blob. - pub component_processed: bool, + failed_downloading: u8, /// Should be incremented everytime this request is retried. The purpose of this is to /// differentiate retries of the same block/blob request within a lookup. We currently penalize /// peers and retry requests prior to receiving the stream terminator. This means responses /// from a prior request may arrive after a new request has been sent, this counter allows /// us to differentiate these two responses. - pub req_counter: u32, + req_counter: u32, } impl SingleLookupRequestState { @@ -413,30 +412,83 @@ impl SingleLookupRequestState { used_peers: HashSet::default(), failed_processing: 0, failed_downloading: 0, - component_downloaded: false, - component_processed: false, req_counter: 0, } } - /// Registers a failure in processing a block. 
- pub fn register_failure_processing(&mut self) { - self.failed_processing = self.failed_processing.saturating_add(1); - self.state = State::AwaitingDownload; + // TODO: Should not leak the enum state + pub fn get_state(&self) -> &State { + &self.state + } + + pub fn is_current_req_counter(&self, req_counter: u32) -> bool { + self.req_counter == req_counter + } + + pub fn is_awaiting_download(&self) -> bool { + matches!(self.state, State::AwaitingDownload) + } + + pub fn is_downloaded(&self) -> bool { + match self.state { + State::AwaitingDownload => false, + State::Downloading { .. } => false, + State::Processing { .. } => true, + State::Processed { .. } => true, + } + } + + pub fn is_processed(&self) -> bool { + match self.state { + State::AwaitingDownload => false, + State::Downloading { .. } => false, + State::Processing { .. } => false, + State::Processed { .. } => true, + } + } + + pub fn on_download_start(&mut self, peer_id: PeerId) -> u32 { + self.state = State::Downloading { peer_id }; + self.req_counter += 1; + self.req_counter } /// Registers a failure in downloading a block. This might be a peer disconnection or a wrong /// block. - pub fn register_failure_downloading(&mut self) { + pub fn on_download_failure(&mut self) { self.failed_downloading = self.failed_downloading.saturating_add(1); self.state = State::AwaitingDownload; } + pub fn on_download_success(&mut self, peer_id: PeerId) { + self.state = State::Processing { peer_id }; + } + + /// Registers a failure in processing a block. + pub fn on_processing_failure(&mut self) { + self.failed_processing = self.failed_processing.saturating_add(1); + self.state = State::AwaitingDownload; + } + + pub fn on_processing_success(&mut self) -> Result<(), String> { + match &self.state { + State::Processing { peer_id } => { + self.state = State::Processed { peer_id: *peer_id }; + Ok(()) + } + other => Err(format!("not in processing state: {}", other).to_string()), + } + } + /// The total number of failures, whether it be processing or downloading. pub fn failed_attempts(&self) -> u8 { self.failed_processing + self.failed_downloading } + pub fn more_failed_processing_attempts(&self) -> bool { + self.failed_processing >= self.failed_downloading + } + /// This method should be used for peers wrapped in `PeerId::BlockAndBlobs`. pub fn add_peer(&mut self, peer_id: &PeerId) { self.available_peers.insert(*peer_id); @@ -448,7 +500,7 @@ impl SingleLookupRequestState { if let State::Downloading { peer_id } = &self.state { if peer_id == dc_peer_id { // Peer disconnected before providing a block - self.register_failure_downloading(); + self.on_download_failure(); return Err(()); } } @@ -459,10 +511,25 @@ impl SingleLookupRequestState { /// returns an error. pub fn processing_peer(&self) -> Result { match &self.state { - State::Processing { peer_id } => Ok(*peer_id), + State::Processing { peer_id } | State::Processed { peer_id } => Ok(*peer_id), other => Err(format!("not in processing state: {}", other).to_string()), } } + + pub fn get_used_peers(&self) -> impl Iterator { + self.used_peers.iter() + } + + /// Selects a random peer from available peers if any, inserts it in used peers and returns it. 
+ pub fn use_rand_available_peer(&mut self) -> Option { + let peer_id = self + .available_peers + .iter() + .choose(&mut rand::thread_rng()) + .copied()?; + self.used_peers.insert(peer_id); + Some(peer_id) + } } impl slog::Value for SingleBlockLookup { @@ -509,6 +576,7 @@ impl slog::Value for SingleLookupRequestState { State::Processing { peer_id } => { serializer.emit_arguments("processing_peer", &format_args!("{}", peer_id))? } + State::Processed { .. } => "processed".serialize(record, "state", serializer)?, } serializer.emit_u8("failed_downloads", self.failed_downloading)?; serializer.emit_u8("failed_processing", self.failed_processing)?; @@ -522,6 +590,7 @@ impl std::fmt::Display for State { State::AwaitingDownload => write!(f, "AwaitingDownload"), State::Downloading { .. } => write!(f, "Downloading"), State::Processing { .. } => write!(f, "Processing"), + State::Processed { .. } => write!(f, "Processed"), } } } @@ -608,6 +677,7 @@ mod tests { as RequestState>::verify_response( &mut sl.block_request_state, block.canonical_root(), + peer_id, Some(block.into()), ) .unwrap() @@ -647,7 +717,7 @@ mod tests { &spec, ) .unwrap(); - sl.block_request_state.state.register_failure_downloading(); + sl.block_request_state.state.on_download_failure(); } // Now we receive the block and send it for processing @@ -661,13 +731,14 @@ mod tests { as RequestState>::verify_response( &mut sl.block_request_state, block.canonical_root(), + peer_id, Some(block.into()), ) .unwrap() .unwrap(); // One processing failure maxes the available attempts - sl.block_request_state.state.register_failure_processing(); + sl.block_request_state.state.on_processing_failure(); assert_eq!( as RequestState>::build_request( &mut sl.block_request_state, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 15595ecde5c..a868a092d3d 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -637,6 +637,7 @@ impl SyncManager { self.handle_unknown_block_root(peer_id, block_root); } SyncMessage::Disconnect(peer_id) => { + debug!(self.log, "Received disconnected message"; "peer_id" => %peer_id); self.peer_disconnect(&peer_id); } SyncMessage::RpcError {
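Illustrative sketch (simplified, not the code this diff touches): a condensed, self-contained Rust model of the request state machine the patch introduces in single_block_lookup.rs. The State variants, the transition methods and the random peer selection mirror the added code above; everything else is an assumption for illustration only — PeerId is a local u64 newtype standing in for lighthouse_network::PeerId, the struct is non-generic, drops fields that are not needed here, and derives Default instead of the real constructor. It assumes only the rand crate, which the original already uses.

use rand::seq::IteratorRandom;
use std::collections::HashSet;

// Stand-in for lighthouse_network::PeerId.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct PeerId(pub u64);

#[derive(Debug, PartialEq, Eq, Default)]
pub enum State {
    #[default]
    AwaitingDownload,
    Downloading { peer_id: PeerId },
    Processing { peer_id: PeerId },
    Processed { peer_id: PeerId },
}

#[derive(Debug, Default)]
pub struct SingleLookupRequestState {
    state: State,
    available_peers: HashSet<PeerId>,
    used_peers: HashSet<PeerId>,
    failed_processing: u8,
    failed_downloading: u8,
    req_counter: u32,
}

impl SingleLookupRequestState {
    pub fn add_peer(&mut self, peer_id: PeerId) {
        self.available_peers.insert(peer_id);
    }

    pub fn get_state(&self) -> &State {
        &self.state
    }

    /// Responses tagged with an older counter are stale retries and get ignored.
    pub fn is_current_req_counter(&self, req_counter: u32) -> bool {
        self.req_counter == req_counter
    }

    pub fn is_downloaded(&self) -> bool {
        matches!(self.state, State::Processing { .. } | State::Processed { .. })
    }

    pub fn is_processed(&self) -> bool {
        matches!(self.state, State::Processed { .. })
    }

    /// AwaitingDownload -> Downloading; returns the incremented request counter.
    pub fn on_download_start(&mut self, peer_id: PeerId) -> u32 {
        self.state = State::Downloading { peer_id };
        self.req_counter += 1;
        self.req_counter
    }

    /// Downloading -> AwaitingDownload, counting a download failure.
    pub fn on_download_failure(&mut self) {
        self.failed_downloading = self.failed_downloading.saturating_add(1);
        self.state = State::AwaitingDownload;
    }

    /// Downloading -> Processing, remembering which peer actually served the data.
    pub fn on_download_success(&mut self, peer_id: PeerId) {
        self.state = State::Processing { peer_id };
    }

    /// Processing -> AwaitingDownload, counting a processing failure.
    pub fn on_processing_failure(&mut self) {
        self.failed_processing = self.failed_processing.saturating_add(1);
        self.state = State::AwaitingDownload;
    }

    /// Processing -> Processed; any other starting state surfaces as an error.
    pub fn on_processing_success(&mut self) -> Result<(), String> {
        if let State::Processing { peer_id } = self.state {
            self.state = State::Processed { peer_id };
            Ok(())
        } else {
            Err(format!("not in processing state: {:?}", self.state))
        }
    }

    pub fn get_used_peers(&self) -> impl Iterator<Item = &PeerId> {
        self.used_peers.iter()
    }

    /// Draws a random peer from the available set and records it as used.
    pub fn use_rand_available_peer(&mut self) -> Option<PeerId> {
        let peer_id = self
            .available_peers
            .iter()
            .choose(&mut rand::thread_rng())
            .copied()?;
        self.used_peers.insert(peer_id);
        Some(peer_id)
    }
}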
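The default verify_response in common.rs (first file in this diff) becomes the single caller of on_download_success / on_download_failure for network responses: it inspects the current state, defers payload validation to the side-effect-free verify_response_inner, and then applies exactly one transition based on the result. Below is a rough sketch of that flow written against the stand-in type above; the free-function shape, the single-variant LookupVerifyError and the verify_inner closure are simplifications for this illustration, not the real trait signatures.

// Trimmed error type for the sketch; the real enum has more variants.
#[derive(Debug)]
pub enum LookupVerifyError {
    ExtraBlocksReturned,
}

// `verify_inner` stands in for `verify_response_inner`: it only validates the
// payload and never touches the state machine.
pub fn verify_response<R>(
    request_state: &mut SingleLookupRequestState,
    peer_id: PeerId,
    response: Option<R>,
    verify_inner: impl FnOnce(Option<R>) -> Result<Option<R>, LookupVerifyError>,
) -> Result<Option<R>, LookupVerifyError> {
    let result = match request_state.get_state() {
        // Nothing is in flight, so any payload is unexpected.
        State::AwaitingDownload => Err(LookupVerifyError::ExtraBlocksReturned),
        // A download is in flight: run the pure payload checks.
        State::Downloading { .. } => verify_inner(response),
        // Already handed off for processing: only the stream terminator is fine.
        State::Processing { .. } | State::Processed { .. } => match response {
            Some(_) => Err(LookupVerifyError::ExtraBlocksReturned),
            None => Ok(None),
        },
    };

    // Every state transition for a download response happens here, in one place.
    match result {
        Ok(Some(r)) => {
            request_state.on_download_success(peer_id);
            Ok(Some(r))
        }
        Ok(None) => Ok(None),
        Err(e) => {
            request_state.on_download_failure();
            Err(e)
        }
    }
}

As the TODO in the patch notes, the peer that actually sends the response may differ from the one recorded in Downloading { peer_id }; passing the responder in, as above, keeps scoring tied to the peer that delivered the data.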
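A hypothetical end-to-end walk-through of the two sketches above: one attempt that fails at processing, a retry that succeeds, and a stale-response check via the request counter. The peer IDs and the trivial "accept anything" verifier are made up for the example.

fn main() {
    let mut state = SingleLookupRequestState::default();
    state.add_peer(PeerId(7));
    state.add_peer(PeerId(9));

    // First attempt: draw a random available peer and start the download.
    let peer = state.use_rand_available_peer().expect("peers available");
    let req_1 = state.on_download_start(peer);

    // The response arrives and verifies, so the lookup moves to Processing.
    verify_response(&mut state, peer, Some("block"), |payload| Ok(payload))
        .expect("verified")
        .expect("payload");
    assert!(state.is_downloaded() && !state.is_processed());

    // Processing fails, so we fall back to AwaitingDownload and retry.
    state.on_processing_failure();
    let peer = state.use_rand_available_peer().expect("peers available");
    let req_2 = state.on_download_start(peer);

    // A late response still tagged with the first counter would now be ignored.
    assert!(!state.is_current_req_counter(req_1));
    assert!(state.is_current_req_counter(req_2));

    // The retried response verifies and this time processing succeeds.
    verify_response(&mut state, peer, Some("block"), |payload| Ok(payload))
        .expect("verified")
        .expect("payload");
    state.on_processing_success().expect("was in Processing");
    assert!(state.is_processed());

    println!(
        "used {} peer(s), final state: {:?}",
        state.get_used_peers().count(),
        state.get_state()
    );
}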