From 93e0649abca54b7f2af4de0f486aec26adb15eb7 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Mon, 13 May 2024 14:41:29 +0300 Subject: [PATCH] Notify lookup sync of gossip processing results (#5722) * Notify lookup sync of gossip processing results * Add tests * Add GossipBlockProcessResult event * Re-add dropped comments * Update beacon_node/network/src/network_beacon_processor/sync_methods.rs * update test_lookup_disconnection_peer_left --- .../src/block_verification_types.rs | 20 ++ .../src/data_availability_checker.rs | 7 +- .../overflow_lru_cache.rs | 13 +- .../gossip_methods.rs | 20 +- .../src/network_beacon_processor/mod.rs | 6 +- .../network_beacon_processor/sync_methods.rs | 18 +- .../network/src/sync/block_lookups/common.rs | 21 +- .../network/src/sync/block_lookups/mod.rs | 30 ++- .../sync/block_lookups/single_block_lookup.rs | 35 ++- .../network/src/sync/block_lookups/tests.rs | 241 +++++++++++++++++- beacon_node/network/src/sync/manager.rs | 19 +- .../network/src/sync/network_context.rs | 61 +++-- 12 files changed, 398 insertions(+), 93 deletions(-) diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index d0360bf18e5..70f1e99ef74 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -314,6 +314,26 @@ pub struct BlockImportData { pub consensus_context: ConsensusContext, } +impl BlockImportData { + pub fn __new_for_test( + block_root: Hash256, + state: BeaconState, + parent_block: SignedBeaconBlock>, + ) -> Self { + Self { + block_root, + state, + parent_block, + parent_eth1_finalization_data: Eth1FinalizationData { + eth1_data: <_>::default(), + eth1_deposit_index: 0, + }, + confirmed_state_roots: vec![], + consensus_context: ConsensusContext::new(Slot::new(0)), + } + } +} + pub type GossipVerifiedBlockContents = (GossipVerifiedBlock, Option>); diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 27ed0ae6d56..a981d31e554 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -84,10 +84,11 @@ impl DataAvailabilityChecker { }) } - /// Checks if the block root is currently in the availability cache awaiting processing because + /// Checks if the block root is currently in the availability cache awaiting import because /// of missing components. - pub fn has_block(&self, block_root: &Hash256) -> bool { - self.availability_cache.has_block(block_root) + pub fn has_execution_valid_block(&self, block_root: &Hash256) -> bool { + self.availability_cache + .has_execution_valid_block(block_root) } /// Return the required blobs `block_root` expects if the block is currently in the cache.
diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index f29cec92444..2e3c4aac558 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -432,11 +432,6 @@ impl Critical { Ok(()) } - /// Returns true if the block root is known, without altering the LRU ordering - pub fn has_block(&self, block_root: &Hash256) -> bool { - self.in_memory.peek(block_root).is_some() || self.store_keys.contains(block_root) - } - /// This only checks for the blobs in memory pub fn peek_blob( &self, @@ -549,8 +544,12 @@ impl OverflowLRUCache { } /// Returns true if the block root is known, without altering the LRU ordering - pub fn has_block(&self, block_root: &Hash256) -> bool { - self.critical.read().has_block(block_root) + pub fn has_execution_valid_block(&self, block_root: &Hash256) -> bool { + if let Some(pending_components) = self.critical.read().peek_pending_components(block_root) { + pending_components.executed_block.is_some() + } else { + false + } } /// Fetch a blob from the cache without affecting the LRU ordering diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index af7f3a53e56..cf9f3e54e1b 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1187,19 +1187,18 @@ impl NetworkBeaconProcessor { "block_root" => %block_root, ); } - Err(BlockError::ParentUnknown(block)) => { - // Inform the sync manager to find parents for this block - // This should not occur. It should be checked by `should_forward_block` + Err(BlockError::ParentUnknown(_)) => { + // This should not occur. It should be checked by `should_forward_block`. + // Do not send sync message UnknownParentBlock to prevent conflicts with the + // BlockComponentProcessed message below. If this error ever happens, lookup sync + // can recover by receiving another block / blob / attestation referencing the + // chain that includes this block. 
error!( self.log, "Block with unknown parent attempted to be processed"; + "block_root" => %block_root, "peer_id" => %peer_id ); - self.send_sync_message(SyncMessage::UnknownParentBlock( - peer_id, - block.clone(), - block_root, - )); } Err(ref e @ BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => { debug!( @@ -1263,6 +1262,11 @@ impl NetworkBeaconProcessor { &self.log, ); } + + self.send_sync_message(SyncMessage::GossipBlockProcessResult { + block_root, + imported: matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))), + }); } pub fn process_gossip_voluntary_exit( diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index f10646c7414..cabe39f9292 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -1,7 +1,5 @@ -use crate::{ - service::NetworkMessage, - sync::{manager::BlockProcessType, SyncMessage}, -}; +use crate::sync::manager::BlockProcessType; +use crate::{service::NetworkMessage, sync::manager::SyncMessage}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{builder::Witness, eth1_chain::CachingEth1Backend, BeaconChain}; use beacon_chain::{BeaconChainTypes, NotifyExecutionLayer}; diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index daa9a2cf197..f66879715de 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -170,17 +170,15 @@ impl NetworkBeaconProcessor { if reprocess_tx.try_send(reprocess_msg).is_err() { error!(self.log, "Failed to inform block import"; "source" => "rpc", "block_root" => %hash) }; - if matches!(process_type, BlockProcessType::SingleBlock { .. 
}) { - self.chain.block_times_cache.write().set_time_observed( - hash, - slot, - seen_timestamp, - None, - None, - ); + self.chain.block_times_cache.write().set_time_observed( + hash, + slot, + seen_timestamp, + None, + None, + ); - self.chain.recompute_head_at_current_slot().await; - } + self.chain.recompute_head_at_current_slot().await; } // Sync handles these results self.send_sync_message(SyncMessage::BlockComponentProcessed { diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index fa63e37c1b3..400d382d6d4 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -2,8 +2,8 @@ use crate::sync::block_lookups::single_block_lookup::{ LookupRequestError, SingleBlockLookup, SingleLookupRequestState, }; use crate::sync::block_lookups::{BlobRequestState, BlockRequestState, PeerId}; -use crate::sync::manager::{BlockProcessType, Id, SLOT_IMPORT_TOLERANCE}; -use crate::sync::network_context::SyncNetworkContext; +use crate::sync::manager::{Id, SLOT_IMPORT_TOLERANCE}; +use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::BeaconChainTypes; use std::sync::Arc; @@ -45,7 +45,7 @@ pub trait RequestState { peer_id: PeerId, downloaded_block_expected_blobs: Option, cx: &mut SyncNetworkContext, - ) -> Result; + ) -> Result; /* Response handling methods */ @@ -80,7 +80,7 @@ impl RequestState for BlockRequestState { peer_id: PeerId, _: Option, cx: &mut SyncNetworkContext, - ) -> Result { + ) -> Result { cx.block_lookup_request(id, peer_id, self.requested_block_root) .map_err(LookupRequestError::SendFailed) } @@ -97,10 +97,10 @@ impl RequestState for BlockRequestState { peer_id: _, } = download_result; cx.send_block_for_processing( + id, block_root, RpcBlock::new_without_blobs(Some(block_root), value), seen_timestamp, - BlockProcessType::SingleBlock { id }, ) .map_err(LookupRequestError::SendFailed) } @@ -128,7 +128,7 @@ impl RequestState for BlobRequestState { peer_id: PeerId, downloaded_block_expected_blobs: Option, cx: &mut SyncNetworkContext, - ) -> Result { + ) -> Result { cx.blob_lookup_request( id, peer_id, @@ -149,13 +149,8 @@ impl RequestState for BlobRequestState { seen_timestamp, peer_id: _, } = download_result; - cx.send_blobs_for_processing( - block_root, - value, - seen_timestamp, - BlockProcessType::SingleBlob { id }, - ) - .map_err(LookupRequestError::SendFailed) + cx.send_blobs_for_processing(id, block_root, value, seen_timestamp) + .map_err(LookupRequestError::SendFailed) } fn response_type() -> ResponseType { diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 3da2577114c..dd823a307b3 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -408,7 +408,10 @@ impl BlockLookups { self.on_processing_result_inner::>(id, result, cx) } }; - self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx); + let id = match process_type { + BlockProcessType::SingleBlock { id } | BlockProcessType::SingleBlob { id } => id, + }; + self.on_lookup_result(id, lookup_result, "processing_result", cx); } pub fn on_processing_result_inner>( @@ -521,6 +524,7 @@ impl BlockLookups { } other => { debug!(self.log, "Invalid lookup component"; "block_root" => ?block_root, "component" => ?R::response_type(), "error" => ?other); + let 
peer_id = request_state.on_processing_failure()?; cx.report_peer( peer_id, @@ -561,6 +565,30 @@ impl BlockLookups { } } + pub fn on_external_processing_result( + &mut self, + block_root: Hash256, + imported: bool, + cx: &mut SyncNetworkContext, + ) { + let Some((id, lookup)) = self + .single_block_lookups + .iter_mut() + .find(|(_, lookup)| lookup.is_for_block(block_root)) + else { + // Ok to ignore gossip process events + return; + }; + + let lookup_result = if imported { + Ok(LookupResult::Completed) + } else { + lookup.continue_requests(cx) + }; + let id = *id; + self.on_lookup_result(id, lookup_result, "external_processing_result", cx); + } + /// Makes progress on the immediate children of `block_root` pub fn continue_child_lookups(&mut self, block_root: Hash256, cx: &mut SyncNetworkContext) { let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index a5729f39062..6ee519b0dd1 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -2,7 +2,7 @@ use super::common::ResponseType; use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS}; use crate::sync::block_lookups::common::RequestState; use crate::sync::block_lookups::Id; -use crate::sync::network_context::SyncNetworkContext; +use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext}; use beacon_chain::BeaconChainTypes; use itertools::Itertools; use rand::seq::IteratorRandom; @@ -179,11 +179,13 @@ impl SingleBlockLookup { .use_rand_available_peer() .ok_or(LookupRequestError::NoPeers)?; - // make_request returns true only if a request needs to be made - if request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? { - request.get_state_mut().on_download_start()?; - } else { - request.get_state_mut().on_completed_request()?; + match request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? { + LookupRequestResult::RequestSent => request.get_state_mut().on_download_start()?, + LookupRequestResult::NoRequestNeeded => { + request.get_state_mut().on_completed_request()? + } + // Sync will receive a future event to make progress on the request, do nothing now + LookupRequestResult::Pending => return Ok(()), } // Otherwise, attempt to progress awaiting processing @@ -262,12 +264,16 @@ pub struct DownloadResult { pub peer_id: PeerId, } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum State { AwaitingDownload, Downloading, AwaitingProcess(DownloadResult), + /// Request is processing, sent by lookup sync Processing(DownloadResult), + /// Request is processed: + /// - `Processed(Some)` if lookup sync downloaded and sent to process this request + /// - `Processed(None)` if another source (i.e. 
gossip) sent this component for processing Processed(Option), } @@ -428,12 +434,11 @@ impl SingleLookupRequestState { } } - pub fn on_processing_success(&mut self) -> Result { + pub fn on_processing_success(&mut self) -> Result<(), LookupRequestError> { match &self.state { State::Processing(result) => { - let peer_id = result.peer_id; - self.state = State::Processed(Some(peer_id)); - Ok(peer_id) + self.state = State::Processed(Some(result.peer_id)); + Ok(()) } other => Err(LookupRequestError::BadState(format!( "Bad state on_processing_success expected Processing got {other}" @@ -514,12 +519,6 @@ impl SingleLookupRequestState { impl std::fmt::Display for State { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - State::AwaitingDownload => write!(f, "AwaitingDownload"), - State::Downloading { .. } => write!(f, "Downloading"), - State::AwaitingProcess { .. } => write!(f, "AwaitingProcessing"), - State::Processing { .. } => write!(f, "Processing"), - State::Processed { .. } => write!(f, "Processed"), - } + write!(f, "{}", Into::<&'static str>::into(self)) } } diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 75e0fc524f1..761e54144d6 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -11,12 +11,17 @@ use std::sync::Arc; use super::*; use crate::sync::block_lookups::common::{ResponseType, PARENT_DEPTH_TOLERANCE}; -use beacon_chain::block_verification_types::RpcBlock; +use beacon_chain::blob_verification::GossipVerifiedBlob; +use beacon_chain::block_verification_types::{BlockImportData, RpcBlock}; use beacon_chain::builder::Witness; +use beacon_chain::data_availability_checker::Availability; use beacon_chain::eth1_chain::CachingEth1Backend; use beacon_chain::test_utils::{ build_log, generate_rand_block_and_blobs, BeaconChainHarness, EphemeralHarnessType, NumBlobs, }; +use beacon_chain::{ + AvailabilityPendingExecutedBlock, PayloadVerificationOutcome, PayloadVerificationStatus, +}; use beacon_processor::WorkEvent; use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode}; use lighthouse_network::types::SyncState; @@ -25,10 +30,12 @@ use slog::info; use slot_clock::{ManualSlotClock, SlotClock, TestingSlotClock}; use store::MemoryStore; use tokio::sync::mpsc; +use types::test_utils::TestRandom; use types::{ test_utils::{SeedableRng, XorShiftRng}, BlobSidecar, ForkName, MinimalEthSpec as E, SignedBeaconBlock, Slot, }; +use types::{BeaconState, BeaconStateBase}; type T = Witness, E, MemoryStore, MemoryStore>; @@ -68,6 +75,8 @@ struct TestRig { sync_manager: SyncManager, /// To manipulate sync state and peer connection status network_globals: Arc>, + /// Beacon chain harness + harness: BeaconChainHarness>, /// `rng` for generating test blocks and blobs. 
rng: XorShiftRng, fork_name: ForkName, @@ -129,6 +138,7 @@ impl TestRig { sync_recv, log.clone(), ), + harness, fork_name, log, } @@ -423,6 +433,63 @@ impl TestRig { }); } + fn complete_single_lookup_blob_download( + &mut self, + id: SingleLookupReqId, + peer_id: PeerId, + blobs: Vec>, + ) { + for blob in blobs { + self.single_lookup_blob_response(id, peer_id, Some(blob.into())); + } + self.single_lookup_blob_response(id, peer_id, None); + } + + fn complete_single_lookup_blob_lookup_valid( + &mut self, + id: SingleLookupReqId, + peer_id: PeerId, + blobs: Vec>, + import: bool, + ) { + let block_root = blobs.first().unwrap().block_root(); + let block_slot = blobs.first().unwrap().slot(); + self.complete_single_lookup_blob_download(id, peer_id, blobs); + self.expect_block_process(ResponseType::Blob); + self.single_blob_component_processed( + id.lookup_id, + if import { + BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)) + } else { + BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( + block_slot, block_root, + )) + }, + ); + } + + fn complete_single_lookup_block_valid(&mut self, block: SignedBeaconBlock, import: bool) { + let block_root = block.canonical_root(); + let block_slot = block.slot(); + let id = self.expect_block_lookup_request(block_root); + self.expect_empty_network(); + let peer_id = self.new_connected_peer(); + self.single_lookup_block_response(id, peer_id, Some(block.into())); + self.single_lookup_block_response(id, peer_id, None); + self.expect_block_process(ResponseType::Block); + let id = self.find_single_lookup_for(block_root); + self.single_block_component_processed( + id, + if import { + BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(block_root)) + } else { + BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents( + block_slot, block_root, + )) + }, + ) + } + fn parent_lookup_failed(&mut self, id: SingleLookupReqId, peer_id: PeerId, error: RPCError) { self.send_sync_message(SyncMessage::RpcError { peer_id, @@ -714,6 +781,89 @@ impl TestRig { )); blocks } + + fn insert_block_to_da_checker(&mut self, block: Arc>) { + let state = BeaconState::Base(BeaconStateBase::random_for_test(&mut self.rng)); + let parent_block = self.rand_block(); + let import_data = BlockImportData::::__new_for_test( + block.canonical_root(), + state, + parent_block.into(), + ); + let payload_verification_outcome = PayloadVerificationOutcome { + payload_verification_status: PayloadVerificationStatus::Verified, + is_valid_merge_transition_block: false, + }; + let executed_block = + AvailabilityPendingExecutedBlock::new(block, import_data, payload_verification_outcome); + match self + .harness + .chain + .data_availability_checker + .put_pending_executed_block(executed_block) + .unwrap() + { + Availability::Available(_) => panic!("block removed from da_checker, available"), + Availability::MissingComponents(block_root) => { + self.log(&format!("inserted block to da_checker {block_root:?}")) + } + }; + } + + fn insert_blob_to_da_checker(&mut self, blob: BlobSidecar) { + match self + .harness + .chain + .data_availability_checker + .put_gossip_blob(GossipVerifiedBlob::__assumed_valid(blob.into())) + .unwrap() + { + Availability::Available(_) => panic!("blob removed from da_checker, available"), + Availability::MissingComponents(block_root) => { + self.log(&format!("inserted blob to da_checker {block_root:?}")) + } + }; + } + + fn insert_block_to_processing_cache(&mut self, block: Arc>) { + self.harness + .chain + 
.reqresp_pre_import_cache + .write() + .insert(block.canonical_root(), block); + } + + fn simulate_block_gossip_processing_becomes_invalid(&mut self, block_root: Hash256) { + self.harness + .chain + .reqresp_pre_import_cache + .write() + .remove(&block_root); + + self.send_sync_message(SyncMessage::GossipBlockProcessResult { + block_root, + imported: false, + }); + } + + fn simulate_block_gossip_processing_becomes_valid_missing_components( + &mut self, + block: Arc>, + ) { + let block_root = block.canonical_root(); + self.harness + .chain + .reqresp_pre_import_cache + .write() + .remove(&block_root); + + self.insert_block_to_da_checker(block); + + self.send_sync_message(SyncMessage::GossipBlockProcessResult { + block_root, + imported: false, + }); + } } #[test] @@ -1111,17 +1261,17 @@ fn test_parent_lookup_disconnection_no_peers_left() { } #[test] -fn test_parent_lookup_disconnection_peer_left() { +fn test_lookup_disconnection_peer_left() { let mut rig = TestRig::test_setup(); let peer_ids = (0..2).map(|_| rig.new_connected_peer()).collect::>(); - let trigger_block = rig.rand_block(); + let block_root = Hash256::random(); // lookup should have two peers associated with the same block for peer_id in peer_ids.iter() { - rig.trigger_unknown_parent_block(*peer_id, trigger_block.clone().into()); + rig.trigger_unknown_block_from_attestation(block_root, *peer_id); } // Disconnect the first peer only, which is the one handling the request rig.peer_disconnected(*peer_ids.first().unwrap()); - rig.assert_parent_lookups_count(1); + rig.assert_single_lookups_count(1); } #[test] @@ -1254,6 +1404,87 @@ fn test_same_chain_race_condition() { rig.expect_no_active_lookups(); } +#[test] +fn block_in_da_checker_skips_download() { + let Some(mut r) = TestRig::test_setup_after_deneb() else { + return; + }; + let (block, blobs) = r.rand_block_and_blobs(NumBlobs::Number(1)); + let block_root = block.canonical_root(); + let peer_id = r.new_connected_peer(); + r.insert_block_to_da_checker(block.into()); + r.trigger_unknown_block_from_attestation(block_root, peer_id); + // Should not trigger block request + let id = r.expect_blob_lookup_request(block_root); + r.expect_empty_network(); + // Resolve blob and expect lookup completed + r.complete_single_lookup_blob_lookup_valid(id, peer_id, blobs, true); + r.expect_no_active_lookups(); +} + +#[test] +fn block_in_processing_cache_becomes_invalid() { + let Some(mut r) = TestRig::test_setup_after_deneb() else { + return; + }; + let (block, blobs) = r.rand_block_and_blobs(NumBlobs::Number(1)); + let block_root = block.canonical_root(); + let peer_id = r.new_connected_peer(); + r.insert_block_to_processing_cache(block.clone().into()); + r.trigger_unknown_block_from_attestation(block_root, peer_id); + // Should not trigger block request + let id = r.expect_blob_lookup_request(block_root); + r.expect_empty_network(); + // Simulate invalid block, removing it from processing cache + r.simulate_block_gossip_processing_becomes_invalid(block_root); + // Should download and process the block + r.complete_single_lookup_block_valid(block, false); + // Resolve blob and expect lookup completed + r.complete_single_lookup_blob_lookup_valid(id, peer_id, blobs, true); + r.expect_no_active_lookups(); +} + +#[test] +fn block_in_processing_cache_becomes_valid_imported() { + let Some(mut r) = TestRig::test_setup_after_deneb() else { + return; + }; + let (block, blobs) = r.rand_block_and_blobs(NumBlobs::Number(1)); + let block_root = block.canonical_root(); + let peer_id = r.new_connected_peer(); 
+ r.insert_block_to_processing_cache(block.clone().into()); + r.trigger_unknown_block_from_attestation(block_root, peer_id); + // Should not trigger block request + let id = r.expect_blob_lookup_request(block_root); + r.expect_empty_network(); + // Resolve the block from processing step + r.simulate_block_gossip_processing_becomes_valid_missing_components(block.into()); + // Resolve blob and expect lookup completed + r.complete_single_lookup_blob_lookup_valid(id, peer_id, blobs, true); + r.expect_no_active_lookups(); +} + +// IGNORE: wait for a change that delays blob fetching until the block is known +#[ignore] +#[test] +fn blobs_in_da_checker_skip_download() { + let Some(mut r) = TestRig::test_setup_after_deneb() else { + return; + }; + let (block, blobs) = r.rand_block_and_blobs(NumBlobs::Number(1)); + let block_root = block.canonical_root(); + let peer_id = r.new_connected_peer(); + for blob in blobs { + r.insert_blob_to_da_checker(blob); + } + r.trigger_unknown_block_from_attestation(block_root, peer_id); + // Should download and process the block + r.complete_single_lookup_block_valid(block, true); + // Should not trigger blob request + r.expect_empty_network(); + r.expect_no_active_lookups(); +} + mod deneb_only { use super::*; use beacon_chain::{ diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 56bce7acad5..6afaa76da9e 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -144,6 +144,9 @@ pub enum SyncMessage { process_type: BlockProcessType, result: BlockProcessingResult, }, + + /// A block from gossip has completed processing. + GossipBlockProcessResult { block_root: Hash256, imported: bool }, } /// The type of processing specified for a received block.
@@ -153,14 +156,6 @@ pub enum BlockProcessType { SingleBlob { id: Id }, } -impl BlockProcessType { - pub fn id(&self) -> Id { - match self { - BlockProcessType::SingleBlock { id } | BlockProcessType::SingleBlob { id } => *id, - } - } -} - #[derive(Debug)] pub enum BlockProcessingResult { Ok(AvailabilityProcessingStatus), @@ -637,6 +632,14 @@ impl SyncManager { } => self .block_lookups .on_processing_result(process_type, result, &mut self.network), + SyncMessage::GossipBlockProcessResult { + block_root, + imported, + } => self.block_lookups.on_external_processing_result( + block_root, + imported, + &mut self.network, + ), SyncMessage::BatchProcessed { sync_type, result } => match sync_type { ChainSegmentProcessId::RangeBatchId(chain_id, epoch) => { self.range_sync.handle_block_process_result( diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 88495a5b350..cc4d18fd68a 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -4,13 +4,13 @@ use self::requests::{ActiveBlobsByRootRequest, ActiveBlocksByRootRequest}; pub use self::requests::{BlobsByRootSingleBlockRequest, BlocksByRootSingleRequest}; use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo; -use super::manager::{BlockProcessType, Id, RequestId as SyncRequestId}; +use super::manager::{Id, RequestId as SyncRequestId}; use super::range_sync::{BatchId, ByRangeRequestType, ChainId}; use crate::network_beacon_processor::NetworkBeaconProcessor; use crate::service::{NetworkMessage, RequestId}; use crate::status::ToStatusMessage; use crate::sync::block_lookups::SingleLookupId; -use crate::sync::manager::SingleLookupReqId; +use crate::sync::manager::{BlockProcessType, SingleLookupReqId}; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::validator_monitor::timestamp_now; use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState}; @@ -81,6 +81,19 @@ impl From for LookupFailure { } } +pub enum LookupRequestResult { + /// A request is sent. Sync MUST receive an event from the network in the future for either: + /// completed response or failed request + RequestSent, + /// No request is sent, and no further action is necessary to consider this request completed + NoRequestNeeded, + /// No request is sent, but the request is not completed. Sync MUST receive some future event + /// that makes progress on the request. For example: request is processing from a different + /// source (i.e. block received from gossip) and sync MUST receive an event with that processing + /// result. + Pending, +} + /// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id. pub struct SyncNetworkContext { /// The network channel to relay messages to the Network service. 
@@ -305,14 +318,27 @@ impl SyncNetworkContext { lookup_id: SingleLookupId, peer_id: PeerId, block_root: Hash256, - ) -> Result { + ) -> Result { + // da_checker includes blocks that are execution verified, but are missing components + if self + .chain + .data_availability_checker + .has_execution_valid_block(&block_root) + { + return Ok(LookupRequestResult::NoRequestNeeded); + } + + // reqresp_pre_import_cache includes blocks that may not be yet execution verified if self .chain .reqresp_pre_import_cache .read() .contains_key(&block_root) { - return Ok(false); + // A block is on the `reqresp_pre_import_cache` but NOT in the + // `data_availability_checker` only if it is actively being processed. We can expect a future + // event with the result of processing + return Ok(LookupRequestResult::Pending); } let id = SingleLookupReqId { @@ -340,7 +366,7 @@ impl SyncNetworkContext { self.blocks_by_root_requests .insert(id, ActiveBlocksByRootRequest::new(request)); - Ok(true) + Ok(LookupRequestResult::RequestSent) } /// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking: @@ -355,7 +381,7 @@ impl SyncNetworkContext { peer_id: PeerId, block_root: Hash256, downloaded_block_expected_blobs: Option, - ) -> Result { + ) -> Result { let expected_blobs = downloaded_block_expected_blobs .or_else(|| { self.chain @@ -387,7 +413,7 @@ impl SyncNetworkContext { if indices.is_empty() { // No blobs required, do not issue any request - return Ok(false); + return Ok(LookupRequestResult::NoRequestNeeded); } let id = SingleLookupReqId { @@ -419,7 +445,7 @@ impl SyncNetworkContext { self.blobs_by_root_requests .insert(id, ActiveBlobsByRootRequest::new(request)); - Ok(true) + Ok(LookupRequestResult::RequestSent) } pub fn is_execution_engine_online(&self) -> bool { @@ -595,19 +621,19 @@ impl SyncNetworkContext { pub fn send_block_for_processing( &self, + id: Id, block_root: Hash256, block: RpcBlock, duration: Duration, - process_type: BlockProcessType, ) -> Result<(), &'static str> { match self.beacon_processor_if_enabled() { Some(beacon_processor) => { - debug!(self.log, "Sending block for processing"; "block" => ?block_root, "process" => ?process_type); + debug!(self.log, "Sending block for processing"; "block" => ?block_root, "id" => id); if let Err(e) = beacon_processor.send_rpc_beacon_block( block_root, block, duration, - process_type, + BlockProcessType::SingleBlock { id }, ) { error!( self.log, @@ -628,17 +654,20 @@ impl SyncNetworkContext { pub fn send_blobs_for_processing( &self, + id: Id, block_root: Hash256, blobs: FixedBlobSidecarList, duration: Duration, - process_type: BlockProcessType, ) -> Result<(), &'static str> { match self.beacon_processor_if_enabled() { Some(beacon_processor) => { - debug!(self.log, "Sending blobs for processing"; "block" => ?block_root, "process_type" => ?process_type); - if let Err(e) = - beacon_processor.send_rpc_blobs(block_root, blobs, duration, process_type) - { + debug!(self.log, "Sending blobs for processing"; "block" => ?block_root, "id" => id); + if let Err(e) = beacon_processor.send_rpc_blobs( + block_root, + blobs, + duration, + BlockProcessType::SingleBlob { id }, + ) { error!( self.log, "Failed to send sync blobs to processor";