From bad87b1357e6b0436ac8765ed5049e81fc751a59 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Fri, 26 Apr 2024 10:32:23 +0900 Subject: [PATCH] Merge current and parent lookups tests --- beacon_node/beacon_chain/src/beacon_chain.rs | 3 + .../network/src/sync/block_lookups/common.rs | 8 +- .../network/src/sync/block_lookups/mod.rs | 252 ++++++------ .../src/sync/block_lookups/parent_chain.rs | 197 +++++++++ .../sync/block_lookups/single_block_lookup.rs | 43 +- .../network/src/sync/block_lookups/tests.rs | 385 +++++++++++++----- beacon_node/network/src/sync/manager.rs | 38 +- .../network/src/sync/network_context.rs | 2 +- common/lru_cache/src/time.rs | 6 + .../src/test_utils/test_random/secret_key.rs | 2 + .../src/test_utils/test_random/signature.rs | 11 +- 11 files changed, 682 insertions(+), 265 deletions(-) create mode 100644 beacon_node/network/src/sync/block_lookups/parent_chain.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index b3790024f81..069ed18cf90 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2918,6 +2918,9 @@ impl BeaconChain { .fork_choice_read_lock() .contains_block(&block_root) { + // TODO: Should also check for: + // - Parent block is known + // - Slot is not in the future return Err(BlockError::BlockIsAlreadyKnown(block_root)); } diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index 1b56bbd0c5f..d7d3c9a061f 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -49,6 +49,7 @@ pub trait RequestState { id: Id, awaiting_parent: bool, downloaded_block_expected_blobs: Option, + block_is_processed: bool, cx: &mut SyncNetworkContext, ) -> Result<(), LookupRequestError> { // Attempt to progress awaiting downloads @@ -75,7 +76,12 @@ pub trait RequestState { // Otherwise, attempt to progress awaiting processing // If this request is awaiting a parent lookup to be processed, do not send for processing. // The request will be rejected with unknown parent error. - } else if !awaiting_parent { + } else if !awaiting_parent && + // TODO: Blob processing / import does not check for unknown parent. As a temporary fix + // and to emulate the behaviour before this PR, hold blobs for processing until the + // block has been processed i.e. it has a known parent. + (block_is_processed || matches!(Self::response_type(), ResponseType::Block)) + { // maybe_start_processing returns Some if state == AwaitingProcess. This pattern is // useful to conditionally access the result data. 
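+ // (Aside: `awaiting_parent` gates both component types, while `block_is_processed`
+ // additionally gates blobs only. Once the parent chain resolves and the block itself is
+ // processed, a later `continue_requests` call walks this same path again and releases the
+ // held blobs.)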
if let Some(result) = self.get_state_mut().maybe_start_processing() { diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 094bebf0c4f..99724837034 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -1,8 +1,10 @@ +use self::parent_chain::{compute_parent_chains, NodeChain}; use self::single_block_lookup::{DownloadResult, LookupRequestError, SingleBlockLookup}; use super::manager::{BlockProcessType, BlockProcessingResult}; use super::network_context::{RpcProcessingResult, SyncNetworkContext}; use crate::metrics; -use crate::sync::block_lookups::common::PARENT_DEPTH_TOLERANCE; +use crate::sync::block_lookups::common::{ResponseType, PARENT_DEPTH_TOLERANCE}; +use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor; use crate::sync::manager::Id; use crate::sync::network_context::LookupFailure; use beacon_chain::block_verification_types::AsBlock; @@ -14,19 +16,19 @@ use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; pub use single_block_lookup::{BlobRequestState, BlockRequestState}; use slog::{debug, error, trace, warn, Logger}; -use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use store::Hash256; use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; pub mod common; +pub mod parent_chain; mod single_block_lookup; #[cfg(test)] mod tests; const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60; -pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3; +pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 4; pub enum BlockComponent { Block(DownloadResult>>), @@ -55,6 +57,7 @@ pub struct BlockLookups { /// A cache of failed chain lookups to prevent duplicate searches. failed_chains: LRUTimeCache, + // TODO: Why not index lookups by block_root? single_block_lookups: FnvHashMap>, /// The logger for the import manager. 
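An aside on the `TODO` above: because `single_block_lookups` is keyed by the lookup id rather than by `block_root`, resolving the lookup for a given root is a linear scan over the map values. A minimal sketch of the access pattern the rest of this patch relies on, assuming the map is keyed by `SingleLookupId` as the surrounding code suggests (the helper name is hypothetical; the real call sites inline the `find`):

fn find_lookup_id_by_root<T: BeaconChainTypes>(
    lookups: &FnvHashMap<SingleLookupId, SingleBlockLookup<T>>,
    block_root: Hash256,
) -> Option<SingleLookupId> {
    // Linear scan: no reverse index from block_root to lookup id exists yet.
    lookups
        .iter()
        .find(|(_, lookup)| lookup.block_root() == block_root)
        .map(|(id, _)| *id)
}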
@@ -72,6 +75,16 @@ impl BlockLookups { } } + #[cfg(test)] + pub(crate) fn insert_failed_chain(&mut self, block_root: Hash256) { + self.failed_chains.insert(block_root); + } + + #[cfg(test)] + pub(crate) fn get_failed_chains(&mut self) -> Vec { + self.failed_chains.keys().cloned().collect() + } + #[cfg(test)] pub(crate) fn active_single_lookups(&self) -> Vec<(Id, Hash256, Option)> { self.single_block_lookups @@ -81,122 +94,104 @@ impl BlockLookups { } /// Returns a vec of all parent lookup chains by tip, in descending slot order (tip first) - pub(crate) fn active_parent_lookups(&self) -> Vec> { - let mut child_to_parent = HashMap::new(); - let mut parent_to_child = HashMap::>::new(); - for lookup in self.single_block_lookups.values() { - let block_root = lookup.block_root(); - let parent_root = lookup.awaiting_parent(); - child_to_parent.insert(block_root, parent_root); - if let Some(parent_root) = parent_root { - parent_to_child - .entry(parent_root) - .or_default() - .push(block_root); - } - } - - let mut parent_chains = vec![]; - - // Iterate blocks which no child - for lookup in self.single_block_lookups.values() { - let mut block_root = lookup.block_root(); - if parent_to_child.get(&block_root).is_none() { - let mut chain = vec![]; - - // Resolve chain of blocks - loop { - if let Some(parent_root) = child_to_parent.get(&block_root) { - // block_root is a known block that may or may not have a parent root - chain.push(block_root); - if let Some(parent_root) = parent_root { - block_root = *parent_root; - continue; - } - } - break; - } - - if chain.len() > 1 { - parent_chains.push(chain); - } - } - } - - parent_chains - } - - #[cfg(test)] - pub(crate) fn failed_chains_contains(&mut self, chain_hash: &Hash256) -> bool { - self.failed_chains.contains(chain_hash) + pub(crate) fn active_parent_lookups(&self) -> Vec { + compute_parent_chains( + &self + .single_block_lookups + .values() + .map(|lookup| lookup.into()) + .collect::>(), + ) } /* Lookup requests */ /// Creates a lookup for the block with the given `block_root` and immediately triggers it. - pub fn search_child_of_parent( + /// Returns true if the lookup is created or already exists + pub fn search_child_and_parent( &mut self, block_root: Hash256, block_component: BlockComponent, - peer_source: PeerId, + peer_id: PeerId, cx: &mut SyncNetworkContext, ) { - let awaiting_parent = block_component.parent_root(); - self.new_current_lookup( - block_root, - Some(block_component), - Some(awaiting_parent), - &[peer_source], - cx, - ) + let parent_root = block_component.parent_root(); + + let parent_lookup_exists = + self.search_parent_of_child(parent_root, block_root, &[peer_id], cx); + // Only create the child lookup if the parent exists + if parent_lookup_exists { + // `search_parent_of_child` ensures that parent root is not a failed chain + self.new_current_lookup( + block_root, + Some(block_component), + Some(parent_root), + &[peer_id], + cx, + ); + } } /// Seach a block that we don't known its parent root. + /// Returns true if the lookup is created or already exists pub fn search_unknown_block( &mut self, block_root: Hash256, peer_source: &[PeerId], cx: &mut SyncNetworkContext, ) { - self.new_current_lookup(block_root, None, None, peer_source, cx) + self.new_current_lookup(block_root, None, None, peer_source, cx); } /// A block or blob triggers the search of a parent. 
/// Check if this new lookup extends a bad chain: /// - Extending `child_block_root_trigger` would exceed the max depth /// - `block_root_to_search` is a failed chain + /// Returns true if the lookup is created or already exists pub fn search_parent_of_child( &mut self, block_root_to_search: Hash256, child_block_root_trigger: Hash256, peers: &[PeerId], cx: &mut SyncNetworkContext, - ) { - for parent_chain in self.active_parent_lookups() { - if parent_chain.last() == Some(&child_block_root_trigger) + ) -> bool { + let parent_chains = self.active_parent_lookups(); + + for (chain_idx, parent_chain) in parent_chains.iter().enumerate() { + if parent_chain.ancestor() == child_block_root_trigger && parent_chain.len() >= PARENT_DEPTH_TOLERANCE { debug!(self.log, "Parent lookup chain too long"; "block_root" => ?block_root_to_search); // Searching for this parent would extend a parent chain over the max // Insert the tip only to failed chains - let chain_hash = parent_chain.first().expect("has at least one element"); - self.failed_chains.insert(*chain_hash); - - // Drop all lookups descending from the child of the too long parent chain - if let Some((lookup_id, lookup)) = self - .single_block_lookups - .iter() - .find(|(_, l)| l.block_root() == child_block_root_trigger) - { - for &peer_id in lookup.all_used_peers() { - cx.report_peer(peer_id, PeerAction::LowToleranceError, "chain_too_long"); + self.failed_chains.insert(parent_chain.tip); + + // Note: Drop only the chain that's too long until it merges with another chain + // that's not too long. Consider this attack: there's a chain of valid unknown + // blocks A -> B. A malicious peer builds `PARENT_DEPTH_TOLERANCE` garbage + // blocks on top of A forming A -> C. The malicious peer forces us to fetch C + // from it, which will result in parent A hitting the chain_too_long error. Then + // the valid chain A -> B is dropped too. + if let Ok(block_to_drop) = find_oldest_fork_ancestor(parent_chains, chain_idx) { + // Drop all lookups descending from the child of the too long parent chain + if let Some((lookup_id, lookup)) = self + .single_block_lookups + .iter() + .find(|(_, l)| l.block_root() == block_to_drop) + { + for &peer_id in lookup.all_used_peers() { + cx.report_peer( + peer_id, + PeerAction::LowToleranceError, + "chain_too_long", + ); + } + self.drop_lookup_and_childs(*lookup_id); } - - self.drop_lookup_and_childs(*lookup_id); } - return; + return false; } } @@ -206,6 +201,7 @@ impl BlockLookups { /// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is /// constructed. + /// Returns true if the lookup is created or already exists fn new_current_lookup( &mut self, block_root: Hash256, @@ -213,19 +209,14 @@ impl BlockLookups { awaiting_parent: Option, peers: &[PeerId], cx: &mut SyncNetworkContext, - ) { + ) -> bool { // If this block or it's parent is part of a known failed chain, ignore it. if self.failed_chains.contains(&block_root) { debug!(self.log, "Block is from a past failed chain. Dropping"; "block_root" => ?block_root); - return; - } - - // TODO: Is checking if parent root is a failed chain necessary? - if let Some(parent_root) = block_component.as_ref().map(|b| b.parent_root()) { - if self.failed_chains.contains(&parent_root) { - debug!(self.log, "Parent of block is from a past failed chain. 
Dropping"; "parent_root" => ?parent_root, "block_root" => ?block_root); - return; + for peer_id in peers { + cx.report_peer(*peer_id, PeerAction::MidToleranceError, "failed_chain"); } + return false; } // Do not re-request a block that is already being requested @@ -239,7 +230,18 @@ impl BlockLookups { if let Some(block_component) = block_component { lookup.add_child_components(block_component); } - return; + return true; + } + + // Ensure that awaiting parent exists, otherwise this lookup won't be able to make progress + if let Some(awaiting_parent) = awaiting_parent { + if !self + .single_block_lookups + .iter() + .any(|(_, lookup)| lookup.is_for_block(awaiting_parent)) + { + return false; + } } let msg = if block_component.is_some() { @@ -263,22 +265,15 @@ impl BlockLookups { lookup.add_child_components(block_component); } - let block_root = lookup.block_root(); match lookup.continue_requests(cx) { Ok(()) => { self.single_block_lookups.insert(lookup.id, lookup); self.update_metrics(); - - metrics::set_gauge( - &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, - self.single_block_lookups.len() as i64, - ); + true } Err(e) => { - debug!(self.log, "Single block lookup failed"; - "error" => ?e, - "block_root" => ?block_root, - ); + debug!(self.log, "Single block lookup failed"; "block_root" => ?block_root, "error" => ?e); + false } } } @@ -329,7 +324,7 @@ impl BlockLookups { match response { Ok((response, seen_timestamp)) => { debug!(self.log, - "Block lookup download success"; + "Received lookup download success"; "block_root" => %block_root, "peer_id" => %peer_id, "response_type" => ?response_type, @@ -349,7 +344,7 @@ impl BlockLookups { } Err(e) => { debug!(self.log, - "Block lookup download failure"; + "Received lookup download failure"; "block_root" => %block_root, "peer_id" => %peer_id, "response_type" => ?response_type, @@ -398,7 +393,7 @@ impl BlockLookups { let id = match process_type { BlockProcessType::SingleBlock { id } | BlockProcessType::SingleBlob { id } => id, }; - debug!(self.log, "Dropping lookup on request error"; "id" => id, "error" => ?e); + debug!(self.log, "Dropping lookup on request error"; "component" => process_type.component(), "id" => process_type.id(), "error" => ?e); self.drop_lookup_and_childs(id); self.update_metrics(); } @@ -420,18 +415,16 @@ impl BlockLookups { debug!( self.log, - "Block component processed for lookup"; - "response_type" => ?R::response_type(), + "Received lookup processing result"; + "component" => ?R::response_type(), "block_root" => ?block_root, "result" => ?result, - "id" => lookup_id, ); let action = match result { BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_)) | BlockProcessingResult::Err(BlockError::BlockIsAlreadyKnown(_)) => { // Successfully imported - trace!(self.log, "Single block processing succeeded"; "block" => %block_root); request_state.on_processing_success()?; Action::Continue } @@ -448,13 +441,15 @@ impl BlockLookups { // wrong. If we already had both a block and blobs response processed, we should penalize the // blobs peer because they did not provide all blobs on the initial request. 
if lookup.both_components_processed() { - if let Ok(blob_peer) = lookup.blob_request_state.state.on_processing_failure() { - cx.report_peer( - blob_peer, - PeerAction::MidToleranceError, - "sent_incomplete_blobs", - ); - } + let blob_peer = lookup + .blob_request_state + .state + .on_post_process_validation_failure()?; + cx.report_peer( + blob_peer, + PeerAction::MidToleranceError, + "sent_incomplete_blobs", + ); } Action::Retry } @@ -463,17 +458,16 @@ impl BlockLookups { // This implies that the cpu is overloaded. Drop the request. warn!( self.log, - "Single block processing was ignored, cpu might be overloaded"; - "action" => "dropping single block request" + "Lookup component processing ignored, cpu might be overloaded"; + "component" => ?R::response_type(), ); Action::Drop } BlockProcessingResult::Err(e) => { - trace!(self.log, "Single block processing failed"; "block_root" => %block_root, "error" => %e); match e { BlockError::BeaconChainError(e) => { // Internal error - error!(self.log, "Beacon chain error processing single block"; "block_root" => %block_root, "error" => ?e); + error!(self.log, "Beacon chain error processing lookup component"; "block_root" => %block_root, "error" => ?e); Action::Drop } BlockError::ParentUnknown(block) => { @@ -513,7 +507,7 @@ impl BlockLookups { Action::Drop } other => { - warn!(self.log, "Invalid block in single block lookup"; "block_root" => %block_root, "error" => ?other); + debug!(self.log, "Invalid lookup component"; "block_root" => %block_root, "component" => ?R::response_type(), "error" => ?other); let peer_id = request_state.on_processing_failure()?; // TODO: Why is the original code downscoring the block peer regardless of // type of request? Sending a blob for verification can result in an error @@ -521,7 +515,10 @@ impl BlockLookups { cx.report_peer( peer_id, PeerAction::MidToleranceError, - "single_block_failure", + match R::response_type() { + ResponseType::Block => "lookup_block_processing_failure", + ResponseType::Blob => "lookup_blobs_processing_failure", + }, ); Action::Retry @@ -533,15 +530,13 @@ impl BlockLookups { match action { Action::Retry => { // Trigger download for all components in case `MissingComponents` failed the blob - // request. - // TODO: `continue_requests` must know when to not move a request forward: - // - If parent unknown do not request data? - // - If parent unknown error hold off for processing + // request. 
Also if blobs are `AwaitingProcessing` and need to be progressed lookup.continue_requests(cx)?; } Action::ParentUnknown { parent_root } => { let peers = lookup.all_available_peers().cloned().collect::<Vec<_>>(); lookup.set_awaiting_parent(parent_root); + debug!(self.log, "Marking lookup as awaiting parent"; "lookup" => %block_root, "parent_root" => %parent_root); self.search_parent_of_child(parent_root, block_root, &peers, cx); } Action::Drop => { @@ -553,6 +548,7 @@ impl BlockLookups { // Drop this completed lookup only self.single_block_lookups.remove(&lookup_id); self.update_metrics(); + debug!(self.log, "Dropping completed lookup"; "block" => %block_root); // Block imported, continue the requests of pending child blocks self.continue_child_lookups(block_root, cx); } @@ -560,13 +556,14 @@ impl BlockLookups { Ok(()) } + /// Makes progress on the immediate children of `block_root` pub fn continue_child_lookups(&mut self, block_root: Hash256, cx: &mut SyncNetworkContext) { let mut failed_lookups = vec![]; // < need to clean failed lookups later to re-borrow &mut self for (id, lookup) in self.single_block_lookups.iter_mut() { - if lookup.resolve_awaiting_parent() == Some(block_root) { - // Continue lookup - debug!(self.log, "Continuing child lookup"; "parent_root" => %block_root, "block_root" => %lookup.block_root()); + if lookup.awaiting_parent() == Some(block_root) { + lookup.resolve_awaiting_parent(); + debug!(self.log, "Continuing child lookup"; "parent_root" => %block_root, "block_root" => %lookup.block_root()); if let Err(e) = lookup.continue_requests(cx) { debug!(self.log, "Error continuing lookup"; "id" => id, "error" => ?e); failed_lookups.push(*id); @@ -579,6 +576,9 @@ impl BlockLookups { } } + /// Drops the `dropped_id` lookup and all its children recursively. Lookups awaiting a parent + /// need the parent to make progress to resolve, therefore we must drop them if the parent is + /// dropped. pub fn drop_lookup_and_childs(&mut self, dropped_id: SingleLookupId) { if let Some(dropped_lookup) = self.single_block_lookups.remove(&dropped_id) { debug!(self.log, "Dropping child lookup"; "id" => ?dropped_id, "block_root" => %dropped_lookup.block_root()); diff --git a/beacon_node/network/src/sync/block_lookups/parent_chain.rs b/beacon_node/network/src/sync/block_lookups/parent_chain.rs new file mode 100644 index 00000000000..0571e6d0dbb --- /dev/null +++ b/beacon_node/network/src/sync/block_lookups/parent_chain.rs @@ -0,0 +1,197 @@ +use std::collections::{HashMap, HashSet}; + +use beacon_chain::BeaconChainTypes; +use types::Hash256; + +use super::single_block_lookup::SingleBlockLookup; + +pub(crate) struct Node { + block_root: Hash256, + parent_root: Option<Hash256>, +} + +impl<T: BeaconChainTypes> From<&SingleBlockLookup<T>> for Node { + fn from(value: &SingleBlockLookup<T>) -> Self { + Self { + block_root: value.block_root(), + parent_root: value.awaiting_parent(), + } + } +} + +pub(crate) struct NodeChain { + // Parent chain blocks in descending slot order + pub(crate) chain: Vec<Hash256>, + pub(crate) tip: Hash256, +} + +impl NodeChain { + pub(crate) fn ancestor(&self) -> Hash256 { + self.chain.last().copied().unwrap_or(self.tip) + } + pub(crate) fn len(&self) -> usize { + self.chain.len() + } +} + +/// Given a set of nodes that reference each other, returns a list of chains with unique tips that +/// contain at least two elements. In descending slot order (tip first). 
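+/// For example, given blocks `1` (parent `0`), `2` (parent `1`) and `3` (parent `1`), the chains
+/// are `[2, 1, 0]` and `[3, 1, 0]`, one per tip; a lone node whose parent is not itself in the
+/// set forms a single-element chain and is filtered out (see the tests below).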
+pub(crate) fn compute_parent_chains(nodes: &[Node]) -> Vec<NodeChain> { + let mut child_to_parent = HashMap::new(); + let mut parent_to_child = HashMap::<Hash256, Vec<Hash256>>::new(); + for node in nodes { + child_to_parent.insert(node.block_root, node.parent_root); + if let Some(parent_root) = node.parent_root { + parent_to_child + .entry(parent_root) + .or_default() + .push(node.block_root); + } + } + + let mut parent_chains = vec![]; + + // Iterate over blocks which have no child + for tip in nodes { + let mut block_root = tip.block_root; + if parent_to_child.get(&block_root).is_none() { + let mut chain = vec![]; + + // Resolve chain of blocks + loop { + if let Some(parent_root) = child_to_parent.get(&block_root) { + // block_root is a known block that may or may not have a parent root + chain.push(block_root); + if let Some(parent_root) = parent_root { + block_root = *parent_root; + continue; + } + } + break; + } + + if chain.len() > 1 { + parent_chains.push(NodeChain { + chain, + tip: tip.block_root, + }); + } + } + } + + parent_chains +} + +/// Given a list of node chains, find the oldest node of a specific chain that is not contained in +/// any other chain. +pub(crate) fn find_oldest_fork_ancestor( + parent_chains: Vec<NodeChain>, + chain_idx: usize, +) -> Result<Hash256, &'static str> { + let mut other_blocks = HashSet::new(); + + // Register blocks from other chains + for (i, parent_chain) in parent_chains.iter().enumerate() { + if i != chain_idx { + for block in &parent_chain.chain { + other_blocks.insert(block); + } + } + } + + // Should never happen + let parent_chain = parent_chains.get(chain_idx).ok_or("chain_idx out of bounds")?; + // Find the first block in the target parent chain that is not in other parent chains + // Iterate in ascending slot order + for block in parent_chain.chain.iter().rev() { + if !other_blocks.contains(block) { + return Ok(*block); + } + } + + // If there is no match, the chain is fully contained within another chain. 
This should never + // happen, but if that was the case just return the tip + Ok(parent_chain.tip) +} + +#[cfg(test)] +mod tests { + use super::{compute_parent_chains, find_oldest_fork_ancestor, Node}; + use types::Hash256; + + fn h(n: u64) -> Hash256 { + Hash256::from_low_u64_be(n) + } + + fn n(block: u64) -> Node { + Node { + block_root: h(block), + parent_root: None, + } + } + + fn np(parent: u64, block: u64) -> Node { + Node { + block_root: h(block), + parent_root: Some(h(parent)), + } + } + + fn compute_parent_chains_test(nodes: &[Node], expected_chain: Vec>) { + assert_eq!( + compute_parent_chains(nodes) + .iter() + .map(|c| c.chain.clone()) + .collect::>(), + expected_chain + ); + } + + fn find_oldest_fork_ancestor_test(nodes: &[Node], expected: Hash256) { + let chains = compute_parent_chains(nodes); + println!( + "chains {:?}", + chains.iter().map(|c| &c.chain).collect::>() + ); + assert_eq!(find_oldest_fork_ancestor(chains, 0).unwrap(), expected); + } + + #[test] + fn compute_parent_chains_empty_case() { + compute_parent_chains_test(&[], vec![]); + } + + #[test] + fn compute_parent_chains_single_branch() { + compute_parent_chains_test(&[n(0), np(0, 1), np(1, 2)], vec![vec![h(2), h(1), h(0)]]); + } + + #[test] + fn compute_parent_chains_single_branch_with_solo() { + compute_parent_chains_test( + &[n(0), np(0, 1), np(1, 2), np(3, 4)], + vec![vec![h(2), h(1), h(0)]], + ); + } + + #[test] + fn compute_parent_chains_two_forking_branches() { + compute_parent_chains_test( + &[n(0), np(0, 1), np(1, 2), np(1, 3)], + vec![vec![h(2), h(1), h(0)], vec![h(3), h(1), h(0)]], + ); + } + + #[test] + fn compute_parent_chains_two_independent_branches() { + compute_parent_chains_test( + &[n(0), np(0, 1), np(1, 2), n(3), np(3, 4)], + vec![vec![h(2), h(1), h(0)], vec![h(4), h(3)]], + ); + } + + #[test] + fn find_oldest_fork_ancestor_simple_case() { + find_oldest_fork_ancestor_test(&[n(0), np(0, 1), np(1, 2), np(0, 3)], h(1)) + } +} diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 8e51b508eb5..9349e24c69c 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -67,8 +67,8 @@ impl SingleBlockLookup { /// Mark this lookup as no longer awaiting a parent lookup. Components can be sent for /// processing. - pub fn resolve_awaiting_parent(&mut self) -> Option { - self.awaiting_parent.take() + pub fn resolve_awaiting_parent(&mut self) { + self.awaiting_parent = None; } pub fn add_child_components(&mut self, block_component: BlockComponent) { @@ -106,7 +106,7 @@ impl SingleBlockLookup { self.block_request_state .state .get_available_peers() - .chain(self.blob_request_state.state.get_used_peers()) + .chain(self.blob_request_state.state.get_available_peers()) .unique() } @@ -131,10 +131,12 @@ impl SingleBlockLookup { .state .peek_downloaded_data() .map(|block| block.num_expected_blobs()); + let block_is_processed = self.block_request_state.state.is_processed(); R::request_state_mut(self).continue_request( id, awaiting_parent, downloaded_block_expected_blobs, + block_is_processed, cx, ) } @@ -214,7 +216,7 @@ pub enum State { Downloading, AwaitingProcess(DownloadResult), Processing(DownloadResult), - Processed { peer_id: PeerId }, + Processed(PeerId), } /// Object representing the state of a single block or blob lookup request. 
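For orientation, a minimal sketch of the per-component state machine after this change — the real `State<T>` in this file carries `DownloadResult` payloads and uses `lighthouse_network::PeerId`; both are simplified here. The new `Processed(PeerId)` variant keeps the peer so `on_post_process_validation_failure` can both report it and rewind the component for a retry:

#[derive(Debug, Clone, Copy)]
struct PeerId(u64); // stand-in for lighthouse_network::PeerId

#[derive(Debug)]
enum State {
    AwaitingDownload,
    Downloading,
    AwaitingProcess(PeerId),
    Processing(PeerId),
    Processed(PeerId),
}

impl State {
    // A component that already reached `Processed` can fail post-process
    // validation (e.g. incomplete blobs); it then returns the peer to blame
    // and falls back to `AwaitingDownload` so the component is re-fetched.
    fn on_post_process_validation_failure(&mut self) -> Result<PeerId, String> {
        match *self {
            State::Processed(peer_id) => {
                *self = State::AwaitingDownload;
                Ok(peer_id)
            }
            ref other => Err(format!("expected Processed, got {other:?}")),
        }
    }
}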
@@ -294,7 +296,7 @@ impl SingleLookupRequestState { Ok(()) } other => Err(LookupRequestError::BadState(format!( - "request bad state, expected AwaitingDownload got {other}" + "Bad state on_download_start expected AwaitingDownload got {other}" ))), } } @@ -309,7 +311,7 @@ impl SingleLookupRequestState { Ok(()) } other => Err(LookupRequestError::BadState(format!( - "request bad state, expected Downloading got {other}" + "Bad state on_download_failure expected Downloading got {other}" ))), } } @@ -324,7 +326,7 @@ impl SingleLookupRequestState { Ok(()) } other => Err(LookupRequestError::BadState(format!( - "request bad state, expected Downloading got {other}" + "Bad state on_download_success expected Downloading got {other}" ))), } } @@ -351,7 +353,7 @@ impl SingleLookupRequestState { Ok(()) } other => Err(LookupRequestError::BadState(format!( - "request bad state, expected Processing got {other}" + "Bad state on revert_to_awaiting_processing expected Processing got {other}" ))), } } @@ -366,19 +368,34 @@ impl SingleLookupRequestState { Ok(peer_id) } other => Err(LookupRequestError::BadState(format!( - "request bad state, expected Processing got {other}" + "Bad state on_processing_failure expected Processing got {other}" ))), } } - pub fn on_processing_success(&mut self) -> Result<(), LookupRequestError> { + pub fn on_processing_success(&mut self) -> Result { match &self.state { State::Processing(result) => { - self.state = State::Processed { peer_id: result.3 }; - Ok(()) + let peer_id = result.3; + self.state = State::Processed(peer_id); + Ok(peer_id) + } + other => Err(LookupRequestError::BadState(format!( + "Bad state on_processing_success expected Processing got {other}" + ))), + } + } + + pub fn on_post_process_validation_failure(&mut self) -> Result { + match &self.state { + State::Processed(peer_id) => { + let peer_id = *peer_id; + self.failed_processing = self.failed_processing.saturating_add(1); + self.state = State::AwaitingDownload; + Ok(peer_id) } other => Err(LookupRequestError::BadState(format!( - "request bad state, expected Processing got {other}" + "Bad state on_post_process_validation_failure expected Processed got {other}" ))), } } diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 57a6e7412e3..ff02a19a554 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -214,6 +214,15 @@ impl TestRig { self.sync_manager.active_parent_lookups().len() } + fn assert_single_lookups_count(&self, count: usize) { + assert_eq!( + self.active_single_lookups_count(), + count, + "Unexpected count of single lookups. 
Current lookups: {:?}", + self.active_single_lookups() + ); + } + fn assert_parent_lookups_count(&self, count: usize) { assert_eq!( self.active_parent_lookups_count(), @@ -224,8 +233,26 @@ impl TestRig { ); } + fn assert_lookup_is_active(&self, block_root: Hash256) { + let lookups = self.sync_manager.active_single_lookups(); + if !lookups.iter().any(|l| l.1 == block_root) { + panic!("Expected lookup {block_root} to be the only active: {lookups:?}"); + } + } + + fn insert_failed_chain(&mut self, block_root: Hash256) { + self.sync_manager.insert_failed_chain(block_root); + } + + fn assert_not_failed_chain(&mut self, chain_hash: Hash256) { + let failed_chains = self.sync_manager.get_failed_chains(); + if failed_chains.contains(&chain_hash) { + panic!("failed chains contain {chain_hash:?}: {failed_chains:?}"); + } + } + fn failed_chains_contains(&mut self, chain_hash: &Hash256) -> bool { - self.sync_manager.failed_chains_contains(chain_hash) + self.sync_manager.get_failed_chains().contains(chain_hash) } fn find_single_lookup_for(&self, block_root: Hash256) -> Id { @@ -236,14 +263,6 @@ impl TestRig { .0 } - fn expect_no_active_parent_lookups(&self) { - assert!( - self.active_parent_lookups().is_empty(), - "expected no parent lookups: {:?}", - self.active_parent_lookups() - ); - } - fn expect_no_active_single_lookups(&self) { assert!( self.active_single_lookups().is_empty(), @@ -253,22 +272,16 @@ impl TestRig { } fn expect_no_active_lookups(&self) { - self.expect_no_active_parent_lookups(); self.expect_no_active_single_lookups(); } - #[track_caller] - fn assert_parent_lookups_consistency(&self) { - let hashes = self.active_parent_lookups(); - let expected = hashes.len(); - assert_eq!( - expected, - hashes - .into_iter() - .collect::>() - .len(), - "duplicated chain hashes in parent queue" - ) + fn expect_lookups(&self, expected_block_roots: &[Hash256]) { + let block_roots = self + .active_single_lookups() + .iter() + .map(|(_, b, _)| *b) + .collect::>(); + assert_eq!(&block_roots, expected_block_roots); } fn new_connected_peer(&mut self) -> PeerId { @@ -290,32 +303,35 @@ impl TestRig { self.parent_block_processed_imported(chain_hash); } // Send final import event for the block that triggered the lookup - let trigger_lookup = self - .active_single_lookups() - .iter() - .find(|(_, block_root, _)| block_root == &chain_hash) - .copied() - .unwrap_or_else(|| panic!("There should exist a single block lookup for {chain_hash}")); - self.single_block_component_processed_imported(trigger_lookup.0, chain_hash); + self.single_block_component_processed_imported(chain_hash); } - fn parent_block_processed(&mut self, chain_hash: Hash256, result: BlockProcessingResult) { - // Locate a parent lookup chain with tip hash `chain_hash` + /// Locate a parent lookup chain with tip hash `chain_hash` + fn find_oldest_parent_lookup(&self, chain_hash: Hash256) -> Hash256 { let parent_chain = self .active_parent_lookups() .into_iter() .find(|chain| chain.first() == Some(&chain_hash)) .unwrap_or_else(|| { panic!( - "No parent chain with chain_hash {chain_hash:?}: {:?}", - self.active_parent_lookups() + "No parent chain with chain_hash {chain_hash:?}: Parent lookups {:?} Single lookups {:?}", + self.active_parent_lookups(), + self.active_single_lookups(), ) }); + *parent_chain.last().unwrap() + } - let id = self.find_single_lookup_for(*parent_chain.last().unwrap()); + fn parent_block_processed(&mut self, chain_hash: Hash256, result: BlockProcessingResult) { + let id = 
self.find_single_lookup_for(self.find_oldest_parent_lookup(chain_hash)); self.single_block_component_processed(id, result); } + fn parent_blob_processed(&mut self, chain_hash: Hash256, result: BlockProcessingResult) { + let id = self.find_single_lookup_for(self.find_oldest_parent_lookup(chain_hash)); + self.single_blob_component_processed(id, result); + } + fn parent_block_processed_imported(&mut self, chain_hash: Hash256) { self.parent_block_processed( chain_hash, @@ -330,20 +346,17 @@ impl TestRig { }) } - fn single_block_component_processed_imported(&mut self, id: Id, block_root: Hash256) { + fn single_block_component_processed_imported(&mut self, block_root: Hash256) { + let id = self.find_single_lookup_for(block_root); self.single_block_component_processed( id, BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)), ) } - fn single_blob_component_processed( - &mut self, - id: SingleLookupReqId, - result: BlockProcessingResult, - ) { + fn single_blob_component_processed(&mut self, id: Id, result: BlockProcessingResult) { self.send_sync_message(SyncMessage::BlockComponentProcessed { - process_type: BlockProcessType::SingleBlob { id: id.lookup_id }, + process_type: BlockProcessType::SingleBlob { id }, result, }) } @@ -384,7 +397,10 @@ impl TestRig { peer_id: PeerId, blob_sidecar: Option>>, ) { - self.log("parent_lookup_blob_response"); + self.log(&format!( + "parent_lookup_blob_response {:?}", + blob_sidecar.as_ref().map(|b| b.index) + )); self.send_sync_message(SyncMessage::RpcBlob { request_id: SyncRequestId::SingleBlob { id }, peer_id, @@ -609,10 +625,11 @@ impl TestRig { #[track_caller] fn expect_empty_beacon_processor(&mut self) { - assert_eq!( - self.beacon_processor_rx.try_recv().expect_err("must err"), - mpsc::error::TryRecvError::Empty - ); + match self.beacon_processor_rx.try_recv() { + Err(mpsc::error::TryRecvError::Empty) => {} // ok + Ok(event) => panic!("expected empty beacon processor: {:?}", event), + other => panic!("unexpected err {:?}", other), + } } #[track_caller] @@ -626,7 +643,7 @@ impl TestRig { }) .unwrap_or_else(|_| { panic!( - "Expected peer penalty for {peer_id}: {:#?}", + "Expected '{expect_penalty_msg}' penalty for peer {peer_id}: {:#?}", self.network_rx_queue ) }); @@ -636,6 +653,11 @@ impl TestRig { ); } + pub fn expect_single_penalty(&mut self, peer_id: PeerId, expect_penalty_msg: &'static str) { + self.expect_penalty(peer_id, expect_penalty_msg); + self.expect_no_penalty_for(peer_id); + } + pub fn block_with_parent_and_blobs( &mut self, parent_root: Hash256, @@ -651,19 +673,47 @@ impl TestRig { pub fn rand_blockchain(&mut self, depth: usize) -> Vec>> { let mut blocks = Vec::>>::with_capacity(depth); - while blocks.len() < depth { + for slot in 0..depth { let parent = blocks .last() .map(|b| b.canonical_root()) .unwrap_or_else(Hash256::random); let mut block = self.rand_block(); *block.message_mut().parent_root_mut() = parent; + *block.message_mut().slot_mut() = slot.into(); blocks.push(block.into()); } + self.log(&format!( + "Blockchain dump {:#?}", + blocks + .iter() + .map(|b| format!( + "block {} {} parent {}", + b.slot(), + b.canonical_root(), + b.parent_root() + )) + .collect::>() + )); blocks } } +#[test] +fn stable_rng() { + let mut rng = XorShiftRng::from_seed([42; 16]); + let (block, _) = generate_rand_block_and_blobs::(ForkName::Base, NumBlobs::None, &mut rng); + // TODO: Make rand block generation stable + assert_ne!( + block.canonical_root(), + Hash256::from_slice( + 
&hex::decode("9cfcfc321759d8a2c38d6541a966da5e88fe8729ed5a5ab37013781ff097b0d6") .unwrap() ), "rng produces a consistent value" ); } #[test] fn test_single_block_lookup_happy_path() { let mut rig = TestRig::test_setup(); @@ -686,7 +736,7 @@ fn test_single_block_lookup_happy_path() { // Send the stream termination. Peer should have not been penalized, and the request removed // after processing. rig.single_lookup_block_response(id, peer_id, None); - rig.single_block_component_processed_imported(id.lookup_id, block_root); + rig.single_block_component_processed_imported(block_root); rig.expect_empty_network(); rig.expect_no_active_lookups(); } @@ -776,7 +826,7 @@ fn test_single_block_lookup_becomes_parent_request() { id.lookup_id, BlockError::ParentUnknown(RpcBlock::new_without_blobs(None, block)).into(), ); - assert_eq!(rig.active_single_lookups_count(), 1); + assert_eq!(rig.active_single_lookups_count(), 2); // 2 = current + parent rig.expect_parent_request_block_and_blobs(parent_root); rig.expect_empty_network(); assert_eq!(rig.active_parent_lookups_count(), 1); @@ -863,8 +913,7 @@ fn test_parent_lookup_empty_response() { // Processing succeeds, now the rest of the chain should be sent for processing. rig.parent_block_processed_imported(block_root); - let id = rig.find_single_lookup_for(block_root); - rig.single_block_component_processed_imported(id, block_root); + rig.single_block_component_processed_imported(block_root); rig.expect_no_active_lookups(); } @@ -990,21 +1039,18 @@ fn test_parent_lookup_too_many_processing_attempts_must_blacklist() { } rig.log("Now fail processing a block in the parent request"); - for i in 0..PROCESSING_FAILURES { + for _ in 0..PROCESSING_FAILURES { let id = rig.expect_block_parent_request(parent_root); - // Blobs are only requested in the first iteration as this test only retries blocks - if rig.after_deneb() && i != 0 { - let _ = rig.expect_blob_parent_request(parent_root); - } - assert!(!rig.failed_chains_contains(&block_root)); + // Blobs were only requested in the first iteration, as this test only retries blocks + rig.assert_not_failed_chain(block_root); // send the right parent but fail processing rig.parent_lookup_block_response(id, peer_id, Some(parent.clone().into())); rig.parent_block_processed(block_root, BlockError::InvalidSignature.into()); rig.parent_lookup_block_response(id, peer_id, None); - rig.expect_penalty(peer_id, "parent_request_err"); + rig.expect_penalty(peer_id, "lookup_block_processing_failure"); } - assert!(rig.failed_chains_contains(&block_root)); + rig.assert_not_failed_chain(block_root); rig.expect_no_active_lookups(); } @@ -1033,12 +1079,12 @@ fn test_parent_lookup_too_deep() { ) } - rig.expect_penalty(peer_id, ""); + rig.expect_penalty(peer_id, "chain_too_long"); assert!(rig.failed_chains_contains(&chain_hash)); } #[test] -fn test_parent_lookup_disconnection() { +fn test_parent_lookup_disconnection_no_peers_left() { let mut rig = TestRig::test_setup(); let peer_id = rig.new_connected_peer(); let trigger_block = rig.rand_block(); rig.trigger_unknown_parent_block(peer_id, trigger_block.into()); rig.peer_disconnected(peer_id); rig.expect_no_active_lookups(); } +#[test] +fn test_parent_lookup_disconnection_peer_left() { + let mut rig = TestRig::test_setup(); + let peer_ids = (0..2).map(|_| rig.new_connected_peer()).collect::<Vec<_>>(); + let trigger_block = rig.rand_block(); + // lookup should have two peers associated with the same block + for peer_id in peer_ids.iter() { + rig.trigger_unknown_parent_block(*peer_id, trigger_block.clone().into()); + } 
+ // Disconnect the first peer only, which is the one handling the request + rig.peer_disconnected(*peer_ids.first().unwrap()); + rig.assert_parent_lookups_count(1); +} + +#[test] +fn test_skip_creating_failed_parent_lookup() { + let mut rig = TestRig::test_setup(); + let (_, block, parent_root, _) = rig.rand_block_and_parent(); + let peer_id = rig.new_connected_peer(); + rig.insert_failed_chain(parent_root); + rig.trigger_unknown_parent_block(peer_id, block.into()); + // Expect single penalty for peer, despite dropping two lookups + rig.expect_single_penalty(peer_id, "failed_chain"); + // Both current and parent lookup should be rejected + rig.expect_no_active_lookups(); +} + +#[test] +fn test_skip_creating_failed_current_lookup() { + let mut rig = TestRig::test_setup(); + let (_, block, parent_root, block_root) = rig.rand_block_and_parent(); + let peer_id = rig.new_connected_peer(); + rig.insert_failed_chain(block_root); + rig.trigger_unknown_parent_block(peer_id, block.into()); + // Expect single penalty for peer + rig.expect_single_penalty(peer_id, "failed_chain"); + // Only the current lookup should be rejected + rig.expect_lookups(&[parent_root]); +} + #[test] fn test_single_block_lookup_ignored_response() { let mut rig = TestRig::test_setup(); @@ -1137,18 +1223,18 @@ fn test_same_chain_race_condition() { BlockError::ParentUnknown(RpcBlock::new_without_blobs(None, block)).into(), ) } - rig.assert_parent_lookups_consistency(); } - // Processing succeeds, now the rest of the chain should be sent for processing. - rig.expect_parent_chain_process(); - // Try to get this block again while the chain is being processed. We should not request it again. let peer_id = rig.new_connected_peer(); - rig.trigger_unknown_parent_block(peer_id, trigger_block); - rig.assert_parent_lookups_consistency(); + rig.trigger_unknown_parent_block(peer_id, trigger_block.clone()); + rig.expect_empty_network(); - rig.parent_chain_processed_success(chain_hash, &blocks); + // Processing succeeds, now the rest of the chain should be sent for processing. + for block in blocks.iter().skip(1).chain(&[trigger_block]) { + rig.expect_parent_chain_process(); + rig.single_block_component_processed_imported(block.canonical_root()); + } rig.expect_no_active_lookups(); } @@ -1302,6 +1388,21 @@ mod deneb_only { }) } + fn log(self, msg: &str) -> Self { + self.rig.log(msg); + self + } + + // TODO: Eventually deprecate this function + fn set_block_id_for_import(mut self) -> Self { + let lookup_id = self.rig.find_single_lookup_for(self.block_root); + self.block_req_id = Some(SingleLookupReqId { + lookup_id, + req_id: 0, + }); + self + } + fn parent_block_response(mut self) -> Self { self.rig.expect_empty_network(); let block = self.parent_block.pop_front().unwrap().clone(); @@ -1356,7 +1457,8 @@ mod deneb_only { self.rig.expect_empty_network(); // The request should still be active. 
- assert_eq!(self.rig.active_single_lookups_count(), 1); + self.rig + .assert_lookup_is_active(self.block.canonical_root()); self } @@ -1369,7 +1471,8 @@ mod deneb_only { self.peer_id, Some(blob.clone()), ); - assert_eq!(self.rig.active_single_lookups_count(), 1); + self.rig + .assert_lookup_is_active(self.block.canonical_root()); } self.rig.single_lookup_blob_response( self.blob_req_id.expect("blob request id"), @@ -1428,6 +1531,29 @@ mod deneb_only { self } + fn block_missing_components(mut self) -> Self { + self.rig.single_block_component_processed( + self.block_req_id.expect("block request id").lookup_id, + BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( + self.block.slot(), + self.block_root, + )), + ); + self.rig.expect_empty_network(); + self.rig.assert_single_lookups_count(1); + self + } + + fn blob_imported(mut self) -> Self { + self.rig.single_blob_component_processed( + self.blob_req_id.expect("blob request id").lookup_id, + BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(self.block_root)), + ); + self.rig.expect_empty_network(); + self.rig.assert_single_lookups_count(0); + self + } + fn block_imported(mut self) -> Self { // Missing blobs should be the request is not removed, the outstanding blobs request should // mean we do not send a new request. @@ -1436,7 +1562,7 @@ mod deneb_only { BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(self.block_root)), ); self.rig.expect_empty_network(); - assert_eq!(self.rig.active_single_lookups_count(), 0); + self.rig.assert_single_lookups_count(0); self } @@ -1451,7 +1577,19 @@ mod deneb_only { self } + fn parent_blob_imported(mut self) -> Self { + self.rig.log("parent_blob_imported"); + self.rig.parent_blob_processed( + self.block_root, + BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(self.block_root)), + ); + self.rig.expect_empty_network(); + self.rig.assert_parent_lookups_count(0); + self + } + fn parent_block_unknown_parent(mut self) -> Self { + self.rig.log("parent_block_unknown_parent"); let block = self.unknown_parent_block.take().unwrap(); // Now this block is the one we expect requests from self.block = block.clone(); @@ -1469,6 +1607,26 @@ mod deneb_only { self } + fn parent_block_missing_components(mut self) -> Self { + let block = self.unknown_parent_block.clone().unwrap(); + self.rig.parent_block_processed( + self.block_root, + BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( + block.slot(), + block.canonical_root(), + )), + ); + self.rig.parent_blob_processed( + self.block_root, + BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( + block.slot(), + block.canonical_root(), + )), + ); + assert_eq!(self.rig.active_parent_lookups_count(), 1); + self + } + fn invalid_parent_processed(mut self) -> Self { self.rig.parent_block_processed( self.block_root, @@ -1483,18 +1641,19 @@ mod deneb_only { self.block_req_id.expect("block request id").lookup_id, BlockProcessingResult::Err(BlockError::ProposalSignatureInvalid), ); - assert_eq!(self.rig.active_single_lookups_count(), 1); + self.rig.assert_single_lookups_count(1); self } fn invalid_blob_processed(mut self) -> Self { - self.rig.single_block_component_processed( + self.rig.log("invalid_blob_processed"); + self.rig.single_blob_component_processed( self.blob_req_id.expect("blob request id").lookup_id, BlockProcessingResult::Err(BlockError::AvailabilityCheck( AvailabilityCheckError::KzgVerificationFailed, )), ); - 
assert_eq!(self.rig.active_single_lookups_count(), 1); + self.rig.assert_single_lookups_count(1); self } @@ -1506,19 +1665,19 @@ mod deneb_only { self.block_root, )), ); - assert_eq!(self.rig.active_single_lookups_count(), 1); + self.rig.assert_single_lookups_count(1); self } fn missing_components_from_blob_request(mut self) -> Self { self.rig.single_blob_component_processed( - self.blob_req_id.expect("blob request id"), + self.blob_req_id.expect("blob request id").lookup_id, BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( self.slot, self.block_root, )), ); - assert_eq!(self.rig.active_single_lookups_count(), 1); + self.rig.assert_single_lookups_count(1); self } @@ -1607,8 +1766,9 @@ mod deneb_only { tester .block_response_triggering_process() .blobs_response() + .block_missing_components() // blobs not yet imported .blobs_response_was_valid() - .block_imported(); + .blob_imported(); // now blobs resolve as imported } #[test] @@ -1618,10 +1778,11 @@ mod deneb_only { }; tester - .blobs_response() - .blobs_response_was_valid() + .blobs_response() // hold blobs for processing .block_response_triggering_process() - .block_imported(); + .block_missing_components() // blobs not yet imported + .blobs_response_was_valid() + .blob_imported(); // now blobs resolve as imported } #[test] @@ -1655,7 +1816,7 @@ mod deneb_only { .missing_components_from_block_request() .empty_blobs_response() .missing_components_from_blob_request() - .expect_penalty("single_blob_failure") + .expect_penalty("sent_incomplete_blobs") .expect_blobs_request() .expect_no_block_request(); } @@ -1668,9 +1829,8 @@ mod deneb_only { tester .blobs_response() - .blobs_response_was_valid() .expect_no_penalty_and_no_requests() - .missing_components_from_blob_request() + // blobs not sent for processing until the block is processed .empty_block_response() .expect_penalty("NoResponseReturned") .expect_block_request() @@ -1686,11 +1846,11 @@ mod deneb_only { tester .block_response_triggering_process() .invalid_block_processed() - .expect_penalty("single_block_failure") + .expect_penalty("lookup_block_processing_failure") .expect_block_request() .expect_no_blobs_request() .blobs_response() - .missing_components_from_blob_request() + // blobs not sent for processing until the block is processed .expect_no_penalty_and_no_requests(); } @@ -1705,7 +1865,7 @@ mod deneb_only { .missing_components_from_block_request() .blobs_response() .invalid_blob_processed() - .expect_penalty("single_blob_failure") + .expect_penalty("lookup_blobs_processing_failure") .expect_blobs_request() .expect_no_block_request(); } @@ -1722,7 +1882,7 @@ mod deneb_only { .invalidate_blobs_too_few() .blobs_response() .missing_components_from_blob_request() - .expect_penalty("single_blob_failure") + .expect_penalty("sent_incomplete_blobs") .expect_blobs_request() .expect_no_block_request(); } @@ -1750,8 +1910,7 @@ mod deneb_only { tester .invalidate_blobs_too_few() - .blobs_response() - .blobs_response_was_valid() + .blobs_response() // blobs are not sent until the block is processed .expect_no_penalty_and_no_requests() .block_response_triggering_process(); } @@ -1806,9 +1965,8 @@ mod deneb_only { .parent_blob_response() .expect_block_process() .invalid_parent_processed() - .expect_penalty("parent_request_err") + .expect_penalty("lookup_block_processing_failure") .expect_parent_block_request() - .expect_parent_blobs_request() .expect_empty_beacon_processor(); } @@ -1879,15 +2037,22 @@ mod deneb_only { tester .blobs_response() + .log(" Return empty 
blobs for parent, block errors with missing components, downscore") .empty_parent_blobs_response() .expect_no_penalty_and_no_requests() .parent_block_response() - .expect_penalty("single_blob_failure") + .parent_block_missing_components() + .expect_penalty("sent_incomplete_blobs") + .log("Re-request parent blobs, succeed and import parent") .expect_parent_blobs_request() .parent_blob_response() .expect_block_process() - .parent_block_imported() - .expect_parent_chain_process(); + .parent_blob_imported() + .log("resolve original block trigger blobs request and import") + .blobs_response() + .set_block_id_for_import() + .block_imported() + .expect_no_active_lookups(); } #[test] @@ -1925,9 +2090,9 @@ mod deneb_only { .parent_blob_response() .expect_block_process() .invalid_parent_processed() - .expect_penalty("parent_request_err") + .expect_penalty("lookup_block_processing_failure") .expect_parent_block_request() - .expect_parent_blobs_request() + // blobs are not sent until block is processed .expect_empty_beacon_processor(); } @@ -2005,17 +2170,21 @@ mod deneb_only { }; tester - .block_response() // reply with current block - .empty_parent_blobs_response() // replies empty blobs to parent block - .expect_no_penalty_and_no_requests() // no penalty because parent block is unknown - .parent_block_response() // reply with parent block - .expect_penalty("single_blob_failure") // parent block has data, so penalize parent blob peer - .expect_parent_blobs_request() // re-request parent blobs - .parent_blob_response() // good response now - .expect_block_process() // send parent block for import - .parent_block_imported() // parent block imported + .block_response() + .log(" Return empty blobs for parent, block errors with missing components, downscore") + .empty_parent_blobs_response() + .expect_no_penalty_and_no_requests() + .parent_block_response() + .parent_block_missing_components() + .expect_penalty("sent_incomplete_blobs") + .log("Re-request parent blobs, succeed and import parent") + .expect_parent_blobs_request() + .parent_blob_response() + .expect_block_process() + .parent_blob_imported() + .log("resolve original block trigger blobs request and import") .blobs_response() - .block_imported() // resolve original block trigger blobs request and import + .block_imported() .expect_no_active_lookups(); } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index fed5cbe090a..f7c9dd783c8 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -151,6 +151,21 @@ pub enum BlockProcessType { SingleBlob { id: Id }, } +impl BlockProcessType { + pub fn component(&self) -> &'static str { + match self { + BlockProcessType::SingleBlock { .. } => "block", + BlockProcessType::SingleBlob { .. 
} => "blob", + } + } + + pub fn id(&self) -> Id { + match self { + BlockProcessType::SingleBlock { id } | BlockProcessType::SingleBlob { id } => *id, + } + } +} + #[derive(Debug)] pub enum BlockProcessingResult { Ok(AvailabilityProcessingStatus), @@ -261,12 +276,21 @@ impl SyncManager { #[cfg(test)] pub(crate) fn active_parent_lookups(&self) -> Vec> { - self.block_lookups.active_parent_lookups() + self.block_lookups + .active_parent_lookups() + .iter() + .map(|c| c.chain.clone()) + .collect() } #[cfg(test)] - pub(crate) fn failed_chains_contains(&mut self, chain_hash: &Hash256) -> bool { - self.block_lookups.failed_chains_contains(chain_hash) + pub(crate) fn get_failed_chains(&mut self) -> Vec { + self.block_lookups.get_failed_chains() + } + + #[cfg(test)] + pub(crate) fn insert_failed_chain(&mut self, block_root: Hash256) { + self.block_lookups.insert_failed_chain(block_root); } fn network_globals(&self) -> &NetworkGlobals { @@ -648,13 +672,7 @@ impl SyncManager { ) { match self.should_search_for_block(Some(slot), &peer_id) { Ok(_) => { - self.block_lookups.search_parent_of_child( - parent_root, - block_root, - &[peer_id], - &mut self.network, - ); - self.block_lookups.search_child_of_parent( + self.block_lookups.search_child_and_parent( block_root, block_component, peer_id, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 80481d10ca6..860192db684 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -620,7 +620,7 @@ impl SyncNetworkContext { ) -> Result<(), &'static str> { match self.beacon_processor_if_enabled() { Some(beacon_processor) => { - trace!(self.log, "Sending blobs for processing"; "block" => ?block_root, "process_type" => ?process_type); + debug!(self.log, "Sending blobs for processing"; "block" => ?block_root, "process_type" => ?process_type); if let Err(e) = beacon_processor.send_rpc_blobs(block_root, blobs, duration, process_type) { diff --git a/common/lru_cache/src/time.rs b/common/lru_cache/src/time.rs index 0b2fd835687..890bf47eb44 100644 --- a/common/lru_cache/src/time.rs +++ b/common/lru_cache/src/time.rs @@ -166,6 +166,12 @@ where self.map.contains(key) } + /// List known keys + pub fn keys(&mut self) -> impl Iterator { + self.update(); + self.map.iter() + } + /// Shrink the mappings to fit the current size. pub fn shrink_to_fit(&mut self) { self.map.shrink_to_fit(); diff --git a/consensus/types/src/test_utils/test_random/secret_key.rs b/consensus/types/src/test_utils/test_random/secret_key.rs index 3f3f6ed5184..da1614aa24e 100644 --- a/consensus/types/src/test_utils/test_random/secret_key.rs +++ b/consensus/types/src/test_utils/test_random/secret_key.rs @@ -2,6 +2,8 @@ use super::*; impl TestRandom for SecretKey { fn random_for_test(_rng: &mut impl RngCore) -> Self { + // TODO: Not deterministic generation. Using `SecretKey::deserialize` results in + // `BlstError(BLST_BAD_ENCODING)`, need to debug with blst source on what encoding expects. 
SecretKey::random() } } diff --git a/consensus/types/src/test_utils/test_random/signature.rs b/consensus/types/src/test_utils/test_random/signature.rs index 5b952296b61..8bc0d711103 100644 --- a/consensus/types/src/test_utils/test_random/signature.rs +++ b/consensus/types/src/test_utils/test_random/signature.rs @@ -1,11 +1,10 @@ use super::*; impl TestRandom for Signature { - fn random_for_test(rng: &mut impl RngCore) -> Self { - let secret_key = SecretKey::random_for_test(rng); - let mut message = vec![0; 32]; - rng.fill_bytes(&mut message); - - secret_key.sign(Hash256::from_slice(&message)) + fn random_for_test(_rng: &mut impl RngCore) -> Self { + // TODO: `SecretKey::random_for_test` is not deterministic, so signatures derived from it + // are not reproducible either. Since this signature will not pass verification anyway, we + // could just return the generator point, or the generator point multiplied by a random + // scalar if we want distinct signatures. + Signature::infinity().expect("infinity signature is valid") } }
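For reference, the RNG layer itself is already reproducible; the instability called out in the two TODOs above comes from the BLS key and signature generation, not from the seed. A standalone check of that assumption, reusing the seed from the `stable_rng` test (plain `rand`/`rand_xorshift`, outside the lighthouse test harness):

use rand::{RngCore, SeedableRng};
use rand_xorshift::XorShiftRng;

fn main() {
    // Two RNGs built from the same 16-byte seed yield identical streams, so
    // any run-to-run difference in generated test blocks must originate in
    // the BLS layer above, not in the RNG.
    let mut a = XorShiftRng::from_seed([42; 16]);
    let mut b = XorShiftRng::from_seed([42; 16]);
    assert_eq!(a.next_u64(), b.next_u64());
}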