Skip to content

Commit

Permalink
Prevent sync lookups from reverting to awaiting block (#6443)
Browse files Browse the repository at this point in the history
* Prevent sync lookups from reverting to awaiting block

* Remove stale comment
  • Loading branch information
dapplion authored Oct 10, 2024
1 parent da290e8 commit a0a62ea
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 102 deletions.
38 changes: 24 additions & 14 deletions beacon_node/network/src/sync/block_lookups/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::sync::Arc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{DataColumnSidecarList, SignedBeaconBlock};

use super::single_block_lookup::DownloadResult;
use super::single_block_lookup::{ComponentRequests, DownloadResult};
use super::SingleLookupId;

#[derive(Debug, Copy, Clone)]
Expand Down Expand Up @@ -42,7 +42,7 @@ pub trait RequestState<T: BeaconChainTypes> {
&self,
id: Id,
peer_id: PeerId,
downloaded_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
expected_blobs: usize,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError>;

Expand All @@ -61,7 +61,7 @@ pub trait RequestState<T: BeaconChainTypes> {
fn response_type() -> ResponseType;

/// A getter for the `BlockRequestState` or `BlobRequestState` associated with this trait.
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self;
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> Result<&mut Self, &'static str>;

/// A getter for a reference to the `SingleLookupRequestState` associated with this trait.
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType>;
Expand All @@ -77,7 +77,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
&self,
id: SingleLookupId,
peer_id: PeerId,
_: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
_: usize,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError> {
cx.block_lookup_request(id, peer_id, self.requested_block_root)
Expand Down Expand Up @@ -107,8 +107,8 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
fn response_type() -> ResponseType {
ResponseType::Block
}
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self {
&mut request.block_request_state
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> Result<&mut Self, &'static str> {
Ok(&mut request.block_request_state)
}
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType> {
&self.state
Expand All @@ -125,10 +125,10 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
&self,
id: Id,
peer_id: PeerId,
downloaded_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
expected_blobs: usize,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError> {
cx.blob_lookup_request(id, peer_id, self.block_root, downloaded_block)
cx.blob_lookup_request(id, peer_id, self.block_root, expected_blobs)
.map_err(LookupRequestError::SendFailedNetwork)
}

Expand All @@ -150,8 +150,13 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
fn response_type() -> ResponseType {
ResponseType::Blob
}
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self {
&mut request.blob_request_state
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> Result<&mut Self, &'static str> {
match &mut request.component_requests {
ComponentRequests::WaitingForBlock => Err("waiting for block"),
ComponentRequests::ActiveBlobRequest(request, _) => Ok(request),
ComponentRequests::ActiveCustodyRequest { .. } => Err("expecting custody request"),
ComponentRequests::NotNeeded { .. } => Err("not needed"),
}
}
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType> {
&self.state
Expand All @@ -169,10 +174,10 @@ impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
id: Id,
// TODO(das): consider selecting peers that have custody but are in this set
_peer_id: PeerId,
downloaded_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
_: usize,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupRequestResult, LookupRequestError> {
cx.custody_lookup_request(id, self.block_root, downloaded_block)
cx.custody_lookup_request(id, self.block_root)
.map_err(LookupRequestError::SendFailedNetwork)
}

Expand Down Expand Up @@ -200,8 +205,13 @@ impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
fn response_type() -> ResponseType {
ResponseType::CustodyColumn
}
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self {
&mut request.custody_request_state
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> Result<&mut Self, &'static str> {
match &mut request.component_requests {
ComponentRequests::WaitingForBlock => Err("waiting for block"),
ComponentRequests::ActiveBlobRequest { .. } => Err("expecting blob request"),
ComponentRequests::ActiveCustodyRequest(request) => Ok(request),
ComponentRequests::NotNeeded { .. } => Err("not needed"),
}
}
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType> {
&self.state
Expand Down
8 changes: 6 additions & 2 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
};

let block_root = lookup.block_root();
let request_state = R::request_state_mut(lookup).get_state_mut();
let request_state = R::request_state_mut(lookup)
.map_err(|e| LookupRequestError::BadState(e.to_owned()))?
.get_state_mut();

match response {
Ok((response, peer_group, seen_timestamp)) => {
Expand Down Expand Up @@ -545,7 +547,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
};

let block_root = lookup.block_root();
let request_state = R::request_state_mut(lookup).get_state_mut();
let request_state = R::request_state_mut(lookup)
.map_err(|e| LookupRequestError::BadState(e.to_owned()))?
.get_state_mut();

debug!(
self.log,
Expand Down
117 changes: 97 additions & 20 deletions beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::sync::network_context::{
LookupRequestResult, PeerGroup, ReqId, RpcRequestSendError, SendErrorProcessor,
SyncNetworkContext,
};
use beacon_chain::BeaconChainTypes;
use beacon_chain::{BeaconChainTypes, BlockProcessStatus};
use derivative::Derivative;
use lighthouse_network::service::api_types::Id;
use rand::seq::IteratorRandom;
Expand Down Expand Up @@ -62,8 +62,7 @@ pub enum LookupRequestError {
pub struct SingleBlockLookup<T: BeaconChainTypes> {
pub id: Id,
pub block_request_state: BlockRequestState<T::EthSpec>,
pub blob_request_state: BlobRequestState<T::EthSpec>,
pub custody_request_state: CustodyRequestState<T::EthSpec>,
pub component_requests: ComponentRequests<T::EthSpec>,
/// Peers that claim to have imported this set of block components
#[derivative(Debug(format_with = "fmt_peer_set_as_len"))]
peers: HashSet<PeerId>,
Expand All @@ -72,6 +71,16 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
created: Instant,
}

#[derive(Debug)]
pub(crate) enum ComponentRequests<E: EthSpec> {
WaitingForBlock,
ActiveBlobRequest(BlobRequestState<E>, usize),
ActiveCustodyRequest(CustodyRequestState<E>),
// When printing in debug this state display the reason why it's not needed
#[allow(dead_code)]
NotNeeded(&'static str),
}

impl<T: BeaconChainTypes> SingleBlockLookup<T> {
pub fn new(
requested_block_root: Hash256,
Expand All @@ -82,8 +91,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
Self {
id,
block_request_state: BlockRequestState::new(requested_block_root),
blob_request_state: BlobRequestState::new(requested_block_root),
custody_request_state: CustodyRequestState::new(requested_block_root),
component_requests: ComponentRequests::WaitingForBlock,
peers: HashSet::from_iter(peers.iter().copied()),
block_root: requested_block_root,
awaiting_parent,
Expand Down Expand Up @@ -150,16 +158,28 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
/// Returns true if the block has already been downloaded.
pub fn all_components_processed(&self) -> bool {
self.block_request_state.state.is_processed()
&& self.blob_request_state.state.is_processed()
&& self.custody_request_state.state.is_processed()
&& match &self.component_requests {
ComponentRequests::WaitingForBlock => false,
ComponentRequests::ActiveBlobRequest(request, _) => request.state.is_processed(),
ComponentRequests::ActiveCustodyRequest(request) => request.state.is_processed(),
ComponentRequests::NotNeeded { .. } => true,
}
}

/// Returns true if this request is expecting some event to make progress
pub fn is_awaiting_event(&self) -> bool {
self.awaiting_parent.is_some()
|| self.block_request_state.state.is_awaiting_event()
|| self.blob_request_state.state.is_awaiting_event()
|| self.custody_request_state.state.is_awaiting_event()
|| match &self.component_requests {
ComponentRequests::WaitingForBlock => true,
ComponentRequests::ActiveBlobRequest(request, _) => {
request.state.is_awaiting_event()
}
ComponentRequests::ActiveCustodyRequest(request) => {
request.state.is_awaiting_event()
}
ComponentRequests::NotNeeded { .. } => false,
}
}

/// Makes progress on all requests of this lookup. Any error is not recoverable and must result
Expand All @@ -169,9 +189,66 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupResult, LookupRequestError> {
// TODO: Check what's necessary to download, specially for blobs
self.continue_request::<BlockRequestState<T::EthSpec>>(cx)?;
self.continue_request::<BlobRequestState<T::EthSpec>>(cx)?;
self.continue_request::<CustodyRequestState<T::EthSpec>>(cx)?;
self.continue_request::<BlockRequestState<T::EthSpec>>(cx, 0)?;

if let ComponentRequests::WaitingForBlock = self.component_requests {
let downloaded_block = self
.block_request_state
.state
.peek_downloaded_data()
.cloned();

if let Some(block) = downloaded_block.or_else(|| {
// If the block is already being processed or fully validated, retrieve how many blobs
// it expects. Consider any stage of the block. If the block root has been validated, we
// can assert that this is the correct value of `blob_kzg_commitments_count`.
match cx.chain.get_block_process_status(&self.block_root) {
BlockProcessStatus::Unknown => None,
BlockProcessStatus::NotValidated(block)
| BlockProcessStatus::ExecutionValidated(block) => Some(block.clone()),
}
}) {
let expected_blobs = block.num_expected_blobs();
let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
if expected_blobs == 0 {
self.component_requests = ComponentRequests::NotNeeded("no data");
}
if cx.chain.should_fetch_blobs(block_epoch) {
self.component_requests = ComponentRequests::ActiveBlobRequest(
BlobRequestState::new(self.block_root),
expected_blobs,
);
} else if cx.chain.should_fetch_custody_columns(block_epoch) {
self.component_requests = ComponentRequests::ActiveCustodyRequest(
CustodyRequestState::new(self.block_root),
);
} else {
self.component_requests = ComponentRequests::NotNeeded("outside da window");
}
} else {
// Wait to download the block before downloading blobs. Then we can be sure that the
// block has data, so there's no need to do "blind" requests for all possible blobs and
// latter handle the case where if the peer sent no blobs, penalize.
//
// Lookup sync event safety: Reaching this code means that a block is not in any pre-import
// cache nor in the request state of this lookup. Therefore, the block must either: (1) not
// be downloaded yet or (2) the block is already imported into the fork-choice.
// In case (1) the lookup must either successfully download the block or get dropped.
// In case (2) the block will be downloaded, processed, reach `DuplicateFullyImported`
// and get dropped as completed.
}
}

match &self.component_requests {
ComponentRequests::WaitingForBlock => {} // do nothing
ComponentRequests::ActiveBlobRequest(_, expected_blobs) => {
self.continue_request::<BlobRequestState<T::EthSpec>>(cx, *expected_blobs)?
}
ComponentRequests::ActiveCustodyRequest(_) => {
self.continue_request::<CustodyRequestState<T::EthSpec>>(cx, 0)?
}
ComponentRequests::NotNeeded { .. } => {} // do nothing
}

// If all components of this lookup are already processed, there will be no future events
// that can make progress so it must be dropped. Consider the lookup completed.
Expand All @@ -187,15 +264,12 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
fn continue_request<R: RequestState<T>>(
&mut self,
cx: &mut SyncNetworkContext<T>,
expected_blobs: usize,
) -> Result<(), LookupRequestError> {
let id = self.id;
let awaiting_parent = self.awaiting_parent.is_some();
let downloaded_block = self
.block_request_state
.state
.peek_downloaded_data()
.cloned();
let request = R::request_state_mut(self);
let request =
R::request_state_mut(self).map_err(|e| LookupRequestError::BadState(e.to_owned()))?;

// Attempt to progress awaiting downloads
if request.get_state().is_awaiting_download() {
Expand All @@ -214,13 +288,16 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
// not receive any new peers for some time it will be dropped. If it receives a new
// peer it must attempt to make progress.
R::request_state_mut(self)
.map_err(|e| LookupRequestError::BadState(e.to_owned()))?
.get_state_mut()
.update_awaiting_download_status("no peers");
return Ok(());
};

let request = R::request_state_mut(self);
match request.make_request(id, peer_id, downloaded_block, cx)? {
let request = R::request_state_mut(self)
.map_err(|e| LookupRequestError::BadState(e.to_owned()))?;

match request.make_request(id, peer_id, expected_blobs, cx)? {
LookupRequestResult::RequestSent(req_id) => {
// Lookup sync event safety: If make_request returns `RequestSent`, we are
// guaranteed that `BlockLookups::on_download_response` will be called exactly
Expand Down
67 changes: 1 addition & 66 deletions beacon_node/network/src/sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,45 +632,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
lookup_id: SingleLookupId,
peer_id: PeerId,
block_root: Hash256,
downloaded_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
expected_blobs: usize,
) -> Result<LookupRequestResult, RpcRequestSendError> {
let Some(block) = downloaded_block.or_else(|| {
// If the block is already being processed or fully validated, retrieve how many blobs
// it expects. Consider any stage of the block. If the block root has been validated, we
// can assert that this is the correct value of `blob_kzg_commitments_count`.
match self.chain.get_block_process_status(&block_root) {
BlockProcessStatus::Unknown => None,
BlockProcessStatus::NotValidated(block)
| BlockProcessStatus::ExecutionValidated(block) => Some(block.clone()),
}
}) else {
// Wait to download the block before downloading blobs. Then we can be sure that the
// block has data, so there's no need to do "blind" requests for all possible blobs and
// latter handle the case where if the peer sent no blobs, penalize.
// - if `downloaded_block_expected_blobs` is Some = block is downloading or processing.
// - if `num_expected_blobs` returns Some = block is processed.
//
// Lookup sync event safety: Reaching this code means that a block is not in any pre-import
// cache nor in the request state of this lookup. Therefore, the block must either: (1) not
// be downloaded yet or (2) the block is already imported into the fork-choice.
// In case (1) the lookup must either successfully download the block or get dropped.
// In case (2) the block will be downloaded, processed, reach `DuplicateFullyImported`
// and get dropped as completed.
return Ok(LookupRequestResult::Pending("waiting for block download"));
};
let expected_blobs = block.num_expected_blobs();
let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());

// Check if we are in deneb, before peerdas and inside da window
if !self.chain.should_fetch_blobs(block_epoch) {
return Ok(LookupRequestResult::NoRequestNeeded("blobs not required"));
}

// No data required for this block
if expected_blobs == 0 {
return Ok(LookupRequestResult::NoRequestNeeded("no data"));
}

let imported_blob_indexes = self
.chain
.data_availability_checker
Expand Down Expand Up @@ -760,35 +723,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
&mut self,
lookup_id: SingleLookupId,
block_root: Hash256,
downloaded_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
) -> Result<LookupRequestResult, RpcRequestSendError> {
let Some(block) =
downloaded_block.or_else(|| match self.chain.get_block_process_status(&block_root) {
BlockProcessStatus::Unknown => None,
BlockProcessStatus::NotValidated(block)
| BlockProcessStatus::ExecutionValidated(block) => Some(block.clone()),
})
else {
// Wait to download the block before downloading columns. Then we can be sure that the
// block has data, so there's no need to do "blind" requests for all possible columns and
// latter handle the case where if the peer sent no columns, penalize.
// - if `downloaded_block_expected_blobs` is Some = block is downloading or processing.
// - if `num_expected_blobs` returns Some = block is processed.
return Ok(LookupRequestResult::Pending("waiting for block download"));
};
let expected_blobs = block.num_expected_blobs();
let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());

// Check if we are into peerdas and inside da window
if !self.chain.should_fetch_custody_columns(block_epoch) {
return Ok(LookupRequestResult::NoRequestNeeded("columns not required"));
}

// No data required for this block
if expected_blobs == 0 {
return Ok(LookupRequestResult::NoRequestNeeded("no data"));
}

let custody_indexes_imported = self
.chain
.data_availability_checker
Expand Down

0 comments on commit a0a62ea

Please sign in to comment.