Skip to content

Commit

Permalink
Notify lookup sync of gossip processing results (#5722)
Browse files Browse the repository at this point in the history
* Notify lookup sync of gossip processing results

* Add tests

* Add GossipBlockProcessResult event

* Re-add dropped comments

* Update beacon_node/network/src/network_beacon_processor/sync_methods.rs

* update test_lookup_disconnection_peer_left
  • Loading branch information
dapplion authored May 13, 2024
1 parent 6d792b4 commit 93e0649
Show file tree
Hide file tree
Showing 12 changed files with 398 additions and 93 deletions.
20 changes: 20 additions & 0 deletions beacon_node/beacon_chain/src/block_verification_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,26 @@ pub struct BlockImportData<E: EthSpec> {
pub consensus_context: ConsensusContext<E>,
}

impl<E: EthSpec> BlockImportData<E> {
pub fn __new_for_test(
block_root: Hash256,
state: BeaconState<E>,
parent_block: SignedBeaconBlock<E, BlindedPayload<E>>,
) -> Self {
Self {
block_root,
state,
parent_block,
parent_eth1_finalization_data: Eth1FinalizationData {
eth1_data: <_>::default(),
eth1_deposit_index: 0,
},
confirmed_state_roots: vec![],
consensus_context: ConsensusContext::new(Slot::new(0)),
}
}
}

pub type GossipVerifiedBlockContents<E> =
(GossipVerifiedBlock<E>, Option<GossipVerifiedBlobList<E>>);

Expand Down
7 changes: 4 additions & 3 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,11 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
})
}

/// Checks if the block root is currenlty in the availability cache awaiting processing because
/// Checks if the block root is currenlty in the availability cache awaiting import because
/// of missing components.
pub fn has_block(&self, block_root: &Hash256) -> bool {
self.availability_cache.has_block(block_root)
pub fn has_execution_valid_block(&self, block_root: &Hash256) -> bool {
self.availability_cache
.has_execution_valid_block(block_root)
}

/// Return the required blobs `block_root` expects if the block is currenlty in the cache.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,11 +432,6 @@ impl<T: BeaconChainTypes> Critical<T> {
Ok(())
}

/// Returns true if the block root is known, without altering the LRU ordering
pub fn has_block(&self, block_root: &Hash256) -> bool {
self.in_memory.peek(block_root).is_some() || self.store_keys.contains(block_root)
}

/// This only checks for the blobs in memory
pub fn peek_blob(
&self,
Expand Down Expand Up @@ -549,8 +544,12 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
}

/// Returns true if the block root is known, without altering the LRU ordering
pub fn has_block(&self, block_root: &Hash256) -> bool {
self.critical.read().has_block(block_root)
pub fn has_execution_valid_block(&self, block_root: &Hash256) -> bool {
if let Some(pending_components) = self.critical.read().peek_pending_components(block_root) {
pending_components.executed_block.is_some()
} else {
false
}
}

/// Fetch a blob from the cache without affecting the LRU ordering
Expand Down
20 changes: 12 additions & 8 deletions beacon_node/network/src/network_beacon_processor/gossip_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1187,19 +1187,18 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"block_root" => %block_root,
);
}
Err(BlockError::ParentUnknown(block)) => {
// Inform the sync manager to find parents for this block
// This should not occur. It should be checked by `should_forward_block`
Err(BlockError::ParentUnknown(_)) => {
// This should not occur. It should be checked by `should_forward_block`.
// Do not send sync message UnknownParentBlock to prevent conflicts with the
// BlockComponentProcessed message below. If this error ever happens, lookup sync
// can recover by receiving another block / blob / attestation referencing the
// chain that includes this block.
error!(
self.log,
"Block with unknown parent attempted to be processed";
"block_root" => %block_root,
"peer_id" => %peer_id
);
self.send_sync_message(SyncMessage::UnknownParentBlock(
peer_id,
block.clone(),
block_root,
));
}
Err(ref e @ BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => {
debug!(
Expand Down Expand Up @@ -1263,6 +1262,11 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
&self.log,
);
}

self.send_sync_message(SyncMessage::GossipBlockProcessResult {
block_root,
imported: matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))),
});
}

pub fn process_gossip_voluntary_exit(
Expand Down
6 changes: 2 additions & 4 deletions beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use crate::{
service::NetworkMessage,
sync::{manager::BlockProcessType, SyncMessage},
};
use crate::sync::manager::BlockProcessType;
use crate::{service::NetworkMessage, sync::manager::SyncMessage};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::{builder::Witness, eth1_chain::CachingEth1Backend, BeaconChain};
use beacon_chain::{BeaconChainTypes, NotifyExecutionLayer};
Expand Down
18 changes: 8 additions & 10 deletions beacon_node/network/src/network_beacon_processor/sync_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,17 +170,15 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
if reprocess_tx.try_send(reprocess_msg).is_err() {
error!(self.log, "Failed to inform block import"; "source" => "rpc", "block_root" => %hash)
};
if matches!(process_type, BlockProcessType::SingleBlock { .. }) {
self.chain.block_times_cache.write().set_time_observed(
hash,
slot,
seen_timestamp,
None,
None,
);
self.chain.block_times_cache.write().set_time_observed(
hash,
slot,
seen_timestamp,
None,
None,
);

self.chain.recompute_head_at_current_slot().await;
}
self.chain.recompute_head_at_current_slot().await;
}
// Sync handles these results
self.send_sync_message(SyncMessage::BlockComponentProcessed {
Expand Down
21 changes: 8 additions & 13 deletions beacon_node/network/src/sync/block_lookups/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use crate::sync::block_lookups::single_block_lookup::{
LookupRequestError, SingleBlockLookup, SingleLookupRequestState,
};
use crate::sync::block_lookups::{BlobRequestState, BlockRequestState, PeerId};
use crate::sync::manager::{BlockProcessType, Id, SLOT_IMPORT_TOLERANCE};
use crate::sync::network_context::SyncNetworkContext;
use crate::sync::manager::{Id, SLOT_IMPORT_TOLERANCE};
use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::BeaconChainTypes;
use std::sync::Arc;
Expand Down Expand Up @@ -45,7 +45,7 @@ pub trait RequestState<T: BeaconChainTypes> {
peer_id: PeerId,
downloaded_block_expected_blobs: Option<usize>,
cx: &mut SyncNetworkContext<T>,
) -> Result<bool, LookupRequestError>;
) -> Result<LookupRequestResult, LookupRequestError>;

/* Response handling methods */

Expand Down Expand Up @@ -80,7 +80,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
peer_id: PeerId,
_: Option<usize>,
cx: &mut SyncNetworkContext<T>,
) -> Result<bool, LookupRequestError> {
) -> Result<LookupRequestResult, LookupRequestError> {
cx.block_lookup_request(id, peer_id, self.requested_block_root)
.map_err(LookupRequestError::SendFailed)
}
Expand All @@ -97,10 +97,10 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
peer_id: _,
} = download_result;
cx.send_block_for_processing(
id,
block_root,
RpcBlock::new_without_blobs(Some(block_root), value),
seen_timestamp,
BlockProcessType::SingleBlock { id },
)
.map_err(LookupRequestError::SendFailed)
}
Expand Down Expand Up @@ -128,7 +128,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
peer_id: PeerId,
downloaded_block_expected_blobs: Option<usize>,
cx: &mut SyncNetworkContext<T>,
) -> Result<bool, LookupRequestError> {
) -> Result<LookupRequestResult, LookupRequestError> {
cx.blob_lookup_request(
id,
peer_id,
Expand All @@ -149,13 +149,8 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
seen_timestamp,
peer_id: _,
} = download_result;
cx.send_blobs_for_processing(
block_root,
value,
seen_timestamp,
BlockProcessType::SingleBlob { id },
)
.map_err(LookupRequestError::SendFailed)
cx.send_blobs_for_processing(id, block_root, value, seen_timestamp)
.map_err(LookupRequestError::SendFailed)
}

fn response_type() -> ResponseType {
Expand Down
30 changes: 29 additions & 1 deletion beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,10 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.on_processing_result_inner::<BlobRequestState<T::EthSpec>>(id, result, cx)
}
};
self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx);
let id = match process_type {
BlockProcessType::SingleBlock { id } | BlockProcessType::SingleBlob { id } => id,
};
self.on_lookup_result(id, lookup_result, "processing_result", cx);
}

pub fn on_processing_result_inner<R: RequestState<T>>(
Expand Down Expand Up @@ -521,6 +524,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
other => {
debug!(self.log, "Invalid lookup component"; "block_root" => ?block_root, "component" => ?R::response_type(), "error" => ?other);

let peer_id = request_state.on_processing_failure()?;
cx.report_peer(
peer_id,
Expand Down Expand Up @@ -561,6 +565,30 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}

pub fn on_external_processing_result(
&mut self,
block_root: Hash256,
imported: bool,
cx: &mut SyncNetworkContext<T>,
) {
let Some((id, lookup)) = self
.single_block_lookups
.iter_mut()
.find(|(_, lookup)| lookup.is_for_block(block_root))
else {
// Ok to ignore gossip process events
return;
};

let lookup_result = if imported {
Ok(LookupResult::Completed)
} else {
lookup.continue_requests(cx)
};
let id = *id;
self.on_lookup_result(id, lookup_result, "external_processing_result", cx);
}

/// Makes progress on the immediate children of `block_root`
pub fn continue_child_lookups(&mut self, block_root: Hash256, cx: &mut SyncNetworkContext<T>) {
let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self
Expand Down
35 changes: 17 additions & 18 deletions beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::common::ResponseType;
use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS};
use crate::sync::block_lookups::common::RequestState;
use crate::sync::block_lookups::Id;
use crate::sync::network_context::SyncNetworkContext;
use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext};
use beacon_chain::BeaconChainTypes;
use itertools::Itertools;
use rand::seq::IteratorRandom;
Expand Down Expand Up @@ -179,11 +179,13 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
.use_rand_available_peer()
.ok_or(LookupRequestError::NoPeers)?;

// make_request returns true only if a request needs to be made
if request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
request.get_state_mut().on_download_start()?;
} else {
request.get_state_mut().on_completed_request()?;
match request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
LookupRequestResult::RequestSent => request.get_state_mut().on_download_start()?,
LookupRequestResult::NoRequestNeeded => {
request.get_state_mut().on_completed_request()?
}
// Sync will receive a future event to make progress on the request, do nothing now
LookupRequestResult::Pending => return Ok(()),
}

// Otherwise, attempt to progress awaiting processing
Expand Down Expand Up @@ -262,12 +264,16 @@ pub struct DownloadResult<T: Clone> {
pub peer_id: PeerId,
}

#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
pub enum State<T: Clone> {
AwaitingDownload,
Downloading,
AwaitingProcess(DownloadResult<T>),
/// Request is processing, sent by lookup sync
Processing(DownloadResult<T>),
/// Request is processed:
/// - `Processed(Some)` if lookup sync downloaded and sent to process this request
/// - `Processed(None)` if another source (i.e. gossip) sent this component for processing
Processed(Option<PeerId>),
}

Expand Down Expand Up @@ -428,12 +434,11 @@ impl<T: Clone> SingleLookupRequestState<T> {
}
}

pub fn on_processing_success(&mut self) -> Result<PeerId, LookupRequestError> {
pub fn on_processing_success(&mut self) -> Result<(), LookupRequestError> {
match &self.state {
State::Processing(result) => {
let peer_id = result.peer_id;
self.state = State::Processed(Some(peer_id));
Ok(peer_id)
self.state = State::Processed(Some(result.peer_id));
Ok(())
}
other => Err(LookupRequestError::BadState(format!(
"Bad state on_processing_success expected Processing got {other}"
Expand Down Expand Up @@ -514,12 +519,6 @@ impl<T: Clone> SingleLookupRequestState<T> {

impl<T: Clone> std::fmt::Display for State<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
State::AwaitingDownload => write!(f, "AwaitingDownload"),
State::Downloading { .. } => write!(f, "Downloading"),
State::AwaitingProcess { .. } => write!(f, "AwaitingProcessing"),
State::Processing { .. } => write!(f, "Processing"),
State::Processed { .. } => write!(f, "Processed"),
}
write!(f, "{}", Into::<&'static str>::into(self))
}
}
Loading

0 comments on commit 93e0649

Please sign in to comment.