diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index d2a0066c840..60126818b60 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -6,7 +6,7 @@ use super::network_context::{RpcProcessingResult, SyncNetworkContext}; use crate::metrics; use crate::sync::block_lookups::common::{ResponseType, PARENT_DEPTH_TOLERANCE}; use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor; -use crate::sync::manager::Id; +use crate::sync::manager::{Id, SingleLookupReqId}; use crate::sync::network_context::LookupFailure; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::data_availability_checker::AvailabilityCheckErrorCategory; @@ -308,19 +308,19 @@ impl BlockLookups { /// Process a block or blob response received from a single lookup request. pub fn on_download_response>( &mut self, - id: SingleLookupId, + id: SingleLookupReqId, peer_id: PeerId, response: RpcProcessingResult, cx: &mut SyncNetworkContext, ) { let result = self.on_download_response_inner::(id, peer_id, response, cx); - self.on_lookup_result(id, result, "download_response", cx); + self.on_lookup_result(id.lookup_id, result, "download_response", cx); } /// Process a block or blob response received from a single lookup request. pub fn on_download_response_inner>( &mut self, - id: SingleLookupId, + id: SingleLookupReqId, peer_id: PeerId, response: RpcProcessingResult, cx: &mut SyncNetworkContext, @@ -333,10 +333,10 @@ impl BlockLookups { } let response_type = R::response_type(); - let Some(lookup) = self.single_block_lookups.get_mut(&id) else { + let Some(lookup) = self.single_block_lookups.get_mut(&id.lookup_id) else { // We don't have the ability to cancel in-flight RPC requests. So this can happen // if we started this RPC request, and later saw the block/blobs via gossip. - debug!(self.log, "Block returned for single block lookup not present"; "id" => id); + debug!(self.log, "Block returned for single block lookup not present"; "id" => ?id); return Err(LookupRequestError::UnknownLookup); }; @@ -348,7 +348,7 @@ impl BlockLookups { debug!(self.log, "Received lookup download success"; "block_root" => ?block_root, - "id" => id, + "id" => ?id, "peer_id" => %peer_id, "response_type" => ?response_type, ); @@ -356,25 +356,28 @@ impl BlockLookups { // Register the download peer here. Once we have received some data over the wire we // attribute it to this peer for scoring latter regardless of how the request was // done. - request_state.on_download_success(DownloadResult { - value: response, - block_root, - seen_timestamp, - peer_id, - })?; + request_state.on_download_success( + id.req_id, + DownloadResult { + value: response, + block_root, + seen_timestamp, + peer_id, + }, + )?; // continue_request will send for processing as the request state is AwaitingProcessing } Err(e) => { debug!(self.log, "Received lookup download failure"; "block_root" => ?block_root, - "id" => id, + "id" => ?id, "peer_id" => %peer_id, "response_type" => ?response_type, "error" => %e, ); - request_state.on_download_failure()?; + request_state.on_download_failure(id.req_id)?; // continue_request will retry a download as the request state is AwaitingDownload } } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index ec3256ce584..6804798dc93 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -2,7 +2,7 @@ use super::common::ResponseType; use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS}; use crate::sync::block_lookups::common::RequestState; use crate::sync::block_lookups::Id; -use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext}; +use crate::sync::network_context::{LookupRequestResult, ReqId, SyncNetworkContext}; use beacon_chain::BeaconChainTypes; use itertools::Itertools; use rand::seq::IteratorRandom; @@ -41,6 +41,13 @@ pub enum LookupRequestError { Failed, /// Attempted to retrieve a not known lookup id UnknownLookup, + /// Received a download result for a different request id than the in-flight request. + /// There should only exist a single request at a time. Having multiple requests is a bug and + /// can result in undefined state, so it's treated as a hard error and the lookup is dropped. + UnexpectedRequestId { + expected_req_id: ReqId, + req_id: ReqId, + }, } pub struct SingleBlockLookup { @@ -185,7 +192,9 @@ impl SingleBlockLookup { }; match request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? { - LookupRequestResult::RequestSent => request.get_state_mut().on_download_start()?, + LookupRequestResult::RequestSent(req_id) => { + request.get_state_mut().on_download_start(req_id)? + } LookupRequestResult::NoRequestNeeded => { request.get_state_mut().on_completed_request()? } @@ -272,7 +281,7 @@ pub struct DownloadResult { #[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum State { AwaitingDownload, - Downloading, + Downloading(ReqId), AwaitingProcess(DownloadResult), /// Request is processing, sent by lookup sync Processing(DownloadResult), @@ -355,10 +364,10 @@ impl SingleLookupRequestState { } /// Switch to `Downloading` if the request is in `AwaitingDownload` state, otherwise returns None. - pub fn on_download_start(&mut self) -> Result<(), LookupRequestError> { + pub fn on_download_start(&mut self, req_id: ReqId) -> Result<(), LookupRequestError> { match &self.state { State::AwaitingDownload => { - self.state = State::Downloading; + self.state = State::Downloading(req_id); Ok(()) } other => Err(LookupRequestError::BadState(format!( @@ -369,9 +378,15 @@ impl SingleLookupRequestState { /// Registers a failure in downloading a block. This might be a peer disconnection or a wrong /// block. - pub fn on_download_failure(&mut self) -> Result<(), LookupRequestError> { + pub fn on_download_failure(&mut self, req_id: ReqId) -> Result<(), LookupRequestError> { match &self.state { - State::Downloading => { + State::Downloading(expected_req_id) => { + if req_id != *expected_req_id { + return Err(LookupRequestError::UnexpectedRequestId { + expected_req_id: *expected_req_id, + req_id, + }); + } self.failed_downloading = self.failed_downloading.saturating_add(1); self.state = State::AwaitingDownload; Ok(()) @@ -384,10 +399,17 @@ impl SingleLookupRequestState { pub fn on_download_success( &mut self, + req_id: ReqId, result: DownloadResult, ) -> Result<(), LookupRequestError> { match &self.state { - State::Downloading => { + State::Downloading(expected_req_id) => { + if req_id != *expected_req_id { + return Err(LookupRequestError::UnexpectedRequestId { + expected_req_id: *expected_req_id, + req_id, + }); + } self.state = State::AwaitingProcess(result); Ok(()) } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 6afaa76da9e..4a4d70090e6 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -819,7 +819,7 @@ impl SyncManager { if let Some(resp) = self.network.on_single_block_response(id, block) { self.block_lookups .on_download_response::>( - id.lookup_id, + id, peer_id, resp, &mut self.network, @@ -861,7 +861,7 @@ impl SyncManager { if let Some(resp) = self.network.on_single_blob_response(id, blob) { self.block_lookups .on_download_response::>( - id.lookup_id, + id, peer_id, resp, &mut self.network, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index cc4d18fd68a..44fb69d9b2f 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -81,10 +81,13 @@ impl From for LookupFailure { } } +/// Sequential ID that uniquely identifies ReqResp outgoing requests +pub type ReqId = u32; + pub enum LookupRequestResult { /// A request is sent. Sync MUST receive an event from the network in the future for either: /// completed response or failed request - RequestSent, + RequestSent(ReqId), /// No request is sent, and no further action is necessary to consider this request completed NoRequestNeeded, /// No request is sent, but the request is not completed. Sync MUST receive some future event @@ -341,10 +344,8 @@ impl SyncNetworkContext { return Ok(LookupRequestResult::Pending); } - let id = SingleLookupReqId { - lookup_id, - req_id: self.next_id(), - }; + let req_id = self.next_id(); + let id = SingleLookupReqId { lookup_id, req_id }; debug!( self.log, @@ -366,7 +367,7 @@ impl SyncNetworkContext { self.blocks_by_root_requests .insert(id, ActiveBlocksByRootRequest::new(request)); - Ok(LookupRequestResult::RequestSent) + Ok(LookupRequestResult::RequestSent(req_id)) } /// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking: @@ -416,10 +417,8 @@ impl SyncNetworkContext { return Ok(LookupRequestResult::NoRequestNeeded); } - let id = SingleLookupReqId { - lookup_id, - req_id: self.next_id(), - }; + let req_id = self.next_id(); + let id = SingleLookupReqId { lookup_id, req_id }; debug!( self.log, @@ -445,7 +444,7 @@ impl SyncNetworkContext { self.blobs_by_root_requests .insert(id, ActiveBlobsByRootRequest::new(request)); - Ok(LookupRequestResult::RequestSent) + Ok(LookupRequestResult::RequestSent(req_id)) } pub fn is_execution_engine_online(&self) -> bool {