Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#5655 pr review #26

Merged
merged 10 commits into from
Apr 30, 2024
3 changes: 0 additions & 3 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2898,9 +2898,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_choice_read_lock()
.contains_block(&block_root)
{
// TODO: Should also check for:
// - Parent block is known
// - Slot is not in the future
return Err(BlockError::BlockIsAlreadyKnown(block_root));
}

Expand Down
4 changes: 1 addition & 3 deletions beacon_node/network/src/network_beacon_processor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,7 @@ impl TestRig {
block_root,
RpcBlock::new_without_blobs(Some(block_root), self.next_block.clone()),
std::time::Duration::default(),
BlockProcessType::ParentLookup {
chain_hash: Hash256::random(),
},
BlockProcessType::SingleBlock { id: 0 },
)
.unwrap();
}
Expand Down
62 changes: 32 additions & 30 deletions beacon_node/network/src/sync/block_lookups/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::BeaconChainTypes;
use std::sync::Arc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{Hash256, SignedBeaconBlock};
use types::SignedBeaconBlock;

use super::single_block_lookup::DownloadResult;
use super::SingleLookupId;
Expand All @@ -28,6 +28,16 @@ pub enum ResponseType {
/// is further back than the most recent head slot.
pub(crate) const PARENT_DEPTH_TOLERANCE: usize = SLOT_IMPORT_TOLERANCE * 2;

pub enum AwaitingParent {
True,
False,
}

pub enum BlockIsProcessed {
True,
False,
}

/// This trait unifies common single block lookup functionality across blocks and blobs. This
/// includes making requests, verifying responses, and handling processing results. A
/// `SingleBlockLookup` includes both a `BlockRequestState` and a `BlobRequestState`, this trait is
Expand All @@ -47,16 +57,14 @@ pub trait RequestState<T: BeaconChainTypes> {
fn continue_request(
&mut self,
id: Id,
awaiting_parent: bool,
awaiting_parent: AwaitingParent,
downloaded_block_expected_blobs: Option<usize>,
block_is_processed: bool,
block_is_processed: BlockIsProcessed,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
// Attempt to progress awaiting downloads
if self.get_state().is_awaiting_download() {
// Verify the current request has not exceeded the maximum number of attempts.
// TODO: Okay to use `SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS` for both current and parent
// lookups now? It not trivial to identify what is a "parent lookup" now.
let request_state = self.get_state();
if request_state.failed_attempts() >= SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS {
let cannot_process = request_state.more_failed_processing_attempts();
Expand All @@ -76,11 +84,9 @@ pub trait RequestState<T: BeaconChainTypes> {
// Otherwise, attempt to progress awaiting processing
// If this request is awaiting a parent lookup to be processed, do not send for processing.
// The request will be rejected with unknown parent error.
} else if !awaiting_parent &&
// TODO: Blob processing / import does not check for unknown parent. As a temporary fix
// and to emulate the behaviour before this PR, hold blobs for processing until the
// block has been processed i.e. it has a known parent.
(block_is_processed || matches!(Self::response_type(), ResponseType::Block))
} else if matches!(awaiting_parent, AwaitingParent::False)
&& (matches!(block_is_processed, BlockIsProcessed::True)
|| matches!(Self::response_type(), ResponseType::Block))
{
// maybe_start_processing returns Some if state == AwaitingProcess. This pattern is
// useful to conditionally access the result data.
Expand All @@ -103,10 +109,6 @@ pub trait RequestState<T: BeaconChainTypes> {

/* Response handling methods */

/// A getter for the parent root of the response. Returns an `Option` because we won't know
/// the blob parent if we don't end up getting any blobs in the response.
fn get_parent_root(verified_response: &Self::VerifiedResponseType) -> Option<Hash256>;

/// Send the response to the beacon processor.
fn send_for_processing(
id: Id,
Expand Down Expand Up @@ -148,18 +150,20 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
.map_err(LookupRequestError::SendFailed)
}

fn get_parent_root(verified_response: &Arc<SignedBeaconBlock<T::EthSpec>>) -> Option<Hash256> {
Some(verified_response.parent_root())
}

fn send_for_processing(
id: SingleLookupId,
(block, block_root, seen_timestamp, _): DownloadResult<Self::VerifiedResponseType>,
download_result: DownloadResult<Self::VerifiedResponseType>,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
let DownloadResult {
value,
block_root,
seen_timestamp,
peer_id: _,
} = download_result;
cx.send_block_for_processing(
block_root,
RpcBlock::new_without_blobs(Some(block_root), block),
RpcBlock::new_without_blobs(Some(block_root), value),
seen_timestamp,
BlockProcessType::SingleBlock { id },
)
Expand Down Expand Up @@ -200,22 +204,20 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
.map_err(LookupRequestError::SendFailed)
}

fn get_parent_root(verified_response: &FixedBlobSidecarList<T::EthSpec>) -> Option<Hash256> {
verified_response
.into_iter()
.filter_map(|blob| blob.as_ref())
.map(|blob| blob.block_parent_root())
.next()
}

fn send_for_processing(
id: Id,
(verified, block_root, seen_timestamp, _): DownloadResult<Self::VerifiedResponseType>,
download_result: DownloadResult<Self::VerifiedResponseType>,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
let DownloadResult {
value,
block_root,
seen_timestamp,
peer_id: _,
} = download_result;
cx.send_blobs_for_processing(
block_root,
verified,
value,
seen_timestamp,
BlockProcessType::SingleBlob { id },
)
Expand Down
45 changes: 18 additions & 27 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use self::parent_chain::{compute_parent_chains, NodeChain};
use self::single_block_lookup::{DownloadResult, LookupRequestError, SingleBlockLookup};
pub use self::single_block_lookup::DownloadResult;
use self::single_block_lookup::{LookupRequestError, SingleBlockLookup};
use super::manager::{BlockProcessType, BlockProcessingResult};
use super::network_context::{RpcProcessingResult, SyncNetworkContext};
use crate::metrics;
Expand Down Expand Up @@ -38,8 +39,8 @@ pub enum BlockComponent<E: EthSpec> {
impl<E: EthSpec> BlockComponent<E> {
fn parent_root(&self) -> Hash256 {
match self {
BlockComponent::Block(block) => block.0.parent_root(),
BlockComponent::Blob(blob) => blob.0.block_parent_root(),
BlockComponent::Block(block) => block.value.parent_root(),
BlockComponent::Blob(blob) => blob.value.block_parent_root(),
}
}
}
Expand Down Expand Up @@ -106,8 +107,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {

/* Lookup requests */

/// Creates a lookup for the block with the given `block_root` and immediately triggers it.
/// Returns true if the lookup is created or already exists
/// Creates a parent lookup for the block with the given `block_root` and immediately triggers it.
/// If a parent lookup exists or is triggered, a current lookup will be created.
pub fn search_child_and_parent(
&mut self,
block_root: Hash256,
Expand All @@ -132,7 +133,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}

/// Seach a block that we don't known its parent root.
/// Seach a block whose parent root is unknown.
/// Returns true if the lookup is created or already exists
pub fn search_unknown_block(
&mut self,
Expand Down Expand Up @@ -187,7 +188,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
"chain_too_long",
);
}
self.drop_lookup_and_childs(*lookup_id);
self.drop_lookup_and_children(*lookup_id);
}
}

Expand Down Expand Up @@ -290,7 +291,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
) {
if let Err(e) = self.on_download_response_inner::<R>(id, peer_id, response, cx) {
debug!(self.log, "Dropping single lookup"; "id" => id, "err" => ?e);
self.drop_lookup_and_childs(id);
self.drop_lookup_and_children(id);
self.update_metrics();
}
}
Expand Down Expand Up @@ -333,12 +334,12 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// Register the download peer here. Once we have received some data over the wire we
// attribute it to this peer for scoring latter regardless of how the request was
// done.
request_state.on_download_success((
response,
request_state.on_download_success(DownloadResult {
value: response,
block_root,
seen_timestamp,
peer_id,
))?;
})?;
// continue_request will send for processing as the request state is AwaitingProcessing
lookup.continue_request::<R>(cx)
}
Expand Down Expand Up @@ -394,7 +395,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
BlockProcessType::SingleBlock { id } | BlockProcessType::SingleBlob { id } => id,
};
debug!(self.log, "Dropping lookup on request error"; "component" => process_type.component(), "id" => process_type.id(), "error" => ?e);
self.drop_lookup_and_childs(id);
self.drop_lookup_and_children(id);
self.update_metrics();
}
}
Expand Down Expand Up @@ -497,21 +498,11 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
{
// There errors indicate internal problems and should not downscore the peer
warn!(self.log, "Internal availability check failure"; "block_root" => %block_root, "error" => ?e);
// TODO: This lines represent an improper transition of download states,
// which can log errors in the future. If an error here causes the request
// to transition into a bad state, a future network message will cause
// the request to be dropped
//
// lookup.block_request_state.state.on_download_failure();
// lookup.blob_request_state.state.on_download_failure();
Action::Drop
}
other => {
debug!(self.log, "Invalid lookup component"; "block_root" => %block_root, "component" => ?R::response_type(), "error" => ?other);
let peer_id = request_state.on_processing_failure()?;
// TODO: Why is the original code downscoring the block peer regardless of
// type of request? Sending a blob for verification can result in an error
// attributable to the block peer?
cx.report_peer(
peer_id,
PeerAction::MidToleranceError,
Expand Down Expand Up @@ -541,7 +532,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
Action::Drop => {
// Drop with noop
self.drop_lookup_and_childs(lookup_id);
self.drop_lookup_and_children(lookup_id);
self.update_metrics();
}
Action::Continue => {
Expand Down Expand Up @@ -572,14 +563,14 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}

for id in failed_lookups {
self.drop_lookup_and_childs(id);
self.drop_lookup_and_children(id);
}
}

/// Drops `dropped_id` lookup and all its children recursively. Lookups awaiting a parent need
/// the parent to make progress to resolve, therefore we must drop them is the parent is
/// the parent to make progress to resolve, therefore we must drop them if the parent is
/// dropped.
pub fn drop_lookup_and_childs(&mut self, dropped_id: SingleLookupId) {
pub fn drop_lookup_and_children(&mut self, dropped_id: SingleLookupId) {
if let Some(dropped_lookup) = self.single_block_lookups.remove(&dropped_id) {
debug!(self.log, "Dropping child lookup"; "id" => ?dropped_id, "block_root" => %dropped_lookup.block_root());

Expand All @@ -591,7 +582,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
.collect::<Vec<_>>();

for id in child_lookups {
self.drop_lookup_and_childs(id);
self.drop_lookup_and_children(id);
}
}
}
Expand Down
14 changes: 8 additions & 6 deletions beacon_node/network/src/sync/block_lookups/parent_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,23 +51,23 @@ pub(crate) fn compute_parent_chains(nodes: &[Node]) -> Vec<NodeChain> {

let mut parent_chains = vec![];

// Iterate blocks which no child
// Iterate blocks with no children
for tip in nodes {
let mut block_root = tip.block_root;
if parent_to_child.get(&block_root).is_none() {
let mut chain = vec![];

// Resolve chain of blocks
loop {
'inner: loop {
if let Some(parent_root) = child_to_parent.get(&block_root) {
// block_root is a known block that may or may not have a parent root
chain.push(block_root);
if let Some(parent_root) = parent_root {
block_root = *parent_root;
continue;
continue 'inner;
}
}
break;
break 'inner;
}

if chain.len() > 1 {
Expand Down Expand Up @@ -100,7 +100,9 @@ pub(crate) fn find_oldest_fork_ancestor(
}

// Should never happen
let parent_chain = parent_chains.get(chain_idx).ok_or("chain_idx off bounds")?;
let parent_chain = parent_chains
.get(chain_idx)
.ok_or("chain_idx out of bounds")?;
// Find the first block in the target parent chain that is not in other parent chains
// Iterate in ascending slot order
for block in parent_chain.chain.iter().rev() {
Expand All @@ -109,7 +111,7 @@ pub(crate) fn find_oldest_fork_ancestor(
}
}

// If no match means that the chain is fully contained within another chain. This should never
// No match means that the chain is fully contained within another chain. This should never
// happen, but if that was the case just return the tip
Ok(parent_chain.tip)
}
Expand Down
Loading
Loading