Skip to content

Commit

Permalink
Use Action in single_block_component_processed (#5564)
Browse files Browse the repository at this point in the history
* Use Action in single_block_component_processed

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into single_block_component_processed-action

* fix compile after merge

* add continue action for when we are awaiting other parts of missing components
  • Loading branch information
dapplion authored Apr 16, 2024
1 parent f5e0404 commit e5b8d12
Showing 1 changed file with 126 additions and 116 deletions.
242 changes: 126 additions & 116 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ pub type DownloadedBlock<E> = (Hash256, RpcBlock<E>);
const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60;
pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 3;

enum Action {
Retry,
ParentUnknown { parent_root: Hash256, slot: Slot },
Drop,
Continue,
}

pub struct BlockLookups<T: BeaconChainTypes> {
/// Parent chain lookups being downloaded.
parent_lookups: SmallVec<[ParentLookup<T>; 3]>,
Expand Down Expand Up @@ -773,7 +780,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
return;
};

let root = lookup.block_root();
let block_root = lookup.block_root();
let request_state = R::request_state_mut(&mut lookup);

let peer_id = match request_state.get_state().processing_peer() {
Expand All @@ -787,28 +794,49 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.log,
"Block component processed for lookup";
"response_type" => ?R::response_type(),
"block_root" => ?root,
"block_root" => ?block_root,
"result" => ?result,
"id" => target_id,
);

match result {
BlockProcessingResult::Ok(status) => match status {
AvailabilityProcessingStatus::Imported(root) => {
trace!(self.log, "Single block processing succeeded"; "block" => %root);
}
AvailabilityProcessingStatus::MissingComponents(_, _block_root) => {
match self.handle_missing_components::<R>(cx, &mut lookup) {
Ok(()) => {
self.single_block_lookups.insert(target_id, lookup);
}
Err(e) => {
// Drop with an additional error.
warn!(self.log, "Single block lookup failed"; "block" => %root, "error" => ?e);
}
}
let action = match result {
BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_))
| BlockProcessingResult::Err(BlockError::BlockIsAlreadyKnown { .. }) => {
// Successfully imported
trace!(self.log, "Single block processing succeeded"; "block" => %block_root);
Action::Drop
}

BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents(
_,
_block_root,
)) => {
// `on_processing_success` is called here to ensure the request state is updated prior to checking
// if both components have been processed.
if R::request_state_mut(&mut lookup)
.get_state_mut()
.on_processing_success()
.is_err()
{
warn!(
self.log,
"Single block processing state incorrect";
"action" => "dropping single block request"
);
Action::Drop
// If this was the result of a block request, we can't determined if the block peer did anything
// wrong. If we already had both a block and blobs response processed, we should penalize the
// blobs peer because they did not provide all blobs on the initial request.
} else if lookup.both_components_processed() {
lookup.penalize_blob_peer(cx);

// Try it again if possible.
lookup.blob_request_state.state.on_processing_failure();
Action::Retry
} else {
Action::Continue
}
},
}
BlockProcessingResult::Ignored => {
// Beacon processor signalled to ignore the block processing result.
// This implies that the cpu is overloaded. Drop the request.
Expand All @@ -817,118 +845,88 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
"Single block processing was ignored, cpu might be overloaded";
"action" => "dropping single block request"
);
Action::Drop
}
BlockProcessingResult::Err(e) => {
match self.handle_single_lookup_block_error(cx, lookup, peer_id, e) {
Ok(Some(lookup)) => {
self.single_block_lookups.insert(target_id, lookup);
let root = lookup.block_root();
trace!(self.log, "Single block processing failed"; "block" => %root, "error" => %e);
match e {
BlockError::BeaconChainError(e) => {
// Internal error
error!(self.log, "Beacon chain error processing single block"; "block_root" => %root, "error" => ?e);
Action::Drop
}
Ok(None) => {
// Drop without an additional error.
BlockError::ParentUnknown(block) => {
let slot = block.slot();
let parent_root = block.parent_root();
lookup.add_child_components(block.into());
Action::ParentUnknown { parent_root, slot }
}
Err(e) => {
// Drop with an additional error.
warn!(self.log, "Single block lookup failed"; "block" => %root, "error" => ?e);
ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => {
// These errors indicate that the execution layer is offline
// and failed to validate the execution payload. Do not downscore peer.
debug!(
self.log,
"Single block lookup failed. Execution layer is offline / unsynced / misconfigured";
"root" => %root,
"error" => ?e
);
Action::Drop
}
BlockError::AvailabilityCheck(e) => match e.category() {
AvailabilityCheckErrorCategory::Internal => {
warn!(self.log, "Internal availability check failure"; "root" => %root, "peer_id" => %peer_id, "error" => ?e);
lookup.block_request_state.state.on_download_failure();
lookup.blob_request_state.state.on_download_failure();
Action::Retry
}
AvailabilityCheckErrorCategory::Malicious => {
warn!(self.log, "Availability check failure"; "root" => %root, "peer_id" => %peer_id, "error" => ?e);
lookup.handle_availability_check_failure(cx);
Action::Retry
}
},
other => {
warn!(self.log, "Peer sent invalid block in single block lookup"; "root" => %root, "error" => ?other, "peer_id" => %peer_id);
if let Ok(block_peer) = lookup.block_request_state.state.processing_peer() {
cx.report_peer(
block_peer,
PeerAction::MidToleranceError,
"single_block_failure",
);

lookup.block_request_state.state.on_processing_failure();
}
Action::Retry
}
}
}
};
}

/// Handles a `MissingComponents` block processing error. Handles peer scoring and retries.
///
/// If this was the result of a block request, we can't determined if the block peer did anything
/// wrong. If we already had both a block and blobs response processed, we should penalize the
/// blobs peer because they did not provide all blobs on the initial request.
fn handle_missing_components<R: RequestState<Current, T>>(
&self,
cx: &SyncNetworkContext<T>,
lookup: &mut SingleBlockLookup<Current, T>,
) -> Result<(), LookupRequestError> {
let request_state = R::request_state_mut(lookup);

request_state
.get_state_mut()
.on_processing_success()
.map_err(LookupRequestError::BadState)?;
if lookup.both_components_processed() {
lookup.penalize_blob_peer(cx);

// Try it again if possible.
lookup.blob_request_state.state.on_processing_failure();
lookup.request_block_and_blobs(cx)?;
}
Ok(())
}

/// Handles peer scoring and retries related to a `BlockError` in response to a single block
/// or blob lookup processing result.
fn handle_single_lookup_block_error(
&mut self,
cx: &mut SyncNetworkContext<T>,
mut lookup: SingleBlockLookup<Current, T>,
peer_id: PeerId,
e: BlockError<T::EthSpec>,
) -> Result<Option<SingleBlockLookup<Current, T>>, LookupRequestError> {
let root = lookup.block_root();
trace!(self.log, "Single block processing failed"; "block" => %root, "error" => %e);
match e {
BlockError::BlockIsAlreadyKnown(_) => {
// No error here
return Ok(None);
}
BlockError::BeaconChainError(e) => {
// Internal error
error!(self.log, "Beacon chain error processing single block"; "block_root" => %root, "error" => ?e);
return Ok(None);
match action {
Action::Retry => {
if let Err(e) = lookup.request_block_and_blobs(cx) {
warn!(self.log, "Single block lookup failed"; "block_root" => %block_root, "error" => ?e);
// Failed with too many retries, drop with noop
self.update_metrics();
} else {
self.single_block_lookups.insert(target_id, lookup);
}
}
BlockError::ParentUnknown(block) => {
let slot = block.slot();
let parent_root = block.parent_root();
lookup.add_child_components(block.into());
lookup.request_block_and_blobs(cx)?;
self.search_parent(slot, root, parent_root, peer_id, cx);
Action::ParentUnknown { parent_root, slot } => {
// TODO: Consider including all peers from the lookup, claiming to know this block, not
// just the one that sent this specific block
self.search_parent(slot, block_root, parent_root, peer_id, cx);
self.single_block_lookups.insert(target_id, lookup);
}
ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => {
// These errors indicate that the execution layer is offline
// and failed to validate the execution payload. Do not downscore peer.
debug!(
self.log,
"Single block lookup failed. Execution layer is offline / unsynced / misconfigured";
"root" => %root,
"error" => ?e
);
return Ok(None);
Action::Drop => {
// drop with noop
self.update_metrics();
}
BlockError::AvailabilityCheck(e) => match e.category() {
AvailabilityCheckErrorCategory::Internal => {
warn!(self.log, "Internal availability check failure"; "root" => %root, "peer_id" => %peer_id, "error" => ?e);
lookup.block_request_state.state.on_download_failure();
lookup.blob_request_state.state.on_download_failure();
lookup.request_block_and_blobs(cx)?
}
AvailabilityCheckErrorCategory::Malicious => {
warn!(self.log, "Availability check failure"; "root" => %root, "peer_id" => %peer_id, "error" => ?e);
lookup.handle_availability_check_failure(cx);
lookup.request_block_and_blobs(cx)?
}
},
other => {
warn!(self.log, "Peer sent invalid block in single block lookup"; "root" => %root, "error" => ?other, "peer_id" => %peer_id);
if let Ok(block_peer) = lookup.block_request_state.state.processing_peer() {
cx.report_peer(
block_peer,
PeerAction::MidToleranceError,
"single_block_failure",
);

// Try it again if possible.
lookup.block_request_state.state.on_processing_failure();
lookup.request_block_and_blobs(cx)?
}
Action::Continue => {
self.single_block_lookups.insert(target_id, lookup);
}
}
Ok(Some(lookup))
}

pub fn parent_block_processed(
Expand Down Expand Up @@ -1369,4 +1367,16 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn drop_parent_chain_requests(&mut self) -> usize {
self.parent_lookups.drain(..).len()
}

pub fn update_metrics(&self) {
metrics::set_gauge(
&metrics::SYNC_SINGLE_BLOCK_LOOKUPS,
self.single_block_lookups.len() as i64,
);

metrics::set_gauge(
&metrics::SYNC_PARENT_BLOCK_LOOKUPS,
self.parent_lookups.len() as i64,
);
}
}

0 comments on commit e5b8d12

Please sign in to comment.