Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure lookup sync checks caches correctly #5840

Merged
merged 3 commits into from
May 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions beacon_node/beacon_chain/src/beacon_block_streamer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{metrics, BeaconChain, BeaconChainError, BeaconChainTypes};
use crate::{metrics, BeaconChain, BeaconChainError, BeaconChainTypes, BlockProcessStatus};
use execution_layer::{ExecutionLayer, ExecutionPayloadBodyV1};
use slog::{crit, debug, error, Logger};
use std::collections::HashMap;
Expand Down Expand Up @@ -410,15 +410,14 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {

fn check_caches(&self, root: Hash256) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
if self.check_caches == CheckCaches::Yes {
self.beacon_chain
.reqresp_pre_import_cache
.read()
.get(&root)
.map(|block| {
match self.beacon_chain.get_block_process_status(&root) {
BlockProcessStatus::Unknown => None,
BlockProcessStatus::NotValidated(block)
| BlockProcessStatus::ExecutionValidated(block) => {
metrics::inc_counter(&metrics::BEACON_REQRESP_PRE_IMPORT_CACHE_HITS);
block.clone()
})
.or(self.beacon_chain.early_attester_cache.get_block(root))
Some(block)
}
}
} else {
None
}
Expand Down
41 changes: 41 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,20 @@ struct PartialBeaconBlock<E: EthSpec> {
bls_to_execution_changes: Vec<SignedBlsToExecutionChange>,
}

pub enum BlockProcessStatus<E: EthSpec> {
/// Block is not in any pre-import cache. Block may be in the data-base or in the fork-choice.
Unknown,
/// Block is currently processing but not yet validated.
NotValidated(Arc<SignedBeaconBlock<E>>),
/// Block is fully valid, but not yet imported. It's cached in the da_checker while awaiting
/// missing block components.
ExecutionValidated(Arc<SignedBeaconBlock<E>>),
}

pub struct BeaconChainMetrics {
pub reqresp_pre_import_cache_len: usize,
}

pub type LightClientProducerEvent<T> = (Hash256, Slot, SyncAggregate<T>);

pub type BeaconForkChoice<T> = ForkChoice<
Expand Down Expand Up @@ -1237,6 +1251,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(self.store.get_blinded_block(block_root)?)
}

/// Return the status of a block as it progresses through the various caches of the beacon
/// chain. Used by sync to learn the status of a block and prevent repeated downloads /
/// processing attempts.
pub fn get_block_process_status(&self, block_root: &Hash256) -> BlockProcessStatus<T::EthSpec> {
if let Some(block) = self
.data_availability_checker
.get_execution_valid_block(block_root)
{
return BlockProcessStatus::ExecutionValidated(block);
}

if let Some(block) = self.reqresp_pre_import_cache.read().get(block_root) {
// A block is on the `reqresp_pre_import_cache` but NOT in the
// `data_availability_checker` only if it is actively processing. We can expect a future
// event with the result of processing
return BlockProcessStatus::NotValidated(block.clone());
}

BlockProcessStatus::Unknown
}

/// Returns the state at the given root, if any.
///
/// ## Errors
Expand Down Expand Up @@ -6630,6 +6665,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
ForkName::Base => Err(Error::UnsupportedFork),
}
}

pub fn metrics(&self) -> BeaconChainMetrics {
BeaconChainMetrics {
reqresp_pre_import_cache_len: self.reqresp_pre_import_cache.read().len(),
}
}
}

impl<T: BeaconChainTypes> Drop for BeaconChain<T> {
Expand Down
15 changes: 5 additions & 10 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,12 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {

/// Checks if the block root is currenlty in the availability cache awaiting import because
/// of missing components.
pub fn has_execution_valid_block(&self, block_root: &Hash256) -> bool {
self.availability_cache
.has_execution_valid_block(block_root)
}

/// Return the required blobs `block_root` expects if the block is currenlty in the cache.
pub fn num_expected_blobs(&self, block_root: &Hash256) -> Option<usize> {
pub fn get_execution_valid_block(
&self,
block_root: &Hash256,
) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
self.availability_cache
.peek_pending_components(block_root, |components| {
components.and_then(|components| components.num_expected_blobs())
})
.get_execution_valid_block(block_root)
}

/// Return the set of imported blob indexes for `block_root`. Returns None if there is no block
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use ssz_types::{FixedVector, VariableList};
use std::num::NonZeroUsize;
use std::{collections::HashSet, sync::Arc};
use types::blob_sidecar::BlobIdentifier;
use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256};
use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock};

/// This represents the components of a partially available block
///
Expand Down Expand Up @@ -544,12 +544,19 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
}

/// Returns true if the block root is known, without altering the LRU ordering
pub fn has_execution_valid_block(&self, block_root: &Hash256) -> bool {
if let Some(pending_components) = self.critical.read().peek_pending_components(block_root) {
pending_components.executed_block.is_some()
} else {
false
}
pub fn get_execution_valid_block(
&self,
block_root: &Hash256,
) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
self.critical
.read()
.peek_pending_components(block_root)
.and_then(|pending_components| {
pending_components
.executed_block
.as_ref()
.map(|block| block.block_cloned())
})
}

/// Fetch a blob from the cache without affecting the LRU ordering
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ impl<E: EthSpec> DietAvailabilityPendingExecutedBlock<E> {
&self.block
}

pub fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>> {
self.block.clone()
}

pub fn num_blobs_expected(&self) -> usize {
self.block
.message()
Expand Down
7 changes: 4 additions & 3 deletions beacon_node/beacon_chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ pub mod validator_pubkey_cache;

pub use self::beacon_chain::{
AttestationProcessingOutcome, AvailabilityProcessingStatus, BeaconBlockResponse,
BeaconBlockResponseWrapper, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult,
ForkChoiceError, LightClientProducerEvent, OverrideForkchoiceUpdate, ProduceBlockVerification,
StateSkipConfig, WhenSlotSkipped, INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON,
BeaconBlockResponseWrapper, BeaconChain, BeaconChainTypes, BeaconStore, BlockProcessStatus,
ChainSegmentResult, ForkChoiceError, LightClientProducerEvent, OverrideForkchoiceUpdate,
ProduceBlockVerification, StateSkipConfig, WhenSlotSkipped,
INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON,
INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON,
};
pub use self::beacon_snapshot::BeaconSnapshot;
Expand Down
3 changes: 2 additions & 1 deletion beacon_node/beacon_chain/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1192,6 +1192,7 @@ pub fn scrape_for_metrics<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) {
}

let attestation_stats = beacon_chain.op_pool.attestation_stats();
let chain_metrics = beacon_chain.metrics();

set_gauge_by_usize(
&BLOCK_PROCESSING_SNAPSHOT_CACHE_SIZE,
Expand All @@ -1200,7 +1201,7 @@ pub fn scrape_for_metrics<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) {

set_gauge_by_usize(
&BEACON_REQRESP_PRE_IMPORT_CACHE_SIZE,
beacon_chain.reqresp_pre_import_cache.read().len(),
chain_metrics.reqresp_pre_import_cache_len,
);

let da_checker_metrics = beacon_chain.data_availability_checker.metrics();
Expand Down
10 changes: 8 additions & 2 deletions beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1452,13 +1452,16 @@ fn block_in_processing_cache_becomes_invalid() {
let peer_id = r.new_connected_peer();
r.insert_block_to_processing_cache(block.clone().into());
r.trigger_unknown_block_from_attestation(block_root, peer_id);
// Should trigger blob request
let id = r.expect_blob_lookup_request(block_root);
// Should not trigger block request
r.expect_empty_network();
// Simulate invalid block, removing it from processing cache
r.simulate_block_gossip_processing_becomes_invalid(block_root);
// Should download block, then issue blobs request
r.complete_lookup_block_download(block);
let id = r.expect_blob_lookup_request(block_root);
// Should not trigger block or blob request
r.expect_empty_network();
r.complete_lookup_block_import_valid(block_root, false);
// Resolve blob and expect lookup completed
r.complete_single_lookup_blob_lookup_valid(id, peer_id, blobs, true);
Expand All @@ -1475,11 +1478,14 @@ fn block_in_processing_cache_becomes_valid_imported() {
let peer_id = r.new_connected_peer();
r.insert_block_to_processing_cache(block.clone().into());
r.trigger_unknown_block_from_attestation(block_root, peer_id);
// Should trigger blob request
let id = r.expect_blob_lookup_request(block_root);
// Should not trigger block request
r.expect_empty_network();
// Resolve the block from processing step
r.simulate_block_gossip_processing_becomes_valid_missing_components(block.into());
let id = r.expect_blob_lookup_request(block_root);
// Should not trigger block or blob request
r.expect_empty_network();
// Resolve blob and expect lookup completed
r.complete_single_lookup_blob_lookup_valid(id, peer_id, blobs, true);
r.expect_no_active_lookups();
Expand Down
44 changes: 20 additions & 24 deletions beacon_node/network/src/sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::status::ToStatusMessage;
use crate::sync::block_lookups::SingleLookupId;
use crate::sync::manager::{BlockProcessType, SingleLookupReqId};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState};
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState};
use fnv::FnvHashMap;
use lighthouse_network::rpc::methods::BlobsByRangeRequest;
use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError};
Expand Down Expand Up @@ -337,26 +337,17 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
peer_id: PeerId,
block_root: Hash256,
) -> Result<LookupRequestResult, RpcRequestSendError> {
// da_checker includes block that are execution verified, but are missing components
if self
.chain
.data_availability_checker
.has_execution_valid_block(&block_root)
{
return Ok(LookupRequestResult::NoRequestNeeded);
}

// reqresp_pre_import_cache includes blocks that may not be yet execution verified
if self
.chain
.reqresp_pre_import_cache
.read()
.contains_key(&block_root)
{
// A block is on the `reqresp_pre_import_cache` but NOT in the
// `data_availability_checker` only if it is actively processing. We can expect a future
// event with the result of processing
return Ok(LookupRequestResult::Pending);
match self.chain.get_block_process_status(&block_root) {
// Unknown block, continue request to download
BlockProcessStatus::Unknown => {}
// Block is known are currently processing, expect a future event with the result of
// processing.
BlockProcessStatus::NotValidated { .. } => return Ok(LookupRequestResult::Pending),
// Block is fully validated. If it's not yet imported it's waiting for missing block
// components. Consider this request completed and do nothing.
BlockProcessStatus::ExecutionValidated { .. } => {
return Ok(LookupRequestResult::NoRequestNeeded)
}
}

let req_id = self.next_id();
Expand Down Expand Up @@ -401,9 +392,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
downloaded_block_expected_blobs: Option<usize>,
) -> Result<LookupRequestResult, RpcRequestSendError> {
let Some(expected_blobs) = downloaded_block_expected_blobs.or_else(|| {
self.chain
.data_availability_checker
.num_expected_blobs(&block_root)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can delete num_blobs_expected

// If the block is already being processed or fully validated, retrieve how many blobs
// it expects. Consider any stage of the block. If the block root has been validated, we
// can assert that this is the correct value of `blob_kzg_commitments_count`.
match self.chain.get_block_process_status(&block_root) {
BlockProcessStatus::Unknown => None,
BlockProcessStatus::NotValidated(block)
| BlockProcessStatus::ExecutionValidated(block) => Some(block.num_expected_blobs()),
}
}) else {
// Wait to download the block before downloading blobs. Then we can be sure that the
// block has data, so there's no need to do "blind" requests for all possible blobs and
Expand Down
Loading