Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure lookup sync checks caches correctly #5840

Merged
merged 3 commits into from
May 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,22 @@ struct PartialBeaconBlock<E: EthSpec> {
bls_to_execution_changes: Vec<SignedBlsToExecutionChange>,
}

pub enum BlockProcessStatus {
/// Block is not in any pre-import cache. Block may be in the data-base or in the fork-choice.
Unknown,
/// Block is currently processing but not yet validated.
NotValidated {
slot: Slot,
blob_kzg_commitments_count: usize,
},
/// Block is fully valid, but not yet imported. It's cached in the da_checker while awaiting
/// missing block components.
ExecutionValidated {
slot: Slot,
blob_kzg_commitments_count: usize,
},
}

pub type LightClientProducerEvent<T> = (Hash256, Slot, SyncAggregate<T>);

pub type BeaconForkChoice<T> = ForkChoice<
Expand Down Expand Up @@ -1237,6 +1253,33 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(self.store.get_blinded_block(block_root)?)
}

/// Return the status of a block as it progresses through the various caches of the beacon
/// chain. Used by sync to learn the status of a block and prevent repeated downloads /
/// processing attempts.
pub fn get_block_process_status(&self, block_root: &Hash256) -> BlockProcessStatus {
if let Some(execution_valid_block) = self
.data_availability_checker
.get_execution_valid_block_summary(block_root)
{
return BlockProcessStatus::ExecutionValidated {
slot: execution_valid_block.slot,
blob_kzg_commitments_count: execution_valid_block.blob_kzg_commitments_count,
};
}

if let Some(block) = self.reqresp_pre_import_cache.read().get(block_root) {
// A block is on the `reqresp_pre_import_cache` but NOT in the
// `data_availability_checker` only if it is actively processing. We can expect a future
// event with the result of processing
return BlockProcessStatus::NotValidated {
slot: block.slot(),
blob_kzg_commitments_count: block.num_expected_blobs(),
};
}

BlockProcessStatus::Unknown
}

/// Returns the state at the given root, if any.
///
/// ## Errors
Expand Down
9 changes: 7 additions & 2 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ mod state_lru_cache;
pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory};
use types::non_zero_usize::new_non_zero_usize;

use self::overflow_lru_cache::ExecutionValidBlockSummary;

/// The LRU Cache stores `PendingComponents` which can store up to
/// `MAX_BLOBS_PER_BLOCK = 6` blobs each. A `BlobSidecar` is 0.131256 MB. So
/// the maximum size of a `PendingComponents` is ~ 0.787536 MB. Setting this
Expand Down Expand Up @@ -86,9 +88,12 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {

/// Checks if the block root is currenlty in the availability cache awaiting import because
/// of missing components.
pub fn has_execution_valid_block(&self, block_root: &Hash256) -> bool {
pub fn get_execution_valid_block_summary(
&self,
block_root: &Hash256,
) -> Option<ExecutionValidBlockSummary> {
self.availability_cache
.has_execution_valid_block(block_root)
.get_execution_valid_block_summary(block_root)
}

/// Return the required blobs `block_root` expects if the block is currenlty in the cache.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use ssz_types::{FixedVector, VariableList};
use std::num::NonZeroUsize;
use std::{collections::HashSet, sync::Arc};
use types::blob_sidecar::BlobIdentifier;
use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256};
use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, Slot};

/// This represents the components of a partially available block
///
Expand All @@ -57,6 +57,11 @@ pub struct PendingComponents<E: EthSpec> {
pub executed_block: Option<DietAvailabilityPendingExecutedBlock<E>>,
}

pub struct ExecutionValidBlockSummary {
pub slot: Slot,
pub blob_kzg_commitments_count: usize,
}

impl<E: EthSpec> PendingComponents<E> {
/// Returns an immutable reference to the cached block.
pub fn get_cached_block(&self) -> &Option<DietAvailabilityPendingExecutedBlock<E>> {
Expand Down Expand Up @@ -544,12 +549,22 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
}

/// Returns true if the block root is known, without altering the LRU ordering
pub fn has_execution_valid_block(&self, block_root: &Hash256) -> bool {
if let Some(pending_components) = self.critical.read().peek_pending_components(block_root) {
pending_components.executed_block.is_some()
} else {
false
}
pub fn get_execution_valid_block_summary(
&self,
block_root: &Hash256,
) -> Option<ExecutionValidBlockSummary> {
self.critical
.read()
.peek_pending_components(block_root)
.and_then(|pending_components| {
pending_components
.executed_block
.as_ref()
.map(|block| ExecutionValidBlockSummary {
slot: block.as_block().slot(),
blob_kzg_commitments_count: block.num_blobs_expected(),
})
})
}

/// Fetch a blob from the cache without affecting the LRU ordering
Expand Down
7 changes: 4 additions & 3 deletions beacon_node/beacon_chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ pub mod validator_pubkey_cache;

pub use self::beacon_chain::{
AttestationProcessingOutcome, AvailabilityProcessingStatus, BeaconBlockResponse,
BeaconBlockResponseWrapper, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult,
ForkChoiceError, LightClientProducerEvent, OverrideForkchoiceUpdate, ProduceBlockVerification,
StateSkipConfig, WhenSlotSkipped, INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON,
BeaconBlockResponseWrapper, BeaconChain, BeaconChainTypes, BeaconStore, BlockProcessStatus,
ChainSegmentResult, ForkChoiceError, LightClientProducerEvent, OverrideForkchoiceUpdate,
ProduceBlockVerification, StateSkipConfig, WhenSlotSkipped,
INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON,
INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON,
};
pub use self::beacon_snapshot::BeaconSnapshot;
Expand Down
50 changes: 26 additions & 24 deletions beacon_node/network/src/sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::status::ToStatusMessage;
use crate::sync::block_lookups::SingleLookupId;
use crate::sync::manager::{BlockProcessType, SingleLookupReqId};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState};
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState};
use fnv::FnvHashMap;
use lighthouse_network::rpc::methods::BlobsByRangeRequest;
use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError};
Expand Down Expand Up @@ -337,26 +337,17 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
peer_id: PeerId,
block_root: Hash256,
) -> Result<LookupRequestResult, RpcRequestSendError> {
// da_checker includes block that are execution verified, but are missing components
if self
.chain
.data_availability_checker
.has_execution_valid_block(&block_root)
{
return Ok(LookupRequestResult::NoRequestNeeded);
}

// reqresp_pre_import_cache includes blocks that may not be yet execution verified
if self
.chain
.reqresp_pre_import_cache
.read()
.contains_key(&block_root)
{
// A block is on the `reqresp_pre_import_cache` but NOT in the
// `data_availability_checker` only if it is actively processing. We can expect a future
// event with the result of processing
return Ok(LookupRequestResult::Pending);
match self.chain.get_block_process_status(&block_root) {
// Unknown block, continue request to download
BlockProcessStatus::Unknown => {}
// Block is known are currently processing, expect a future event with the result of
// processing.
BlockProcessStatus::NotValidated { .. } => return Ok(LookupRequestResult::Pending),
// Block is fully validated. If it's not yet imported it's waiting for missing block
// components. Consider this request completed and do nothing.
BlockProcessStatus::ExecutionValidated { .. } => {
return Ok(LookupRequestResult::NoRequestNeeded)
}
}

let req_id = self.next_id();
Expand Down Expand Up @@ -401,9 +392,20 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
downloaded_block_expected_blobs: Option<usize>,
) -> Result<LookupRequestResult, RpcRequestSendError> {
let Some(expected_blobs) = downloaded_block_expected_blobs.or_else(|| {
self.chain
.data_availability_checker
.num_expected_blobs(&block_root)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can delete num_blobs_expected

// If the block is already being processed or fully validated, retrieve how many blobs
// it expects. Consider any stage of the block. If the block root has been validated, we
// can assert that this is the correct value of `blob_kzg_commitments_count`.
match self.chain.get_block_process_status(&block_root) {
BlockProcessStatus::Unknown => None,
BlockProcessStatus::NotValidated {
blob_kzg_commitments_count,
..
}
| BlockProcessStatus::ExecutionValidated {
blob_kzg_commitments_count,
..
} => Some(blob_kzg_commitments_count),
}
}) else {
// Wait to download the block before downloading blobs. Then we can be sure that the
// block has data, so there's no need to do "blind" requests for all possible blobs and
Expand Down
Loading