Skip to content

Commit

Permalink
Enforce sync lookup receives a single result (#5777)
Browse files Browse the repository at this point in the history
* Enforce sync lookup receives a single result
  • Loading branch information
dapplion authored May 14, 2024
1 parent f37ffe4 commit ce66ab3
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 36 deletions.
33 changes: 18 additions & 15 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use super::network_context::{RpcProcessingResult, SyncNetworkContext};
use crate::metrics;
use crate::sync::block_lookups::common::{ResponseType, PARENT_DEPTH_TOLERANCE};
use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor;
use crate::sync::manager::Id;
use crate::sync::manager::{Id, SingleLookupReqId};
use crate::sync::network_context::LookupFailure;
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::data_availability_checker::AvailabilityCheckErrorCategory;
Expand Down Expand Up @@ -308,19 +308,19 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/// Process a block or blob response received from a single lookup request.
pub fn on_download_response<R: RequestState<T>>(
&mut self,
id: SingleLookupId,
id: SingleLookupReqId,
peer_id: PeerId,
response: RpcProcessingResult<R::VerifiedResponseType>,
cx: &mut SyncNetworkContext<T>,
) {
let result = self.on_download_response_inner::<R>(id, peer_id, response, cx);
self.on_lookup_result(id, result, "download_response", cx);
self.on_lookup_result(id.lookup_id, result, "download_response", cx);
}

/// Process a block or blob response received from a single lookup request.
pub fn on_download_response_inner<R: RequestState<T>>(
&mut self,
id: SingleLookupId,
id: SingleLookupReqId,
peer_id: PeerId,
response: RpcProcessingResult<R::VerifiedResponseType>,
cx: &mut SyncNetworkContext<T>,
Expand All @@ -333,10 +333,10 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}

let response_type = R::response_type();
let Some(lookup) = self.single_block_lookups.get_mut(&id) else {
let Some(lookup) = self.single_block_lookups.get_mut(&id.lookup_id) else {
// We don't have the ability to cancel in-flight RPC requests. So this can happen
// if we started this RPC request, and later saw the block/blobs via gossip.
debug!(self.log, "Block returned for single block lookup not present"; "id" => id);
debug!(self.log, "Block returned for single block lookup not present"; "id" => ?id);
return Err(LookupRequestError::UnknownLookup);
};

Expand All @@ -348,33 +348,36 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
debug!(self.log,
"Received lookup download success";
"block_root" => ?block_root,
"id" => id,
"id" => ?id,
"peer_id" => %peer_id,
"response_type" => ?response_type,
);

// Register the download peer here. Once we have received some data over the wire we
// attribute it to this peer for scoring later regardless of how the request was
// done.
request_state.on_download_success(DownloadResult {
value: response,
block_root,
seen_timestamp,
peer_id,
})?;
request_state.on_download_success(
id.req_id,
DownloadResult {
value: response,
block_root,
seen_timestamp,
peer_id,
},
)?;
// continue_request will send for processing as the request state is AwaitingProcess
}
Err(e) => {
debug!(self.log,
"Received lookup download failure";
"block_root" => ?block_root,
"id" => id,
"id" => ?id,
"peer_id" => %peer_id,
"response_type" => ?response_type,
"error" => %e,
);

request_state.on_download_failure()?;
request_state.on_download_failure(id.req_id)?;
// continue_request will retry a download as the request state is AwaitingDownload
}
}
Expand Down
38 changes: 30 additions & 8 deletions beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::common::ResponseType;
use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS};
use crate::sync::block_lookups::common::RequestState;
use crate::sync::block_lookups::Id;
use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext};
use crate::sync::network_context::{LookupRequestResult, ReqId, SyncNetworkContext};
use beacon_chain::BeaconChainTypes;
use itertools::Itertools;
use rand::seq::IteratorRandom;
Expand Down Expand Up @@ -41,6 +41,13 @@ pub enum LookupRequestError {
Failed,
/// Attempted to retrieve a not known lookup id
UnknownLookup,
/// Received a download result for a different request id than the in-flight request.
/// There should only exist a single request at a time. Having multiple requests is a bug and
/// can result in undefined state, so it's treated as a hard error and the lookup is dropped.
UnexpectedRequestId {
expected_req_id: ReqId,
req_id: ReqId,
},
}

pub struct SingleBlockLookup<T: BeaconChainTypes> {
Expand Down Expand Up @@ -185,7 +192,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
};

match request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
LookupRequestResult::RequestSent => request.get_state_mut().on_download_start()?,
LookupRequestResult::RequestSent(req_id) => {
request.get_state_mut().on_download_start(req_id)?
}
LookupRequestResult::NoRequestNeeded => {
request.get_state_mut().on_completed_request()?
}
Expand Down Expand Up @@ -272,7 +281,7 @@ pub struct DownloadResult<T: Clone> {
#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
pub enum State<T: Clone> {
AwaitingDownload,
Downloading,
Downloading(ReqId),
AwaitingProcess(DownloadResult<T>),
/// Request is processing, sent by lookup sync
Processing(DownloadResult<T>),
Expand Down Expand Up @@ -355,10 +364,10 @@ impl<T: Clone> SingleLookupRequestState<T> {
}

/// Switch to `Downloading` if the request is in `AwaitingDownload` state, otherwise returns a `BadState` error.
pub fn on_download_start(&mut self) -> Result<(), LookupRequestError> {
pub fn on_download_start(&mut self, req_id: ReqId) -> Result<(), LookupRequestError> {
match &self.state {
State::AwaitingDownload => {
self.state = State::Downloading;
self.state = State::Downloading(req_id);
Ok(())
}
other => Err(LookupRequestError::BadState(format!(
Expand All @@ -369,9 +378,15 @@ impl<T: Clone> SingleLookupRequestState<T> {

/// Registers a failure in downloading a block. This might be a peer disconnection or a wrong
/// block.
pub fn on_download_failure(&mut self) -> Result<(), LookupRequestError> {
pub fn on_download_failure(&mut self, req_id: ReqId) -> Result<(), LookupRequestError> {
match &self.state {
State::Downloading => {
State::Downloading(expected_req_id) => {
if req_id != *expected_req_id {
return Err(LookupRequestError::UnexpectedRequestId {
expected_req_id: *expected_req_id,
req_id,
});
}
self.failed_downloading = self.failed_downloading.saturating_add(1);
self.state = State::AwaitingDownload;
Ok(())
Expand All @@ -384,10 +399,17 @@ impl<T: Clone> SingleLookupRequestState<T> {

pub fn on_download_success(
&mut self,
req_id: ReqId,
result: DownloadResult<T>,
) -> Result<(), LookupRequestError> {
match &self.state {
State::Downloading => {
State::Downloading(expected_req_id) => {
if req_id != *expected_req_id {
return Err(LookupRequestError::UnexpectedRequestId {
expected_req_id: *expected_req_id,
req_id,
});
}
self.state = State::AwaitingProcess(result);
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
if let Some(resp) = self.network.on_single_block_response(id, block) {
self.block_lookups
.on_download_response::<BlockRequestState<T::EthSpec>>(
id.lookup_id,
id,
peer_id,
resp,
&mut self.network,
Expand Down Expand Up @@ -861,7 +861,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
if let Some(resp) = self.network.on_single_blob_response(id, blob) {
self.block_lookups
.on_download_response::<BlobRequestState<T::EthSpec>>(
id.lookup_id,
id,
peer_id,
resp,
&mut self.network,
Expand Down
21 changes: 10 additions & 11 deletions beacon_node/network/src/sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,13 @@ impl From<LookupVerifyError> for LookupFailure {
}
}

/// Sequential ID that uniquely identifies ReqResp outgoing requests
pub type ReqId = u32;

pub enum LookupRequestResult {
/// A request is sent. Sync MUST receive an event from the network in the future for either:
/// completed response or failed request
RequestSent,
RequestSent(ReqId),
/// No request is sent, and no further action is necessary to consider this request completed
NoRequestNeeded,
/// No request is sent, but the request is not completed. Sync MUST receive some future event
Expand Down Expand Up @@ -341,10 +344,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
return Ok(LookupRequestResult::Pending);
}

let id = SingleLookupReqId {
lookup_id,
req_id: self.next_id(),
};
let req_id = self.next_id();
let id = SingleLookupReqId { lookup_id, req_id };

debug!(
self.log,
Expand All @@ -366,7 +367,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.blocks_by_root_requests
.insert(id, ActiveBlocksByRootRequest::new(request));

Ok(LookupRequestResult::RequestSent)
Ok(LookupRequestResult::RequestSent(req_id))
}

/// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking:
Expand Down Expand Up @@ -416,10 +417,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
return Ok(LookupRequestResult::NoRequestNeeded);
}

let id = SingleLookupReqId {
lookup_id,
req_id: self.next_id(),
};
let req_id = self.next_id();
let id = SingleLookupReqId { lookup_id, req_id };

debug!(
self.log,
Expand All @@ -445,7 +444,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.blobs_by_root_requests
.insert(id, ActiveBlobsByRootRequest::new(request));

Ok(LookupRequestResult::RequestSent)
Ok(LookupRequestResult::RequestSent(req_id))
}

pub fn is_execution_engine_online(&self) -> bool {
Expand Down

0 comments on commit ce66ab3

Please sign in to comment.