diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs
index 6c8afaa01a6..a370c68e56b 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -133,6 +133,21 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
         })
     }
 
+    /// Return the set of imported custody column indexes for `block_root`. Returns `None` if
+    /// there is no block component for `block_root`.
+    pub fn imported_custody_column_indexes(&self, block_root: &Hash256) -> Option<Vec<ColumnIndex>> {
+        self.availability_cache
+            .peek_pending_components(block_root, |components| {
+                components.map(|components| {
+                    components
+                        .get_cached_data_columns()
+                        .iter()
+                        .map(|item| item.data_column_index())
+                        .collect::<Vec<_>>()
+                })
+            })
+    }
+
     /// Get a blob from the availability cache.
     pub fn get_blob(
         &self,
diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs
index 18bf4e7dcfc..b030d8eff4e 100644
--- a/beacon_node/lighthouse_network/src/rpc/methods.rs
+++ b/beacon_node/lighthouse_network/src/rpc/methods.rs
@@ -388,6 +388,10 @@ impl DataColumnsByRootRequest {
         Self { data_column_ids }
     }
 
+    pub fn new_single(block_root: Hash256, index: ColumnIndex, spec: &ChainSpec) -> Self {
+        Self::new(vec![DataColumnIdentifier { block_root, index }], spec)
+    }
+
     pub fn group_by_ordered_block_root(&self) -> Vec<(Hash256, Vec<ColumnIndex>)> {
         let mut column_indexes_by_block = BTreeMap::<Hash256, Vec<ColumnIndex>>::new();
         for request_id in self.data_column_ids.as_slice() {
diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs
index ff2cb97d057..5dd74f25726 100644
--- a/beacon_node/lighthouse_network/src/types/globals.rs
+++ b/beacon_node/lighthouse_network/src/types/globals.rs
@@ -115,6 +115,7 @@ impl<E: EthSpec> NetworkGlobals<E> {
     pub fn custody_columns(&self, _epoch: Epoch) -> Result<Vec<ColumnIndex>, &'static str> {
         let enr = self.local_enr();
         let node_id = enr.node_id().raw().into();
+        // TODO(das): cache this number at start-up to not make this fallible
         let custody_subnet_count = enr.custody_subnet_count::<E>()?;
         Ok(
             DataColumnSubnetId::compute_custody_columns::<E>(node_id, custody_subnet_count)
diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs
index 442822ff301..cba3c56d8a4 100644
--- a/beacon_node/network/src/sync/block_lookups/common.rs
+++ b/beacon_node/network/src/sync/block_lookups/common.rs
@@ -12,7 +12,7 @@ use beacon_chain::block_verification_types::RpcBlock;
 use beacon_chain::BeaconChainTypes;
 use std::sync::Arc;
 use types::blob_sidecar::FixedBlobSidecarList;
-use types::SignedBeaconBlock;
+use types::{DataColumnSidecar, SignedBeaconBlock};
 
 use super::single_block_lookup::DownloadResult;
 use super::SingleLookupId;
@@ -23,6 +23,7 @@ use super::single_block_lookup::CustodyRequestState;
 pub enum ResponseType {
     Block,
     Blob,
+    CustodyColumn,
 }
 
 /// The maximum depth we will search for a parent block. In principle we should have sync'd any
@@ -234,75 +235,51 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
     }
 }
 
-impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState {
-    type RequestType = BlobsByRootSingleBlockRequest;
-    type VerifiedResponseType = FixedBlobSidecarList<T::EthSpec>;
-    type ReconstructedResponseType = FixedBlobSidecarList<T::EthSpec>;
-
-    fn new_request(&self) -> Self::RequestType {
-        // Removed in https://github.com/sigp/lighthouse/pull/5655
-        todo!()
-    }
+impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
+    type RequestType = ();
+    type VerifiedResponseType = Vec<Arc<DataColumnSidecar<T::EthSpec>>>;
 
     fn make_request(
-        id: SingleLookupReqId,
+        &self,
+        id: Id,
         peer_id: PeerId,
-        block_root: Hash256,
+        downloaded_block_expected_blobs: Option<usize>,
         cx: &mut SyncNetworkContext<T>,
-    ) -> Result<(), LookupRequestError> {
-        cx.custody_lookup_request(id, block_root)
+    ) -> Result<bool, LookupRequestError> {
+        cx.custody_lookup_request(id, self.block_root, downloaded_block_expected_blobs)
             .map_err(LookupRequestError::SendFailed)
     }
 
-    fn get_parent_root(verified_response: &FixedBlobSidecarList<T::EthSpec>) -> Option<Hash256> {
-        verified_response
-            .into_iter()
-            .filter_map(|blob| blob.as_ref())
-            .map(|blob| blob.block_parent_root())
-            .next()
-    }
-
-    fn add_to_child_components(
-        verified_response: FixedBlobSidecarList<T::EthSpec>,
-        components: &mut ChildComponents<T::EthSpec>,
-    ) {
-        components.merge_blobs(verified_response);
-    }
-
-    fn verified_to_reconstructed(
-        _block_root: Hash256,
-        blobs: FixedBlobSidecarList<T::EthSpec>,
-    ) -> FixedBlobSidecarList<T::EthSpec> {
-        blobs
-    }
-
-    fn send_reconstructed_for_processing(
+    fn send_for_processing(
         id: Id,
-        bl: &BlockLookups<T>,
-        block_root: Hash256,
-        verified: FixedBlobSidecarList<T::EthSpec>,
-        duration: Duration,
+        download_result: DownloadResult<Self::VerifiedResponseType>,
         cx: &SyncNetworkContext<T>,
     ) -> Result<(), LookupRequestError> {
-        bl.send_blobs_for_processing(
+        let DownloadResult {
+            value,
             block_root,
-            verified,
-            duration,
-            BlockProcessType::SingleBlob { id },
-            cx,
+            seen_timestamp,
+            peer_id: _,
+        } = download_result;
+        cx.send_custody_columns_for_processing(
+            block_root,
+            value,
+            seen_timestamp,
+            BlockProcessType::SingleCustodyColumn(id),
         )
+        .map_err(LookupRequestError::SendFailed)
     }
 
     fn response_type() -> ResponseType {
-        ResponseType::Blob
+        ResponseType::CustodyColumn
     }
 
-    fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self {
-        &mut request.blob_request_state
+    fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self {
+        &mut request.custody_request_state
     }
 
-    fn get_state(&self) -> &SingleLookupRequestState {
+    fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType> {
         &self.state
     }
 
-    fn get_state_mut(&mut self) -> &mut SingleLookupRequestState {
+    fn get_state_mut(&mut self) -> &mut SingleLookupRequestState<Self::VerifiedResponseType> {
         &mut self.state
     }
 }
diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs
index a0c7c33bb0f..11951cd4619 100644
--- a/beacon_node/network/src/sync/block_lookups/mod.rs
+++ b/beacon_node/network/src/sync/block_lookups/mod.rs
@@ -15,7 +15,7 @@ pub use common::RequestState;
 use fnv::FnvHashMap;
 use lighthouse_network::{PeerAction, PeerId};
 use lru_cache::LRUTimeCache;
-pub use single_block_lookup::{BlobRequestState, BlockRequestState};
+pub use single_block_lookup::{BlobRequestState, BlockRequestState, CustodyRequestState};
 use slog::{debug, error, trace, warn, Logger};
 use std::sync::Arc;
 use std::time::Duration;
@@ -395,6 +395,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
             BlockProcessType::SingleBlob { id } => {
                 self.on_processing_result_inner::<BlobRequestState<T::EthSpec>>(id, result, cx)
             }
+            BlockProcessType::SingleCustodyColumn(id) => {
+                self.on_processing_result_inner::<CustodyRequestState<T::EthSpec>>(id, result, cx)
+            }
         } {
             self.on_lookup_request_error(process_type.id(), e, "processing_result");
         }
@@ -514,6 +517,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
                     match R::response_type() {
                         ResponseType::Block => "lookup_block_processing_failure",
                         ResponseType::Blob => "lookup_blobs_processing_failure",
+                        ResponseType::CustodyColumn => {
+                            "lookup_custody_column_processing_failure"
+                        }
                     },
                 );
diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
index 563a09e1852..c6cd3e09036 100644
--- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
+++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
@@ -13,7 +13,7 @@ use std::time::Duration;
 use store::Hash256;
 use strum::IntoStaticStr;
 use types::blob_sidecar::FixedBlobSidecarList;
-use types::{EthSpec, SignedBeaconBlock};
+use types::{DataColumnSidecar, EthSpec, SignedBeaconBlock};
 
 #[derive(Debug, PartialEq, Eq, IntoStaticStr)]
 pub enum LookupRequestError {
@@ -31,7 +31,7 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
     pub id: Id,
     pub block_request_state: BlockRequestState<T::EthSpec>,
     pub blob_request_state: BlobRequestState<T::EthSpec>,
-    pub custody_request_state: CustodyRequestState,
+    pub custody_request_state: CustodyRequestState<T::EthSpec>,
     block_root: Hash256,
     awaiting_parent: Option<Hash256>,
 }
@@ -198,12 +198,12 @@ impl<E: EthSpec> BlobRequestState<E> {
     }
 }
 
 /// The state of the custody request component of a `SingleBlockLookup`.
-pub struct CustodyRequestState {
+pub struct CustodyRequestState<E: EthSpec> {
     pub block_root: Hash256,
-    pub state: SingleLookupRequestState,
+    pub state: SingleLookupRequestState<Vec<Arc<DataColumnSidecar<E>>>>,
 }
 
-impl CustodyRequestState {
+impl<E: EthSpec> CustodyRequestState<E> {
     pub fn new(block_root: Hash256, peer_source: &[PeerId]) -> Self {
         Self {
             block_root,
diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs
index bf92ab78796..edd2e303103 100644
--- a/beacon_node/network/src/sync/block_lookups/tests.rs
+++ b/beacon_node/network/src/sync/block_lookups/tests.rs
@@ -1,7 +1,10 @@
 use crate::network_beacon_processor::NetworkBeaconProcessor;
 use crate::service::RequestId;
-use crate::sync::manager::{RequestId as SyncRequestId, SingleLookupReqId, SyncManager};
+use crate::sync::manager::{
+    DataColumnsByRootRequestId, DataColumnsByRootRequester, RequestId as SyncRequestId,
+    SingleLookupReqId, SyncManager,
+};
 use crate::sync::sampling::{SamplingConfig, SamplingRequester};
 use crate::sync::{SamplingId, SyncMessage};
 use crate::NetworkMessage;
@@ -82,7 +85,7 @@ const D: Duration = Duration::new(0, 0);
 const PARENT_FAIL_TOLERANCE: u8 = SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS;
 const SAMPLING_REQUIRED_SUCCESSES: usize = 2;
 
-type SamplingIds = Vec<(Id, ColumnIndex)>;
+type SamplingIds = Vec<(DataColumnsByRootRequestId, ColumnIndex)>;
 
 impl TestRig {
     fn test_setup() -> Self {
@@ -495,7 +498,7 @@ impl TestRig {
         }
     }
 
-    fn return_empty_sampling_request(&mut self, id: Id) {
+    fn return_empty_sampling_request(&mut self, id: DataColumnsByRootRequestId) {
         let peer_id = PeerId::random();
         // Send stream termination
         self.send_sync_message(SyncMessage::RpcDataColumn {
@@ -522,7 +525,7 @@ impl TestRig {
 
     fn complete_valid_sampling_column_request(
         &mut self,
-        id: Id,
+        id: DataColumnsByRootRequestId,
         data_column: DataColumnSidecar<E>,
     ) {
         let peer_id = PeerId::random();
@@ -752,6 +755,7 @@ impl TestRig {
                 }
                 other => panic!("Expected blob process, found {:?}", other),
             },
+            ResponseType::CustodyColumn => todo!(),
         }
     }
diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs
index d159733cbc7..c666c876c77 100644
--- a/beacon_node/network/src/sync/block_sidecar_coupling.rs
+++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs
@@ -1,7 +1,7 @@
 use beacon_chain::block_verification_types::RpcBlock;
 use ssz_types::VariableList;
 use std::{collections::VecDeque, sync::Arc};
-use types::{BlobSidecar, EthSpec, SignedBeaconBlock};
+use types::{BlobSidecar, DataColumnSidecar, EthSpec, SignedBeaconBlock};
 
 use super::range_sync::ByRangeRequestType;
 
@@ -11,10 +11,12 @@ pub struct BlocksAndBlobsRequestInfo<E: EthSpec> {
     accumulated_blocks: VecDeque<Arc<SignedBeaconBlock<E>>>,
     /// Sidecars we have received awaiting their corresponding block.
     accumulated_sidecars: VecDeque<Arc<BlobSidecar<E>>>,
+    accumulated_custody_columns: VecDeque<Arc<DataColumnSidecar<E>>>,
     /// Whether the individual RPC request for blocks is finished or not.
     is_blocks_stream_terminated: bool,
     /// Whether the individual RPC request for sidecars is finished or not.
     is_sidecars_stream_terminated: bool,
+    is_custody_columns_stream_terminated: bool,
     /// Used to determine if this accumulator should wait for a sidecars stream termination
     request_type: ByRangeRequestType,
 }
@@ -24,8 +26,10 @@ impl<E: EthSpec> BlocksAndBlobsRequestInfo<E> {
         Self {
             accumulated_blocks: <_>::default(),
             accumulated_sidecars: <_>::default(),
+            accumulated_custody_columns: <_>::default(),
             is_blocks_stream_terminated: <_>::default(),
             is_sidecars_stream_terminated: <_>::default(),
+            is_custody_columns_stream_terminated: <_>::default(),
             request_type,
         }
     }
@@ -48,6 +52,13 @@ impl<E: EthSpec> BlocksAndBlobsRequestInfo<E> {
         }
     }
 
+    pub fn add_custody_column(&mut self, column_opt: Option<Arc<DataColumnSidecar<E>>>) {
+        match column_opt {
+            Some(column) => self.accumulated_custody_columns.push_back(column),
+            None => self.is_custody_columns_stream_terminated = true,
+        }
+    }
+
     pub fn into_responses(self) -> Result<Vec<RpcBlock<E>>, String> {
         let BlocksAndBlobsRequestInfo {
             accumulated_blocks,
@@ -96,11 +107,12 @@
     }
 
     pub fn is_finished(&self) -> bool {
-        let blobs_requested = match self.request_type {
-            ByRangeRequestType::Blocks => false,
-            ByRangeRequestType::BlocksAndBlobs => true,
-        };
-        self.is_blocks_stream_terminated && (!blobs_requested || self.is_sidecars_stream_terminated)
+        let blobs_requested = matches!(self.request_type, ByRangeRequestType::BlocksAndBlobs);
+        let custody_columns_requested =
+            matches!(self.request_type, ByRangeRequestType::BlocksAndColumns);
+        self.is_blocks_stream_terminated
+            && (!blobs_requested || self.is_sidecars_stream_terminated)
+            && (!custody_columns_requested || self.is_custody_columns_stream_terminated)
     }
 }
diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs
index fad3c62a085..0c9d1fa061a 100644
--- a/beacon_node/network/src/sync/manager.rs
+++ b/beacon_node/network/src/sync/manager.rs
@@ -35,7 +39,11 @@
 use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart};
 use super::block_lookups::BlockLookups;
-use super::network_context::{BlockOrBlob, RangeRequestId, RpcEvent, SyncNetworkContext};
+use super::network_context::LookupFailure;
+use super::network_context::{
+    custody::{Custody, CustodyRequester},
+    BlockOrBlob, CustodyId, RangeRequestId, RpcEvent, SyncNetworkContext,
+};
 use super::peer_sync_info::{remote_sync_type, PeerSyncType};
 use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
 use super::sampling::{Sampling, SamplingConfig, SamplingId, SamplingRequester, SamplingResult};
@@ -43,7 +47,7 @@ use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProces
 use crate::service::NetworkMessage;
 use crate::status::ToStatusMessage;
 use crate::sync::block_lookups::{
-    BlobRequestState, BlockComponent, BlockRequestState, DownloadResult,
+    BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult,
 };
 use crate::sync::block_sidecar_coupling::BlocksAndBlobsRequestInfo;
 use beacon_chain::block_verification_types::AsBlock;
@@ -89,11 +93,17 @@ pub enum RequestId {
     /// Request searching for a set of blobs given a hash.
     SingleBlob { id: SingleLookupReqId },
     /// Request searching for a set of data columns given a hash and list of column indices.
-    DataColumnsByRoot(Id),
+    DataColumnsByRoot(DataColumnsByRootRequestId),
     /// Range request that is composed by both a block range request and a blob range request.
     RangeBlockAndBlobs { id: Id },
 }
 
+#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
+pub struct DataColumnsByRootRequestId {
+    pub requester: DataColumnsByRootRequester,
+    pub req_id: Id,
+}
+
 #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
 pub enum DataColumnsByRootRequester {
     Sampling(SamplingId),
@@ -178,12 +188,15 @@ pub enum SyncMessage<E: EthSpec> {
 pub enum BlockProcessType {
     SingleBlock { id: Id },
     SingleBlob { id: Id },
+    SingleCustodyColumn(Id),
 }
 
 impl BlockProcessType {
     pub fn id(&self) -> Id {
         match self {
-            BlockProcessType::SingleBlock { id } | BlockProcessType::SingleBlob { id } => *id,
+            BlockProcessType::SingleBlock { id }
+            | BlockProcessType::SingleBlob { id }
+            | BlockProcessType::SingleCustodyColumn(id) => *id,
         }
     }
 }
@@ -293,6 +306,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
             backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals, log.clone()),
             block_lookups: BlockLookups::new(log.clone()),
             sampling: Sampling::new(sampling_config, log.clone()),
+            custody: Custody::new(log.clone()),
             log: log.clone(),
         }
     }
@@ -954,7 +968,7 @@
     fn on_single_data_column_response(
         &mut self,
-        id: Id,
+        id: DataColumnsByRootRequestId,
         peer_id: PeerId,
         data_column: RpcEvent<Arc<DataColumnSidecar<T::EthSpec>>>,
     ) {
@@ -972,24 +986,69 @@
                 }
             }
             DataColumnsByRootRequester::Custody(id) => {
-                if let Some((requester, result)) =
+                if let Some((requester, columns)) =
                     self.custody
                         .on_data_column_downloaded(id, peer_id, resp, &mut self.network)
                 {
+                    // TODO(das): get proper timestamp
+                    let seen_timestamp = timestamp_now();
                     match requester {
                         CustodyRequester::Lookup(id) => self
                             .block_lookups
-                            .on_download_response::<CustodyRequestState>(
-                                id,
+                            .on_download_response::<CustodyRequestState<T::EthSpec>>(
+                                id.lookup_id,
                                 peer_id,
-                                result,
+                                columns
+                                    .map(|columns| (columns, seen_timestamp))
+                                    .map_err(|_| LookupFailure::LookupVerifyError(todo!())),
                                 &mut self.network,
                             ),
-                        CustodyRequester::RangeSync(id) => {
-                            self.range_block_and_blobs_response(id, peer_id, result)
-                        }
+                        CustodyRequester::RangeSync(id) => match columns {
+                            Ok(columns) => {
+                                // TODO: Kind of pointless to unravel the columns Vec here.
+                                // Fixed with https://github.com/sigp/lighthouse/pull/5710
+                                for column in columns {
+                                    self.range_block_and_blobs_response(
+                                        id,
+                                        peer_id,
+                                        BlockOrBlob::CustodyColumns(Some(column)),
+                                    );
+                                }
+                                self.range_block_and_blobs_response(
+                                    id,
+                                    peer_id,
+                                    BlockOrBlob::CustodyColumns(None),
+                                );
+                            }
+                            Err(err) => {
+                                if let Some(sender_id) = self.network.range_request_failed(id) {
+                                    match sender_id {
+                                        RangeRequestId::RangeSync { chain_id, batch_id } => {
+                                            self.range_sync.inject_error(
+                                                &mut self.network,
+                                                peer_id,
+                                                batch_id,
+                                                chain_id,
+                                                id,
+                                            );
+                                            self.update_sync_state();
+                                        }
+                                        RangeRequestId::BackfillSync { batch_id } => {
+                                            match self.backfill_sync.inject_error(
+                                                &mut self.network,
+                                                batch_id,
+                                                &peer_id,
+                                                id,
+                                            ) {
+                                                Ok(_) => {}
+                                                Err(_) => self.update_sync_state(),
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                        },
                     }
-                    self.on_custody_result(requester, result)
                 }
             }
         }
     }
diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs
index 0ca9063b991..5894e9be467 100644
--- a/beacon_node/network/src/sync/network_context.rs
+++ b/beacon_node/network/src/sync/network_context.rs
@@ -1,6 +1,8 @@
 //! Provides network functionality for the Syncing thread. This fundamentally wraps a network
 //! channel and stores a global RPC ID to perform requests.
 
+pub use self::custody::CustodyId;
+use self::custody::{ActiveCustodyRequest, CustodyRequester};
 use self::requests::{
     ActiveBlobsByRootRequest, ActiveBlocksByRootRequest, ActiveDataColumnsByRootRequest,
 };
@@ -9,7 +11,8 @@ pub use self::requests::{
 };
 use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo;
 use super::manager::{
-    BlockProcessType, DataColumnsByRootRequester, Id, RequestId as SyncRequestId,
+    BlockProcessType, DataColumnsByRootRequestId, DataColumnsByRootRequester, Id,
+    RequestId as SyncRequestId,
 };
 use super::range_sync::{BatchId, ByRangeRequestType, ChainId};
 use crate::network_beacon_processor::NetworkBeaconProcessor;
@@ -38,7 +41,7 @@ use types::{
     SignedBeaconBlock,
 };
 
-mod custody;
+pub mod custody;
 mod requests;
 
 pub struct BlocksAndBlobsByRangeResponse<E: EthSpec> {
@@ -106,10 +109,12 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
     /// A mapping of active BlobsByRoot requests, including both current slot and parent lookups.
     blobs_by_root_requests: FnvHashMap<SingleLookupReqId, ActiveBlobsByRootRequest<T::EthSpec>>,
 
-    data_columns_by_root_requests:
-        FnvHashMap<Id, ActiveDataColumnsByRootRequest<T::EthSpec>>,
-
-    custody_by_root_requests: FnvHashMap<Hash256, ActiveCustodyRequest<T>>,
+    data_columns_by_root_requests: FnvHashMap<
+        DataColumnsByRootRequestId,
+        ActiveDataColumnsByRootRequest<T::EthSpec>,
+    >,
+    /// Mapping of active custody column requests for a block root
+    custody_by_root_requests: FnvHashMap<SingleLookupReqId, ActiveCustodyRequest<T>>,
 
     /// BlocksByRange requests paired with BlobsByRange
     range_blocks_and_blobs_requests:
         FnvHashMap<Id, (RangeRequestId, BlocksAndBlobsRequestInfo<T::EthSpec>)>,
@@ -132,6 +137,7 @@ pub enum BlockOrBlob<E: EthSpec> {
     Block(Option<Arc<SignedBeaconBlock<E>>>),
     Blob(Option<Arc<BlobSidecar<E>>>),
+    CustodyColumns(Option<Arc<DataColumnSidecar<E>>>),
 }
 
 impl<E: EthSpec> From<Option<Arc<SignedBeaconBlock<E>>>> for BlockOrBlob<E> {
@@ -146,6 +152,12 @@ impl<E: EthSpec> From<Option<Arc<BlobSidecar<E>>>> for BlockOrBlob<E> {
     }
 }
 
+impl<E: EthSpec> From<Option<Arc<DataColumnSidecar<E>>>> for BlockOrBlob<E> {
+    fn from(custody_column: Option<Arc<DataColumnSidecar<E>>>) -> Self {
+        BlockOrBlob::CustodyColumns(custody_column)
+    }
+}
+
 impl<T: BeaconChainTypes> SyncNetworkContext<T> {
     pub fn new(
         network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
@@ -160,6 +172,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             blocks_by_root_requests: <_>::default(),
             blobs_by_root_requests: <_>::default(),
             data_columns_by_root_requests: <_>::default(),
+            custody_by_root_requests: <_>::default(),
             range_blocks_and_blobs_requests: FnvHashMap::default(),
             network_beacon_processor,
             chain,
@@ -324,6 +337,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             match block_or_blob {
                 BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block),
                 BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar),
+                BlockOrBlob::CustodyColumns(column) => info.add_custody_column(column),
             }
             if info.is_finished() {
                 // If the request is finished, dequeue everything
@@ -459,8 +473,7 @@
         peer_id: PeerId,
         request: DataColumnsByRootSingleBlockRequest,
     ) -> Result<(), &'static str> {
-        let id = self.next_id();
-
+        let req_id = self.next_id();
         debug!(
             self.log,
             "Sending DataColumnsByRoot Request";
@@ -469,8 +482,9 @@
             "block_root" => ?request.block_root,
             "indices" => ?request.indices,
             "peer" => %peer_id,
             "requester" => ?requester,
-            "id" => id,
+            "id" => req_id,
         );
+        let id = DataColumnsByRootRequestId { requester, req_id };
 
         self.send_network_msg(NetworkMessage::SendRequest {
             peer_id,
@@ -486,17 +500,75 @@
 
     pub fn custody_lookup_request(
         &mut self,
-        id: SingleLookupReqId,
+        lookup_id: SingleLookupId,
         block_root: Hash256,
-    ) -> Result<(), &'static str> {
-        let request = ActiveBlobsByRootRequest::new(block_root);
-        request.send()?;
-        self.custody_by_root_requests.insert(request);
-        Ok(())
-    }
+        downloaded_block_expected_data: Option<usize>,
+    ) -> Result<bool, &'static str> {
+        let expected_data = downloaded_block_expected_data
+            .or_else(|| {
+                self.chain
+                    .data_availability_checker
+                    .num_expected_blobs(&block_root)
+            })
+            .map(|n| n > 0)
+            .unwrap_or_else(|| {
+                // If we don't know about the block being requested, assume the block has data
+                self.chain
+                    .data_availability_checker
+                    .da_check_required_for_current_epoch()
+            });
 
-    pub fn custody_request(&mut self, block_root: Hash256) -> Result<(), &'static str> {
-        //
+        // No data required for this block
+        if !expected_data {
+            return Ok(false);
+        }
+
+        let custody_indexes_imported = self
+            .chain
+            .data_availability_checker
+            .imported_custody_column_indexes(&block_root)
+            .unwrap_or_default();
+
+        // TODO(das): figure out how to pass block.slot if we end up doing rotation
+        let block_epoch = Epoch::new(0);
+        let custody_indexes_duty = self.network_globals().custody_columns(block_epoch)?;
+
+        // Include only the column indexes not yet imported (received through gossip)
+        let custody_indexes_to_fetch = custody_indexes_duty
+            .into_iter()
+            .filter(|index| !custody_indexes_imported.contains(index))
+            .collect::<Vec<_>>();
+
+        if custody_indexes_to_fetch.is_empty() {
+            // No indexes required, do not issue any request
+            return Ok(false);
+        }
+
+        let id = SingleLookupReqId {
+            lookup_id,
+            req_id: self.next_id(),
+        };
+
+        debug!(
+            self.log,
+            "Starting custody columns request";
+            "block_root" => ?block_root,
+            "indices" => ?custody_indexes_to_fetch,
+            "id" => ?id
+        );
+
+        let request = ActiveCustodyRequest::new(
+            block_root,
+            CustodyRequester::Lookup(id),
+            custody_indexes_to_fetch,
+            self.log.clone(),
+        );
+
+        // TODO(das): start request
+        // Note that you can only send, but not handle a response here
+
+        self.custody_by_root_requests.insert(id, request);
+        Ok(true)
     }
 
     pub fn is_execution_engine_online(&self) -> bool {
@@ -700,7 +772,7 @@
     #[allow(clippy::type_complexity)]
     pub fn on_data_columns_by_root_response(
         &mut self,
-        id: Id,
+        id: DataColumnsByRootRequestId,
         item: RpcEvent<Arc<DataColumnSidecar<T::EthSpec>>>,
     ) -> Option<(
         DataColumnsByRootRequester,
@@ -799,6 +871,16 @@
             }
         }
     }
+
+    pub fn send_custody_columns_for_processing(
+        &self,
+        block_root: Hash256,
+        data_columns: Vec<Arc<DataColumnSidecar<T::EthSpec>>>,
+        duration: Duration,
+        process_type: BlockProcessType,
+    ) -> Result<(), &'static str> {
+        todo!();
+    }
 }
 
 fn to_fixed_blob_sidecar_list<E: EthSpec>(
diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs
index a59dbe53a24..d5a20a9a6be 100644
--- a/beacon_node/network/src/sync/network_context/custody.rs
+++ b/beacon_node/network/src/sync/network_context/custody.rs
@@ -1,28 +1,106 @@
-use crate::sync::SamplingId;
+use crate::metrics;
+use crate::sync::manager::{Id, SingleLookupReqId};
 
 use self::request::ActiveColumnSampleRequest;
 use beacon_chain::BeaconChainTypes;
 use fnv::FnvHashMap;
 use lighthouse_network::PeerId;
-use slog::{debug, error, warn};
-use std::{sync::Arc, time::Duration};
-use types::{
-    data_column_sidecar::{ColumnIndex, DataColumnSidecarList},
-    DataColumnSidecar, EthSpec, Hash256, Slot,
-};
+use slog::{debug, warn};
+use std::{collections::HashMap, marker::PhantomData, sync::Arc, time::Duration};
+use types::{data_column_sidecar::ColumnIndex, DataColumnSidecar, Epoch, Hash256};
 
 use super::{LookupFailure, SyncNetworkContext};
 
-pub struct CustodyId {}
+#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
+pub struct CustodyId {
+    pub id: CustodyRequester,
+    pub column_index: ColumnIndex,
+}
+
+#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
+pub enum CustodyRequester {
+    Lookup(SingleLookupReqId),
+    RangeSync(Id),
+}
+
+type DataColumnSidecarList<E> = Vec<Arc<DataColumnSidecar<E>>>;
+
+pub struct Custody<T: BeaconChainTypes> {
+    requests: HashMap<CustodyRequester, ActiveCustodyRequest<T>>,
+    log: slog::Logger,
+}
+
+impl<T: BeaconChainTypes> Custody<T> {
+    pub fn new(log: slog::Logger) -> Self {
+        Self {
+            requests: <_>::default(),
+            log,
+        }
+    }
+
+    /// Insert a downloaded column into an active custody request. Then make progress on the
+    /// entire request.
+    ///
+    /// ### Returns
+    ///
+    /// - `Some`: Request completed, won't make more progress. Expect requester to act on the result.
+    /// - `None`: Request still active, requester should do no action
+    pub fn on_data_column_downloaded(
+        &mut self,
+        id: CustodyId,
+        peer_id: PeerId,
+        resp: Result<(DataColumnSidecarList<T::EthSpec>, Duration), LookupFailure>,
+        cx: &mut SyncNetworkContext<T>,
+    ) -> Option<(
+        CustodyRequester,
+        Result<DataColumnSidecarList<T::EthSpec>, SamplingError>,
+    )> {
+        let Some(request) = self.requests.get_mut(&id.id) else {
+            // TODO(das): This log can happen if the request is errored early and dropped
+            debug!(self.log, "Custody column downloaded event for unknown request"; "id" => ?id);
+            return None;
+        };
+
+        let result = request.on_data_column_downloaded(peer_id, id.column_index, resp, cx);
+        self.handle_sampling_result(result, &id.id)
+    }
 
-struct ActiveCustodyRequest {
+    /// Converts a result from the internal format of `ActiveSamplingRequest` (error first to use ?
+    /// conveniently), to an Option-first format to use an `if let Some() { act on result }` pattern
+    /// in the sync manager.
+    fn handle_sampling_result(
+        &mut self,
+        result: Result<Option<DataColumnSidecarList<T::EthSpec>>, SamplingError>,
+        id: &CustodyRequester,
+    ) -> Option<(
+        CustodyRequester,
+        Result<DataColumnSidecarList<T::EthSpec>, SamplingError>,
+    )> {
+        let result = result.transpose();
+        if let Some(result) = result {
+            debug!(self.log, "Custody request completed, removing"; "id" => ?id, "result" => ?result);
+            metrics::inc_counter_vec(
+                &metrics::SAMPLING_REQUEST_RESULT,
+                &[metrics::from_result(&result)],
+            );
+            self.requests.remove(id);
+            Some((*id, result))
+        } else {
+            None
+        }
+    }
+}
+
+pub struct ActiveCustodyRequest<T: BeaconChainTypes> {
     block_root: Hash256,
-    block_slot: Slot,
-    requester_id: SamplingRequester,
+    block_epoch: Epoch,
+    column_indexes: Vec<ColumnIndex>,
+    requester_id: CustodyRequester,
     column_requests: FnvHashMap<ColumnIndex, ActiveColumnSampleRequest>,
-    columns: Vec<Arc<DataColumnSidecar<T::EthSpec>>>,
+    columns: DataColumnSidecarList<T::EthSpec>,
     /// Logger for the `SyncNetworkContext`.
     pub log: slog::Logger,
+    _phantom: PhantomData<T>,
 }
 
 #[derive(Debug)]
 pub enum SamplingError {
@@ -34,20 +112,26 @@ pub enum SamplingError {
     ColumnIndexOutOfBounds,
 }
 
-impl ActiveCustodyRequest {
-    fn new(
+impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
+    pub(crate) fn new(
         block_root: Hash256,
-        block_slot: Slot,
-        requester_id: SamplingRequester,
+        requester_id: CustodyRequester,
+        column_indexes: Vec<ColumnIndex>,
         log: slog::Logger,
     ) -> Self {
         Self {
             block_root,
-            block_slot,
+            // TODO(das): use actual epoch if there's rotation
+            block_epoch: Epoch::new(0),
             requester_id,
-            column_requests: (),
+            // Build the per-column requests before moving `column_indexes` into the struct
+            column_requests: column_indexes
+                .iter()
+                .map(|index| (*index, ActiveColumnSampleRequest::new(*index)))
+                .collect(),
+            column_indexes,
             columns: vec![],
             log,
+            _phantom: <_>::default(),
         }
     }
 
@@ -65,7 +149,7 @@
         column_index: ColumnIndex,
         resp: Result<(DataColumnSidecarList<T::EthSpec>, Duration), LookupFailure>,
         cx: &mut SyncNetworkContext<T>,
-    ) -> Result<Option<Vec<Arc<DataColumnSidecar<T::EthSpec>>>>, SamplingError> {
+    ) -> Result<Option<DataColumnSidecarList<T::EthSpec>>, SamplingError> {
         let Some(request) = self.column_requests.get_mut(&column_index) else {
             warn!(
                 self.log,
@@ -81,12 +165,14 @@
                 // No need to check data_columns has len > 1, as the SyncNetworkContext ensures that
                 // only the requested column is returned (or none);
                 if let Some(data_column) = data_columns.pop() {
-                    request.on_sampling_success()?;
+                    self.columns.push(data_column);
+                    request.on_download_success()?;
                 } else {
                     // Peer does not have the requested data.
                     // TODO(das) what to do?
                    debug!(self.log, "Sampling peer claims to not have the data"; "block_root" => %self.block_root, "column_index" => column_index);
-                    request.on_sampling_error()?;
+                    // TODO(das) tolerate this failure if you are not sure the block has data
+                    request.on_download_success()?;
                }
            }
            Err(err) => {
@@ -94,7 +180,7 @@
                // Error downloading, maybe penalize peer and retry again.
                // TODO(das) with a different peer or the same peer?
-                request.on_sampling_error()?;
+                request.on_download_error()?;
            }
        };
 
@@ -104,20 +190,52 @@
    pub(crate) fn continue_requests(
        &mut self,
        cx: &mut SyncNetworkContext<T>,
-    ) -> Result<Option<Vec<Arc<DataColumnSidecar<T::EthSpec>>>>, SamplingError> {
-        if too_much_failures {
+    ) -> Result<Option<DataColumnSidecarList<T::EthSpec>>, SamplingError> {
+        // First check if the request is completed
+        let mut successes = 0;
+        let mut failures = 0;
+        let mut ongoings = 0;
+
+        for request in self.column_requests.values() {
+            if request.is_downloaded() {
+                successes += 1;
+            }
+            if request.is_failed() {
+                failures += 1;
+            }
+            if request.is_ongoing() {
+                ongoings += 1;
+            }
+        }
+
+        if successes == self.column_requests.len() {
+            return Ok(Some(self.columns.clone()));
+        }
+        if failures == self.column_requests.len() {
            return Err(SamplingError::TooManyFailures);
        }
-        if self.columns.len() > custody_requirements {
-            Ok(Some(self.columns))
-        } else {
-            Ok(None)
+
+        let mut sent_requests = 0;
+        for (_, request) in self.column_requests.iter_mut() {
+            if request.request(self.block_root, self.block_epoch, self.requester_id, cx)? {
+                sent_requests += 1;
+            }
+        }
+
+        // Make sure that the custody request doesn't stall: it must keep receiving events of
+        // some type. If there are no ongoing column requests and no new request was sent, the
+        // request can make no further progress (e.g. there are no custodial peers).
+        if ongoings == 0 && sent_requests == 0 {
+            debug!(self.log, "Custody column request stalled"; "block_root" => %self.block_root);
+        }
+
+        Ok(None)
    }
 }
 
 mod request {
-    use super::{CustodyId, SamplingError};
+    use super::{CustodyId, CustodyRequester, SamplingError};
     use crate::sync::{
         manager::DataColumnsByRootRequester,
         network_context::{DataColumnsByRootSingleBlockRequest, SyncNetworkContext},
@@ -125,7 +243,7 @@
     };
     use beacon_chain::BeaconChainTypes;
     use lighthouse_network::PeerId;
     use std::collections::HashSet;
-    use types::{data_column_sidecar::ColumnIndex, EthSpec, Hash256, Slot};
+    use types::{data_column_sidecar::ColumnIndex, Epoch, Hash256};
 
     pub(crate) struct ActiveColumnSampleRequest {
         column_index: ColumnIndex,
@@ -152,46 +270,43 @@
             }
         }
 
-        pub(crate) fn is_completed(&self) -> bool {
+        pub(crate) fn is_downloaded(&self) -> bool {
             match self.status {
-                Status::NoPeers | Status::NotStarted | Status::Sampling(_) => false,
-                Status::Verified => true,
+                Status::NoPeers | Status::NotStarted | Status::Downloading(_) => false,
+                Status::Downloaded => true,
             }
         }
 
         pub(crate) fn is_failed(&self) -> bool {
             match self.status {
-                Status::NotStarted | Status::Sampling(_) | Status::Verified => false,
+                Status::NotStarted | Status::Downloading(_) | Status::Downloaded => false,
                 Status::NoPeers => true,
             }
         }
 
         pub(crate) fn is_ongoing(&self) -> bool {
             match self.status {
-                Status::NotStarted | Status::NoPeers | Status::Verified => false,
-                Status::Sampling(_) => true,
+                Status::NotStarted | Status::NoPeers | Status::Downloaded => false,
+                Status::Downloading(_) => true,
             }
         }
 
         pub(crate) fn request<T: BeaconChainTypes>(
             &mut self,
             block_root: Hash256,
-            block_slot: Slot,
-            requester: SamplingRequester,
+            block_epoch: Epoch,
+            requester: CustodyRequester,
             cx: &mut SyncNetworkContext<T>,
         ) -> Result<bool, SamplingError> {
             match &self.status {
                 Status::NoPeers | Status::NotStarted => {} // Ok to continue
-                Status::Sampling(_) => return Ok(false),   // Already downloading
-                Status::Verified => return Ok(false),      // Already completed
+                Status::Downloading(_) => return Ok(false), // Already downloading
+                Status::Downloaded => return Ok(false),     // Already completed
             }
 
             // TODO: When there is a fork and only a subset of your peers know about a block, sampling should only
             // be queried on the peers on that fork. Should this case be handled? How to handle it?
-            let peer_ids = cx.get_custodial_peers(
-                block_slot.epoch(<T::EthSpec as EthSpec>::slots_per_epoch()),
-                self.column_index,
-            );
+            let peer_ids = cx.get_custodial_peers(block_epoch, self.column_index);
 
             // TODO(das) randomize custodial peer and avoid failing peers
             if let Some(peer_id) = peer_ids.first().cloned() {
@@ -208,7 +323,7 @@
                 )
                 .map_err(SamplingError::SendFailed)?;
 
-                self.status = Status::Sampling(peer_id);
+                self.status = Status::Downloading(peer_id);
                 Ok(true)
             } else {
                 self.status = Status::NoPeers;
@@ -216,9 +331,9 @@
             }
         }
 
-        pub(crate) fn on_sampling_error(&mut self) -> Result<PeerId, SamplingError> {
+        pub(crate) fn on_download_error(&mut self) -> Result<PeerId, SamplingError> {
             match self.status.clone() {
-                Status::Sampling(peer_id) => {
+                Status::Downloading(peer_id) => {
                     self.status = Status::NotStarted;
                     Ok(peer_id)
                 }
@@ -228,10 +343,10 @@
             }
         }
 
-        pub(crate) fn on_sampling_success(&mut self) -> Result<(), SamplingError> {
+        pub(crate) fn on_download_success(&mut self) -> Result<(), SamplingError> {
             match &self.status {
-                Status::Sampling(_) => {
-                    self.status = Status::Verified;
+                Status::Downloading(_) => {
+                    self.status = Status::Downloaded;
                     Ok(())
                 }
                 other => Err(SamplingError::BadState(format!(
diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs
index 75cb49d176d..273d3248e16 100644
--- a/beacon_node/network/src/sync/range_sync/batch.rs
+++ b/beacon_node/network/src/sync/range_sync/batch.rs
@@ -19,6 +19,7 @@ const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 3;
 #[derive(Debug, Copy, Clone, Display)]
 #[strum(serialize_all = "snake_case")]
 pub enum ByRangeRequestType {
+    BlocksAndColumns,
     BlocksAndBlobs,
     Blocks,
 }
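
For reference, the index-selection rule in `custody_lookup_request` above reduces to a set difference: fetch exactly the columns this node has a custody duty for, minus those already imported through gossip; if nothing remains, no request is issued. A minimal standalone sketch of that rule, assuming `ColumnIndex` is `u64` as in `types` (the `columns_to_fetch` helper is illustrative, not a Lighthouse function):

    // Sketch of the "duty minus imported" filter from custody_lookup_request.
    type ColumnIndex = u64;

    fn columns_to_fetch(duty: Vec<ColumnIndex>, imported: &[ColumnIndex]) -> Vec<ColumnIndex> {
        duty.into_iter()
            // Keep only indexes that have not already arrived via gossip
            .filter(|index| !imported.contains(index))
            .collect()
    }

    fn main() {
        // Duty columns 0, 17, 42; column 17 already imported via gossip.
        let to_fetch = columns_to_fetch(vec![0, 17, 42], &[17]);
        assert_eq!(to_fetch, vec![0, 42]);
        // An empty result corresponds to the `Ok(false)` early return: no request sent.
    }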
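The per-column lifecycle that `ActiveColumnSampleRequest` tracks after the Sampling to Downloading rename is a four-state machine. A self-contained sketch of those transitions, with `PeerId` stubbed as a `String` to stay runnable (the real type lives in `lighthouse_network`; method names here paraphrase `request`/`on_download_error`/`on_download_success`):

    // Sketch of the per-column download state machine.
    type PeerId = String;

    #[derive(Debug, PartialEq)]
    enum Status {
        NotStarted,
        NoPeers,
        Downloading(PeerId),
        Downloaded,
    }

    impl Status {
        // request(): only start a download from NotStarted or NoPeers.
        fn start(&mut self, peer: PeerId) -> bool {
            match self {
                Status::NotStarted | Status::NoPeers => {
                    *self = Status::Downloading(peer);
                    true
                }
                _ => false, // already downloading or downloaded
            }
        }

        // on_download_error(): fall back to NotStarted so the column is retried.
        fn on_error(&mut self) {
            if matches!(self, Status::Downloading(_)) {
                *self = Status::NotStarted;
            }
        }

        // on_download_success(): terminal state, counted as a success.
        fn on_success(&mut self) {
            if matches!(self, Status::Downloading(_)) {
                *self = Status::Downloaded;
            }
        }
    }

    fn main() {
        let mut s = Status::NotStarted;
        assert!(s.start("peer-a".into()));
        s.on_error(); // peer failed: eligible for retry
        assert!(s.start("peer-b".into()));
        s.on_success();
        assert_eq!(s, Status::Downloaded);
    }

`continue_requests` then counts columns in each state: all Downloaded completes the request, all NoPeers fails it with `TooManyFailures`, and zero ongoing plus zero newly sent requests is the stalled case logged above.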
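Similarly, the updated `BlocksAndBlobsRequestInfo::is_finished` gates completion on each stream the request type actually asked for. A condensed sketch of that predicate, written as a free function over booleans for clarity (upstream it reads the struct's `is_*_stream_terminated` flags):

    // Sketch of the by-range completion predicate.
    enum ByRangeRequestType {
        BlocksAndColumns,
        BlocksAndBlobs,
        Blocks,
    }

    fn is_finished(ty: &ByRangeRequestType, blocks: bool, blobs: bool, columns: bool) -> bool {
        let blobs_requested = matches!(ty, ByRangeRequestType::BlocksAndBlobs);
        let columns_requested = matches!(ty, ByRangeRequestType::BlocksAndColumns);
        // The block stream must always terminate; sidecar streams only if requested.
        blocks && (!blobs_requested || blobs) && (!columns_requested || columns)
    }

    fn main() {
        // Blocks-only batches ignore both sidecar streams.
        assert!(is_finished(&ByRangeRequestType::Blocks, true, false, false));
        // A blob batch must also see its blob stream terminate.
        assert!(!is_finished(&ByRangeRequestType::BlocksAndBlobs, true, false, false));
        // A column batch waits on the custody column stream instead.
        assert!(is_finished(&ByRangeRequestType::BlocksAndColumns, true, false, true));
    }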