Skip to content

Commit

Permalink
Single lookup improvements (sigp#5488)
Browse files Browse the repository at this point in the history
* Fix unexpected `UnrequestedBlobId` and `ExtraBlocksReturned` errors due to race conditions.

* Continue chain segment processing and skip any blocks that are already known, rather than returning an error.

* more de-dup checking

* ensure we don't reset `requested_ids` during rpc download

* better fix

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into more-dup-lookup-fixes

* remove chain hash check

* Merge branch 'fix-block-lookup-race' of https://github.com/jimmygchen/lighthouse into sean-test-lookups

* remove block check

* add back tests

* Log and CI fixes

* undue extra check

* Merge branch 'sean-test-lookups' of https://github.com/realbigsean/lighthouse into sean-test-lookups

* log improvements

* Improve logging
  • Loading branch information
realbigsean authored Mar 27, 2024
1 parent 250a5bd commit 334aa2e
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 17 deletions.
10 changes: 8 additions & 2 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2796,6 +2796,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
}
Err(BlockError::BlockIsAlreadyKnown(block_root)) => {
debug!(self.log,
"Ignoring already known blocks while processing chain segment";
"block_root" => ?block_root);
continue;
}
Err(error) => {
return ChainSegmentResult::Failed {
imported_blocks,
Expand Down Expand Up @@ -3032,7 +3038,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
match import_block.await {
// The block was successfully verified and imported. Yay.
Ok(status @ AvailabilityProcessingStatus::Imported(block_root)) => {
trace!(
debug!(
self.log,
"Beacon block imported";
"block_root" => ?block_root,
Expand All @@ -3045,7 +3051,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(status)
}
Ok(status @ AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => {
trace!(
debug!(
self.log,
"Beacon block awaiting blobs";
"block_root" => ?block_root,
Expand Down
9 changes: 8 additions & 1 deletion beacon_node/beacon_chain/src/block_verification_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{get_block_root, GossipVerifiedBlock, PayloadVerificationOutcome};
use derivative::Derivative;
use ssz_types::VariableList;
use state_processing::ConsensusContext;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use types::blob_sidecar::{BlobIdentifier, BlobSidecarError, FixedBlobSidecarList};
use types::{
Expand All @@ -27,13 +28,19 @@ use types::{
/// Note: We make a distinction over blocks received over gossip because
/// in a post-deneb world, the blobs corresponding to a given block that are received
/// over rpc do not contain the proposer signature for dos resistance.
#[derive(Debug, Clone, Derivative)]
#[derive(Clone, Derivative)]
#[derivative(Hash(bound = "E: EthSpec"))]
pub struct RpcBlock<E: EthSpec> {
block_root: Hash256,
block: RpcBlockInner<E>,
}

impl<E: EthSpec> Debug for RpcBlock<E> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "RpcBlock({:?})", self.block_root)
}
}

impl<E: EthSpec> RpcBlock<E> {
pub fn block_root(&self) -> Hash256 {
self.block_root
Expand Down
2 changes: 0 additions & 2 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
self.processing_cache.read().get(&block_root).cloned()
}

/// A `None` indicates blobs are not required.
///
/// If there's no block, all possible ids will be returned that don't exist in the given blobs.
/// If there no blobs, all possible ids will be returned.
pub fn get_missing_blob_ids<V: AvailabilityView<T::EthSpec>>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"Gossip block is being processed";
"action" => "sending rpc block to reprocessing queue",
"block_root" => %block_root,
"process_type" => ?process_type,
);

// Send message to work reprocess queue to retry the block
Expand Down Expand Up @@ -149,6 +150,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"proposer" => block.message().proposer_index(),
"slot" => block.slot(),
"commitments" => commitments_formatted,
"process_type" => ?process_type,
);

let result = self
Expand Down Expand Up @@ -418,7 +420,11 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}
(imported_blocks, Ok(_)) => {
debug!(self.log, "Parent lookup processed successfully");
debug!(
self.log, "Parent lookup processed successfully";
"chain_hash" => %chain_head,
"imported_blocks" => imported_blocks
);
BatchProcessResult::Success {
was_non_empty: imported_blocks > 0,
}
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/sync/block_lookups/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlobRequestState<L,
let received_id = blob.id();
if !self.requested_ids.contains(&received_id) {
self.state.register_failure_downloading();
Err(LookupVerifyError::UnrequestedBlobId)
Err(LookupVerifyError::UnrequestedBlobId(received_id))
} else {
// State should remain downloading until we receive the stream terminator.
self.requested_ids.remove(&received_id);
Expand Down
36 changes: 31 additions & 5 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
if let Some(components) = child_components {
lookup.add_child_components(components);
}
debug!(self.log, "Already searching for block"; "block_root" => ?block_root);
return;
}

Expand All @@ -162,6 +163,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {

// If the block was already downloaded, or is being downloaded in this moment, do not
// request it.
debug!(self.log, "Already searching for block in a parent lookup request"; "block_root" => ?block_root);
return;
}

Expand All @@ -171,6 +173,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
.any(|(hashes, _last_parent_request)| hashes.contains(&block_root))
{
// we are already processing this block, ignore it.
debug!(self.log, "Already processing block in a parent request"; "block_root" => ?block_root);
return;
}

Expand Down Expand Up @@ -221,15 +224,23 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}) {
parent_lookup.add_peer(peer_id);
// we are already searching for this block, ignore it
debug!(self.log, "Already searching for parent block";
"block_root" => ?block_root, "parent_root" => ?parent_root);
return;
}

if self
.processing_parent_lookups
.values()
.any(|(hashes, _peers)| hashes.contains(&block_root) || hashes.contains(&parent_root))
.iter()
.any(|(chain_hash, (hashes, _peers))| {
chain_hash == &block_root
|| hashes.contains(&block_root)
|| hashes.contains(&parent_root)
})
{
// we are already processing this block, ignore it.
debug!(self.log, "Already processing parent block";
"block_root" => ?block_root, "parent_root" => ?parent_root);
return;
}
let parent_lookup = ParentLookup::new(
Expand Down Expand Up @@ -298,6 +309,12 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
};

let expected_block_root = lookup.block_root();
debug!(self.log,
"Peer returned block for single lookup";
"peer_id" => %peer_id ,
"id" => ?id,
"block_root" => ?expected_block_root,
);

match self.single_lookup_response_inner::<R>(peer_id, response, seen_timestamp, cx, lookup)
{
Expand Down Expand Up @@ -478,6 +495,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
return;
};

debug!(self.log,
"Peer returned block for parent lookup";
"peer_id" => %peer_id ,
"id" => ?id,
"block_root" => ?parent_lookup.current_parent_request.block_request_state.requested_block_root,
);

match self.parent_lookup_response_inner::<R>(
peer_id,
response,
Expand Down Expand Up @@ -540,7 +564,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
| ParentVerifyError::NoBlockReturned
| ParentVerifyError::NotEnoughBlobsReturned
| ParentVerifyError::ExtraBlocksReturned
| ParentVerifyError::UnrequestedBlobId
| ParentVerifyError::UnrequestedBlobId(_)
| ParentVerifyError::ExtraBlobsReturned
| ParentVerifyError::InvalidIndex(_) => {
let e = e.into();
Expand Down Expand Up @@ -728,6 +752,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
"Block component processed for lookup";
"response_type" => ?R::response_type(),
"block_root" => ?root,
"result" => ?result,
"id" => target_id,
);

match result {
Expand Down Expand Up @@ -901,7 +927,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
debug!(self.log, "Parent block processing succeeded"; &parent_lookup, "block_root" => ?block_root)
}
AvailabilityProcessingStatus::MissingComponents(_, block_root) => {
debug!(self.log, "Parent missing parts, triggering single block lookup "; &parent_lookup,"block_root" => ?block_root)
debug!(self.log, "Parent missing parts, triggering single block lookup"; &parent_lookup,"block_root" => ?block_root)
}
},
BlockProcessingResult::Err(e) => {
Expand Down Expand Up @@ -1223,7 +1249,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
) -> Result<(), LookupRequestError> {
match cx.beacon_processor_if_enabled() {
Some(beacon_processor) => {
trace!(self.log, "Sending block for processing"; "block" => ?block_root, "process" => ?process_type);
debug!(self.log, "Sending block for processing"; "block" => ?block_root, "process" => ?process_type);
if let Err(e) = beacon_processor.send_rpc_beacon_block(
block_root,
block,
Expand Down
5 changes: 3 additions & 2 deletions beacon_node/network/src/sync/block_lookups/parent_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::collections::VecDeque;
use std::sync::Arc;
use store::Hash256;
use strum::IntoStaticStr;
use types::blob_sidecar::BlobIdentifier;

/// How many attempts we try to find a parent of a block before we give up trying.
pub(crate) const PARENT_FAIL_TOLERANCE: u8 = 5;
Expand All @@ -36,7 +37,7 @@ pub enum ParentVerifyError {
NoBlockReturned,
NotEnoughBlobsReturned,
ExtraBlocksReturned,
UnrequestedBlobId,
UnrequestedBlobId(BlobIdentifier),
ExtraBlobsReturned,
InvalidIndex(u64),
PreviousFailure { parent_root: Hash256 },
Expand Down Expand Up @@ -242,7 +243,7 @@ impl From<LookupVerifyError> for ParentVerifyError {
E::RootMismatch => ParentVerifyError::RootMismatch,
E::NoBlockReturned => ParentVerifyError::NoBlockReturned,
E::ExtraBlocksReturned => ParentVerifyError::ExtraBlocksReturned,
E::UnrequestedBlobId => ParentVerifyError::UnrequestedBlobId,
E::UnrequestedBlobId(blob_id) => ParentVerifyError::UnrequestedBlobId(blob_id),
E::ExtraBlobsReturned => ParentVerifyError::ExtraBlobsReturned,
E::InvalidIndex(index) => ParentVerifyError::InvalidIndex(index),
E::NotEnoughBlobsReturned => ParentVerifyError::NotEnoughBlobsReturned,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::marker::PhantomData;
use std::sync::Arc;
use store::Hash256;
use strum::IntoStaticStr;
use types::blob_sidecar::FixedBlobSidecarList;
use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList};
use types::EthSpec;

#[derive(Debug, PartialEq, Eq)]
Expand All @@ -31,7 +31,7 @@ pub enum LookupVerifyError {
RootMismatch,
NoBlockReturned,
ExtraBlocksReturned,
UnrequestedBlobId,
UnrequestedBlobId(BlobIdentifier),
ExtraBlobsReturned,
NotEnoughBlobsReturned,
InvalidIndex(u64),
Expand Down Expand Up @@ -257,7 +257,9 @@ impl<L: Lookup, T: BeaconChainTypes> SingleBlockLookup<L, T> {
/// of which blobs still need to be requested. Returns `true` if there are no more blobs to
/// request.
pub(crate) fn blobs_already_downloaded(&mut self) -> bool {
self.update_blobs_request();
if matches!(self.blob_request_state.state.state, State::AwaitingDownload) {
self.update_blobs_request();
}
self.blob_request_state.requested_ids.is_empty()
}

Expand Down
1 change: 1 addition & 0 deletions beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1803,6 +1803,7 @@ mod deneb_only {
.expect_blobs_request()
.expect_no_block_request();
}

#[test]
fn too_few_blobs_response_then_block_response_attestation() {
let Some(tester) = DenebTester::new(RequestTrigger::AttestationUnknownBlock) else {
Expand Down

0 comments on commit 334aa2e

Please sign in to comment.