diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 6823a82f405..01374c11776 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2855,6 +2855,15 @@ impl BeaconChain { ChainSegmentResult::Successful { imported_blocks } } + /// Updates fork-choice node into a permanent `available` state so it can become a viable head. + /// Only completed sampling results are received. Blocks are unavailable by default and should + /// be pruned on finalization, on a timeout or by a max count. + pub async fn process_sampling_completed(self: &Arc, block_root: Hash256) { + // TODO(das): update fork-choice + // TODO(das): These log levels are too high, reduce once DAS matures + info!(self.log, "Sampling completed"; "block_root" => %block_root); + } + /// Returns `Ok(GossipVerifiedBlock)` if the supplied `block` should be forwarded onto the /// gossip network. The block is not imported into the chain, it is just partially verified. /// diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index debc4881a60..9372606ffed 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -2584,3 +2584,33 @@ pub fn generate_rand_block_and_blobs( } (block, blob_sidecars) } + +pub fn generate_rand_block_and_data_columns( + fork_name: ForkName, + num_blobs: NumBlobs, + rng: &mut impl Rng, +) -> ( + SignedBeaconBlock>, + Vec>, +) { + let (block, blobs) = generate_rand_block_and_blobs(fork_name, num_blobs, rng); + let blob = blobs.first().expect("should have at least 1 blob"); + + let data_columns = (0..E::number_of_columns()) + .map(|index| DataColumnSidecar { + index: index as u64, + column: <_>::default(), + kzg_commitments: block + .message() + .body() + .blob_kzg_commitments() + .unwrap() + .clone(), + kzg_proofs: (vec![]).into(), + signed_block_header: blob.signed_block_header.clone(), + kzg_commitments_inclusion_proof: <_>::default(), + }) + .collect::>(); + + (block, data_columns) +} diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 478b6d72cd0..cf2cdc86245 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -157,6 +157,10 @@ const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024; /// will be stored before we start dropping them. const MAX_RPC_BLOB_QUEUE_LEN: usize = 1_024; +/// TODO(das): Placeholder number +const MAX_RPC_VERIFY_DATA_COLUMN_QUEUE_LEN: usize = 1000; +const MAX_SAMPLING_RESULT_QUEUE_LEN: usize = 1000; + /// The maximum number of queued `Vec` objects received during syncing that will /// be stored before we start dropping them. const MAX_CHAIN_SEGMENT_QUEUE_LEN: usize = 64; @@ -252,6 +256,8 @@ pub const GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic pub const RPC_BLOCK: &str = "rpc_block"; pub const IGNORED_RPC_BLOCK: &str = "ignored_rpc_block"; pub const RPC_BLOBS: &str = "rpc_blob"; +pub const RPC_VERIFY_DATA_COLUMNS: &str = "rpc_verify_data_columns"; +pub const SAMPLING_RESULT: &str = "sampling_result"; pub const CHAIN_SEGMENT: &str = "chain_segment"; pub const CHAIN_SEGMENT_BACKFILL: &str = "chain_segment_backfill"; pub const STATUS_PROCESSING: &str = "status_processing"; @@ -629,6 +635,8 @@ pub enum Work { RpcBlobs { process_fn: AsyncFn, }, + RpcVerifyDataColumn(AsyncFn), + SamplingResult(AsyncFn), IgnoredRpcBlock { process_fn: BlockingFn, }, @@ -675,6 +683,8 @@ impl Work { Work::GossipLightClientOptimisticUpdate(_) => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE, Work::RpcBlock { .. } => RPC_BLOCK, Work::RpcBlobs { .. } => RPC_BLOBS, + Work::RpcVerifyDataColumn(_) => RPC_VERIFY_DATA_COLUMNS, + Work::SamplingResult(_) => SAMPLING_RESULT, Work::IgnoredRpcBlock { .. } => IGNORED_RPC_BLOCK, Work::ChainSegment { .. } => CHAIN_SEGMENT, Work::ChainSegmentBackfill(_) => CHAIN_SEGMENT_BACKFILL, @@ -833,6 +843,8 @@ impl BeaconProcessor { // Using a FIFO queue since blocks need to be imported sequentially. let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN); let mut rpc_blob_queue = FifoQueue::new(MAX_RPC_BLOB_QUEUE_LEN); + let mut rpc_verify_data_column_queue = FifoQueue::new(MAX_RPC_VERIFY_DATA_COLUMN_QUEUE_LEN); + let mut sampling_result_queue = FifoQueue::new(MAX_SAMPLING_RESULT_QUEUE_LEN); let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN); @@ -1278,6 +1290,12 @@ impl BeaconProcessor { rpc_block_queue.push(work, work_id, &self.log) } Work::RpcBlobs { .. } => rpc_blob_queue.push(work, work_id, &self.log), + Work::RpcVerifyDataColumn(_) => { + rpc_verify_data_column_queue.push(work, work_id, &self.log) + } + Work::SamplingResult(_) => { + sampling_result_queue.push(work, work_id, &self.log) + } Work::ChainSegment { .. } => { chain_segment_queue.push(work, work_id, &self.log) } @@ -1510,9 +1528,10 @@ impl BeaconProcessor { beacon_block_root: _, process_fn, } => task_spawner.spawn_async(process_fn), - Work::RpcBlock { process_fn } | Work::RpcBlobs { process_fn } => { - task_spawner.spawn_async(process_fn) - } + Work::RpcBlock { process_fn } + | Work::RpcBlobs { process_fn } + | Work::RpcVerifyDataColumn(process_fn) + | Work::SamplingResult(process_fn) => task_spawner.spawn_async(process_fn), Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn), Work::GossipBlock(work) | Work::GossipBlobSidecar(work) diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 7df20eee280..85f9f50a048 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -378,6 +378,14 @@ pub struct DataColumnsByRootRequest { pub data_column_ids: RuntimeVariableList, } +impl DataColumnsByRootRequest { + pub fn new(blob_ids: Vec, spec: &ChainSpec) -> Self { + let data_column_ids = + RuntimeVariableList::from_vec(blob_ids, spec.max_request_data_column_sidecars as usize); + Self { data_column_ids } + } +} + /* RPC Handling and Grouping */ // Collection of enums and structs used by the Codecs to encode/decode RPC messages diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 8bec17f502c..e9ee06f045f 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -1,6 +1,6 @@ use crate::{ service::NetworkMessage, - sync::{manager::BlockProcessType, SyncMessage}, + sync::{manager::BlockProcessType, SamplingId, SyncMessage}, }; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{builder::Witness, eth1_chain::CachingEth1Backend, BeaconChain}; @@ -478,6 +478,42 @@ impl NetworkBeaconProcessor { }) } + /// Create a new `Work` event for some data_columns from ReqResp + pub fn send_rpc_data_columns( + self: &Arc, + block_root: Hash256, + data_columns: Vec>>, + seen_timestamp: Duration, + id: SamplingId, + ) -> Result<(), Error> { + let s = self.clone(); + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::RpcVerifyDataColumn(Box::pin(async move { + let result = s + .clone() + .validate_rpc_data_columns(block_root, data_columns, seen_timestamp) + .await; + // Sync handles these results + s.send_sync_message(SyncMessage::SampleVerified { id, result }); + })), + }) + } + + /// Create a new `Work` event with a block sampling completed result + pub fn send_sampling_completed( + self: &Arc, + block_root: Hash256, + ) -> Result<(), Error> { + let nbp = self.clone(); + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::SamplingResult(Box::pin(async move { + nbp.process_sampling_completed(block_root).await; + })), + }) + } + /// Create a new work event to import `blocks` as a beacon chain segment. pub fn send_chain_segment( self: &Arc, diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 2e5f1216fd7..880eaa1ed0f 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -24,7 +24,7 @@ use store::KzgCommitment; use tokio::sync::mpsc; use types::beacon_block_body::format_kzg_commitments; use types::blob_sidecar::FixedBlobSidecarList; -use types::{Epoch, Hash256}; +use types::{DataColumnSidecar, Epoch, Hash256}; /// Id associated to a batch processing request, either a sync batch or a parent lookup. #[derive(Clone, Debug, PartialEq)] @@ -305,6 +305,25 @@ impl NetworkBeaconProcessor { }); } + /// Validate a list of data columns received from RPC requests + pub async fn validate_rpc_data_columns( + self: Arc>, + _block_root: Hash256, + _data_columns: Vec>>, + _seen_timestamp: Duration, + ) -> Result<(), String> { + // TODO(das): validate data column sidecar KZG commitment + Ok(()) + } + + /// Process a sampling completed event, inserting it into fork-choice + pub async fn process_sampling_completed( + self: Arc>, + block_root: Hash256, + ) { + self.chain.process_sampling_completed(block_root).await; + } + /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync /// thread if more blocks are needed to process it. pub async fn process_chain_segment( diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 57939f8dcf5..caa9c38af3a 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -514,11 +514,11 @@ impl Router { ) { let request_id = match request_id { RequestId::Sync(sync_id) => match sync_id { - SyncId::SingleBlock { .. } | SyncId::SingleBlob { .. } => { - crit!(self.log, "Block lookups do not request BBRange requests"; "peer_id" => %peer_id); + id @ SyncId::RangeBlockAndBlobs { .. } => id, + other => { + crit!(self.log, "BlocksByRange response on incorrect request"; "request" => ?other); return; } - id @ SyncId::RangeBlockAndBlobs { .. } => id, }, RequestId::Router => { crit!(self.log, "All BBRange requests belong to sync"; "peer_id" => %peer_id); @@ -577,12 +577,8 @@ impl Router { let request_id = match request_id { RequestId::Sync(sync_id) => match sync_id { id @ SyncId::SingleBlock { .. } => id, - SyncId::RangeBlockAndBlobs { .. } => { - crit!(self.log, "Batch syncing do not request BBRoot requests"; "peer_id" => %peer_id); - return; - } - SyncId::SingleBlob { .. } => { - crit!(self.log, "Blob response to block by roots request"; "peer_id" => %peer_id); + other => { + crit!(self.log, "BlocksByRoot response on incorrect request"; "request" => ?other); return; } }, @@ -615,12 +611,8 @@ impl Router { let request_id = match request_id { RequestId::Sync(sync_id) => match sync_id { id @ SyncId::SingleBlob { .. } => id, - SyncId::SingleBlock { .. } => { - crit!(self.log, "Block response to blobs by roots request"; "peer_id" => %peer_id); - return; - } - SyncId::RangeBlockAndBlobs { .. } => { - crit!(self.log, "Batch syncing does not request BBRoot requests"; "peer_id" => %peer_id); + other => { + crit!(self.log, "BlobsByRoot response on incorrect request"; "request" => ?other); return; } }, diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 8e3b35ee5d3..eeb6b2e719c 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -2,7 +2,8 @@ use crate::network_beacon_processor::NetworkBeaconProcessor; use crate::service::RequestId; use crate::sync::manager::{RequestId as SyncRequestId, SingleLookupReqId, SyncManager}; -use crate::sync::SyncMessage; +use crate::sync::sampling::{SamplingConfig, SamplingRequester}; +use crate::sync::{SamplingId, SyncMessage}; use crate::NetworkMessage; use std::sync::Arc; @@ -12,7 +13,8 @@ use crate::sync::block_lookups::common::ResponseType; use beacon_chain::builder::Witness; use beacon_chain::eth1_chain::CachingEth1Backend; use beacon_chain::test_utils::{ - build_log, generate_rand_block_and_blobs, BeaconChainHarness, EphemeralHarnessType, NumBlobs, + build_log, generate_rand_block_and_blobs, generate_rand_block_and_data_columns, + BeaconChainHarness, EphemeralHarnessType, NumBlobs, }; use beacon_processor::WorkEvent; use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode}; @@ -22,6 +24,8 @@ use slog::info; use slot_clock::{ManualSlotClock, SlotClock, TestingSlotClock}; use store::MemoryStore; use tokio::sync::mpsc; +use types::data_column_sidecar::ColumnIndex; +use types::DataColumnSidecar; use types::{ test_utils::{SeedableRng, XorShiftRng}, BlobSidecar, ForkName, MinimalEthSpec as E, SignedBeaconBlock, @@ -57,6 +61,7 @@ type T = Witness, E, MemoryStore, Memo struct TestRig { /// Receiver for `BeaconProcessor` events (e.g. block processing results). beacon_processor_rx: mpsc::Receiver>, + beacon_processor_rx_queue: Vec>, /// Receiver for `NetworkMessage` (e.g. outgoing RPC requests from sync) network_rx: mpsc::UnboundedReceiver>, /// Stores all `NetworkMessage`s received from `network_recv`. (e.g. outgoing RPC requests) @@ -72,6 +77,9 @@ struct TestRig { } const D: Duration = Duration::new(0, 0); +const SAMPLING_REQUIRED_SUCCESSES: usize = 2; + +type SamplingIds = Vec<(Id, ColumnIndex)>; impl TestRig { fn test_setup() -> Self { @@ -114,6 +122,7 @@ impl TestRig { let rng = XorShiftRng::from_seed([42; 16]); TestRig { beacon_processor_rx, + beacon_processor_rx_queue: vec![], network_rx, network_rx_queue: vec![], rng, @@ -123,6 +132,9 @@ impl TestRig { network_tx, beacon_processor.into(), sync_recv, + SamplingConfig::Custom { + required_successes: vec![SAMPLING_REQUIRED_SUCCESSES], + }, log.clone(), ), fork_name, @@ -166,6 +178,10 @@ impl TestRig { )); } + fn trigger_sample_block(&mut self, block_root: Hash256, block_slot: Slot) { + self.send_sync_message(SyncMessage::SampleBlock(block_root, block_slot)) + } + fn rand_block(&mut self) -> SignedBeaconBlock { self.rand_block_and_blobs(NumBlobs::None).0 } @@ -179,6 +195,11 @@ impl TestRig { generate_rand_block_and_blobs::(fork_name, num_blobs, rng) } + fn rand_block_and_data_columns(&mut self) -> (SignedBeaconBlock, Vec>) { + let num_blobs = NumBlobs::Number(1); + generate_rand_block_and_data_columns::(self.fork_name, num_blobs, &mut self.rng) + } + pub fn rand_block_and_parent( &mut self, ) -> (SignedBeaconBlock, SignedBeaconBlock, Hash256, Hash256) { @@ -210,6 +231,20 @@ impl TestRig { self.sync_manager.failed_chains_contains(chain_hash) } + fn expect_no_active_sampling(&mut self) { + assert_eq!( + self.sync_manager.active_sampling_requests(), + vec![], + "expected no active sampling" + ); + } + + fn expect_clean_finished_sampling(&mut self) { + self.expect_empty_network(); + self.expect_sampling_result_work(); + self.expect_no_active_sampling(); + } + #[track_caller] fn assert_parent_lookups_consistency(&self) { let hashes = self.active_parent_lookups(); @@ -233,6 +268,10 @@ impl TestRig { peer_id } + fn new_connected_peers(&mut self, count: usize) -> Vec { + (0..count).map(|_| self.new_connected_peer()).collect() + } + fn parent_chain_processed(&mut self, chain_hash: Hash256, result: BatchProcessResult) { self.send_sync_message(SyncMessage::BatchProcessed { sync_type: ChainSegmentProcessId::ParentLookup(chain_hash), @@ -379,6 +418,77 @@ impl TestRig { }) } + fn return_empty_sampling_requests(&mut self, sampling_ids: SamplingIds) { + for (id, column_index) in sampling_ids { + self.log(&format!("return empty data column for {column_index}")); + self.return_empty_sampling_request(id) + } + } + + fn return_empty_sampling_request(&mut self, id: Id) { + let peer_id = PeerId::random(); + // Send stream termination + self.send_sync_message(SyncMessage::RpcDataColumn { + request_id: SyncRequestId::DataColumnsByRoot(id), + peer_id, + data_column: None, + seen_timestamp: timestamp_now(), + }); + } + + fn complete_valid_sampling_column_requests( + &mut self, + sampling_ids: SamplingIds, + data_columns: Vec>, + ) { + for (id, column_index) in sampling_ids { + self.log(&format!("return valid data column for {column_index}")); + self.complete_valid_sampling_column_request( + id, + data_columns[column_index as usize].clone(), + ); + } + } + + fn complete_valid_sampling_column_request( + &mut self, + id: Id, + data_column: DataColumnSidecar, + ) { + let peer_id = PeerId::random(); + let block_root = data_column.block_root(); + let column_index = data_column.index; + + // Send chunk + self.send_sync_message(SyncMessage::RpcDataColumn { + request_id: SyncRequestId::DataColumnsByRoot(id), + peer_id, + data_column: Some(Arc::new(data_column)), + seen_timestamp: timestamp_now(), + }); + + // Send stream termination + self.send_sync_message(SyncMessage::RpcDataColumn { + request_id: SyncRequestId::DataColumnsByRoot(id), + peer_id, + data_column: None, + seen_timestamp: timestamp_now(), + }); + + // Expect work event + // TODO(das): worth it to append sender id to the work event for stricter assertion? + self.expect_sample_verify_request(); + + // Respond with valid result + self.send_sync_message(SyncMessage::SampleVerified { + id: SamplingId { + id: SamplingRequester::ImportedBlock(block_root), + column_index, + }, + result: Ok(()), + }) + } + fn peer_disconnected(&mut self, peer_id: PeerId) { self.send_sync_message(SyncMessage::Disconnect(peer_id)); } @@ -409,6 +519,36 @@ impl TestRig { } } + fn drain_beacon_processor_rx(&mut self) { + while let Ok(event) = self.beacon_processor_rx.try_recv() { + self.beacon_processor_rx_queue.push(event); + } + } + + fn pop_received_beacon_processor_event) -> Option>( + &mut self, + predicate_transform: F, + ) -> Result { + self.drain_beacon_processor_rx(); + + if let Some(index) = self + .beacon_processor_rx_queue + .iter() + .position(|x| predicate_transform(x).is_some()) + { + // Transform the item, knowing that it won't be None because we checked it in the position predicate. + let transformed = predicate_transform(&self.beacon_processor_rx_queue[index]).unwrap(); + self.beacon_processor_rx_queue.remove(index); + Ok(transformed) + } else { + Err(format!( + "current beacon processor messages {:?}", + self.beacon_processor_rx_queue + ) + .to_string()) + } + } + #[track_caller] fn expect_block_lookup_request(&mut self, for_block: Hash256) -> SingleLookupReqId { self.pop_received_network_event(|ev| match ev { @@ -485,6 +625,38 @@ impl TestRig { .unwrap_or_else(|e| panic!("Expected blob parent request for {for_block:?}: {e}")) } + fn expect_sampling_requests(&mut self, for_block: Hash256, count: usize) -> SamplingIds { + (0..count) + .map(|i| { + self.pop_received_network_event(|ev| match ev { + NetworkMessage::SendRequest { + peer_id: _, + request: Request::DataColumnsByRoot(request), + request_id: RequestId::Sync(SyncRequestId::DataColumnsByRoot(id)), + } if request + .data_column_ids + .to_vec() + .iter() + .any(|r| r.block_root == for_block) => + { + let index = request.data_column_ids.to_vec().first().unwrap().index; + Some((*id, index)) + } + _ => None, + }) + .unwrap_or_else(|e| { + panic!("Expected sampling request {i}/{count} for {for_block:?}: {e}") + }) + }) + .collect() + } + + fn expect_only_sampling_requests(&mut self, for_block: Hash256, count: usize) -> SamplingIds { + let ids = self.expect_sampling_requests(for_block, count); + self.expect_empty_network(); + ids + } + fn expect_lookup_request_block_and_blobs(&mut self, block_root: Hash256) -> SingleLookupReqId { let id = self.expect_block_lookup_request(block_root); // If we're in deneb, a blob request should have been triggered as well, @@ -523,6 +695,28 @@ impl TestRig { } } + fn expect_sample_verify_request(&mut self) { + self.pop_received_beacon_processor_event(|ev| { + if ev.work_type() == beacon_processor::RPC_VERIFY_DATA_COLUMNS { + Some(()) + } else { + None + } + }) + .unwrap_or_else(|e| panic!("Expected sample verify work: {e}")) + } + + fn expect_sampling_result_work(&mut self) { + self.pop_received_beacon_processor_event(|ev| { + if ev.work_type() == beacon_processor::SAMPLING_RESULT { + Some(()) + } else { + None + } + }) + .unwrap_or_else(|e| panic!("Expected sampling result work: {e}")) + } + fn expect_no_penalty_for(&mut self, peer_id: PeerId) { self.drain_network_rx(); let downscore_events = self @@ -554,7 +748,11 @@ impl TestRig { fn expect_empty_network(&mut self) { self.drain_network_rx(); if !self.network_rx_queue.is_empty() { - panic!("expected no network events: {:#?}", self.network_rx_queue); + let n = self.network_rx_queue.len(); + panic!( + "expected no network events but got {n} events, displaying first 2: {:#?}", + self.network_rx_queue[..n.min(2)].iter().collect::>() + ); } } @@ -1092,6 +1290,49 @@ fn test_same_chain_race_condition() { assert_eq!(rig.active_parent_lookups_count(), 0); } +#[test] +fn sampling_happy_path() { + let Some(mut r) = TestRig::test_setup_after_deneb() else { + return; + }; + r.new_connected_peers(100); // Add enough sampling peers + let (block, data_columns) = r.rand_block_and_data_columns(); + let block_root = block.canonical_root(); + r.trigger_sample_block(block_root, block.slot()); + // Retrieve all outgoing sample requests for random column indexes + let sampling_ids = r.expect_only_sampling_requests(block_root, SAMPLING_REQUIRED_SUCCESSES); + // Resolve all of them one by one + r.complete_valid_sampling_column_requests(sampling_ids, data_columns); + r.expect_clean_finished_sampling(); +} + +#[test] +fn sampling_with_retries() { + let Some(mut r) = TestRig::test_setup_after_deneb() else { + return; + }; + r.new_connected_peers(100); // Add enough sampling peers + let (block, data_columns) = r.rand_block_and_data_columns(); + let block_root = block.canonical_root(); + r.trigger_sample_block(block_root, block.slot()); + // Retrieve all outgoing sample requests for random column indexes, and return empty responses + let sampling_ids = r.expect_only_sampling_requests(block_root, SAMPLING_REQUIRED_SUCCESSES); + r.return_empty_sampling_requests(sampling_ids); + // Expect retries for all of them, and resolve them + let sampling_ids = r.expect_only_sampling_requests(block_root, SAMPLING_REQUIRED_SUCCESSES); + r.complete_valid_sampling_column_requests(sampling_ids, data_columns); + r.expect_clean_finished_sampling(); +} + +// TODO(das): Test retries of DataColumnByRoot: +// - Expect request for column_index +// - Respond with bad data +// - Respond with stream terminator +// ^ The stream terminator should be ignored and not close the next retry + +// TODO(das): Test error early a sampling request and it getting drop + then receiving responses +// from pending requests. + mod deneb_only { use super::*; use beacon_chain::data_availability_checker::AvailabilityCheckError; diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 112ee705da6..1b41e5e0b7e 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -39,6 +39,7 @@ use super::block_lookups::BlockLookups; use super::network_context::{BlockOrBlob, RangeRequestId, RpcEvent, SyncNetworkContext}; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; +use super::sampling::{Sampling, SamplingConfig, SamplingId, SamplingRequester, SamplingResult}; use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor}; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; @@ -63,7 +64,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; use types::blob_sidecar::FixedBlobSidecarList; -use types::{BlobSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot}; +use types::{BlobSidecar, DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlock, Slot}; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync /// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a @@ -90,10 +91,18 @@ pub enum RequestId { SingleBlock { id: SingleLookupReqId }, /// Request searching for a set of blobs given a hash. SingleBlob { id: SingleLookupReqId }, + /// TODO + DataColumnsByRoot(Id), /// Range request that is composed by both a block range request and a blob range request. RangeBlockAndBlobs { id: Id }, } +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub enum DataColumnsByRootRequester { + Sampling(SamplingId), + Custody, +} + #[derive(Debug)] /// A message that can be sent to the sync manager thread. pub enum SyncMessage { @@ -116,6 +125,13 @@ pub enum SyncMessage { seen_timestamp: Duration, }, + RpcDataColumn { + request_id: RequestId, + peer_id: PeerId, + data_column: Option>>, + seen_timestamp: Duration, + }, + /// A block with an unknown parent has been received. UnknownParentBlock(PeerId, RpcBlock, Hash256), @@ -126,6 +142,10 @@ pub enum SyncMessage { /// manager to attempt to find the block matching the unknown hash. UnknownBlockHashFromAttestation(PeerId, Hash256), + /// Request to start sampling a block. Caller should ensure that block has data before sending + /// the request. + SampleBlock(Hash256, Slot), + /// A peer has disconnected. Disconnect(PeerId), @@ -147,6 +167,12 @@ pub enum SyncMessage { process_type: BlockProcessType, result: BlockProcessingResult, }, + + /// Sample data column verified + SampleVerified { + id: SamplingId, + result: Result<(), String>, + }, } /// The type of processing specified for a received block. @@ -201,6 +227,8 @@ pub struct SyncManager { block_lookups: BlockLookups, + sampling: Sampling, + /// The logger for the import manager. log: Logger, } @@ -227,6 +255,7 @@ pub fn spawn( network_send, beacon_processor, sync_recv, + SamplingConfig::Default, log.clone(), ); @@ -241,6 +270,7 @@ impl SyncManager { network_send: mpsc::UnboundedSender>, beacon_processor: Arc>, sync_recv: mpsc::UnboundedReceiver>, + sampling_config: SamplingConfig, log: slog::Logger, ) -> Self { let network_globals = beacon_processor.network_globals.clone(); @@ -259,6 +289,7 @@ impl SyncManager { beacon_chain.data_availability_checker.clone(), log.clone(), ), + sampling: Sampling::new(sampling_config, log.clone()), log: log.clone(), } } @@ -278,6 +309,11 @@ impl SyncManager { self.block_lookups.failed_chains_contains(chain_hash) } + #[cfg(test)] + pub(crate) fn active_sampling_requests(&self) -> Vec { + self.sampling.active_sampling_requests() + } + fn network_globals(&self) -> &NetworkGlobals { self.network.network_globals() } @@ -326,6 +362,9 @@ impl SyncManager { RequestId::SingleBlob { id } => { self.on_single_blob_response(id, peer_id, RpcEvent::RPCError(error)) } + RequestId::DataColumnsByRoot(id) => { + self.on_single_data_column_response(id, peer_id, RpcEvent::RPCError(error)) + } RequestId::RangeBlockAndBlobs { id } => { if let Some(sender_id) = self.network.range_request_failed(id) { match sender_id { @@ -570,6 +609,12 @@ impl SyncManager { blob_sidecar, seen_timestamp, } => self.rpc_blob_received(request_id, peer_id, blob_sidecar, seen_timestamp), + SyncMessage::RpcDataColumn { + request_id, + peer_id, + data_column, + seen_timestamp, + } => self.rpc_data_column_received(request_id, peer_id, data_column, seen_timestamp), SyncMessage::UnknownParentBlock(peer_id, block, block_root) => { let block_slot = block.slot(); let parent_root = block.parent_root(); @@ -606,6 +651,15 @@ impl SyncManager { debug!(self.log, "Received unknown block hash message"; "block_root" => %block_root); self.handle_unknown_block_root(peer_id, block_root); } + SyncMessage::SampleBlock(block_root, block_slot) => { + debug!(self.log, "Received SampleBlock message"; "block_root" => %block_root); + if let Some((requester, result)) = + self.sampling + .on_new_sample_request(block_root, block_slot, &mut self.network) + { + self.on_sampling_result(requester, result) + } + } SyncMessage::Disconnect(peer_id) => { debug!(self.log, "Received disconnected message"; "peer_id" => %peer_id); self.peer_disconnect(&peer_id); @@ -666,6 +720,14 @@ impl SyncManager { .block_lookups .parent_chain_processed(chain_hash, result, &mut self.network), }, + SyncMessage::SampleVerified { id, result } => { + if let Some(result) = + self.sampling + .on_sample_verified(id, result, &mut self.network) + { + self.on_sampling_result(id.id, result) + } + } } } @@ -814,12 +876,12 @@ impl SyncManager { None => RpcEvent::StreamTermination, }, ), - RequestId::SingleBlob { .. } => { - crit!(self.log, "Block received during blob request"; "peer_id" => %peer_id ); - } RequestId::RangeBlockAndBlobs { id } => { self.range_block_and_blobs_response(id, peer_id, block.into()) } + other => { + crit!(self.log, "Single block received on incorrect request"; "request_id" => ?other); + } } } @@ -829,7 +891,7 @@ impl SyncManager { peer_id: PeerId, block: RpcEvent>>, ) { - if let Some(resp) = self.network.on_single_block_response(id, block) { + if let Some((_, resp)) = self.network.on_single_block_response(id, block) { match resp { Ok((block, seen_timestamp)) => match id.lookup_type { LookupType::Current => self @@ -881,9 +943,6 @@ impl SyncManager { seen_timestamp: Duration, ) { match request_id { - RequestId::SingleBlock { .. } => { - crit!(self.log, "Single blob received during block request"; "peer_id" => %peer_id ); - } RequestId::SingleBlob { id } => self.on_single_blob_response( id, peer_id, @@ -895,6 +954,36 @@ impl SyncManager { RequestId::RangeBlockAndBlobs { id } => { self.range_block_and_blobs_response(id, peer_id, blob.into()) } + other => { + crit!(self.log, "Single blob received on incorrect request"; "request_id" => ?other); + } + } + } + + fn rpc_data_column_received( + &mut self, + request_id: RequestId, + peer_id: PeerId, + data_column: Option>>, + seen_timestamp: Duration, + ) { + match request_id { + RequestId::SingleBlock { .. } | RequestId::SingleBlob { .. } => { + crit!(self.log, "bad request id for data_column"; "peer_id" => %peer_id ); + } + RequestId::DataColumnsByRoot(id) => { + self.on_single_data_column_response( + id, + peer_id, + match data_column { + Some(data_column) => RpcEvent::Response(data_column, seen_timestamp), + None => RpcEvent::StreamTermination, + }, + ); + } + RequestId::RangeBlockAndBlobs { id } => { + todo!("TODO(das): handle sampling for range sync based on {id}"); + } } } @@ -904,7 +993,7 @@ impl SyncManager { peer_id: PeerId, blob: RpcEvent>>, ) { - if let Some(resp) = self.network.on_single_blob_response(id, blob) { + if let Some((_, resp)) = self.network.on_single_blob_response(id, blob) { match resp { Ok((blobs, seen_timestamp)) => match id.lookup_type { LookupType::Current => self @@ -949,6 +1038,68 @@ impl SyncManager { } } + fn on_single_data_column_response( + &mut self, + id: Id, + peer_id: PeerId, + data_column: RpcEvent>>, + ) { + let Some((requester, resp)) = self + .network + .on_data_columns_by_root_response(id, data_column) + else { + // TOOD(das): error o log + return; + }; + + match requester { + DataColumnsByRootRequester::Sampling(id) => { + if let Some(result) = + self.sampling + .on_sample_downloaded(id, peer_id, resp, &mut self.network) + { + self.on_sampling_result(id.id, result) + } + } + DataColumnsByRootRequester::Custody => { + todo!("TODO(das): handle custody requests"); + } + } + } + + fn on_sampling_result(&mut self, requester: SamplingRequester, result: SamplingResult) { + // TODO(das): How is a consumer of sampling results? + // - Fork-choice for trailing DA + // - Single lookups to complete import requirements + // - Range sync to complete import requirements? Can sampling for syncing lag behind and + // accumulate in fork-choice? + + match requester { + SamplingRequester::ImportedBlock(block_root) => { + debug!(self.log, "Sampling result"; "block_root" => %block_root, "result" => ?result); + + // TODO(das): Consider moving SamplingResult to the beacon_chain crate and import + // here. No need to add too much enum variants, just whatever the beacon_chain or + // fork-choice needs to make a decision. Currently the fork-choice only needs to + // be notified of successful samplings, i.e. sampling failures don't trigger pruning + match result { + Ok(_) => { + if let Err(e) = self + .network + .beacon_processor() + .send_sampling_completed(block_root) + { + warn!(self.log, "Error sending sampling result"; "block_root" => ?block_root, "reason" => ?e); + } + } + Err(e) => { + warn!(self.log, "Sampling failed"; "block_root" => %block_root, "reason" => ?e); + } + } + } + } + } + /// Handles receiving a response for a range sync request that should have both blocks and /// blobs. fn range_block_and_blobs_response( diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index 7b244bceceb..0fb01a73e07 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -8,6 +8,8 @@ pub mod manager; mod network_context; mod peer_sync_info; mod range_sync; +mod sampling; pub use manager::{BatchProcessResult, SyncMessage}; pub use range_sync::{BatchOperationOutcome, ChainId}; +pub use sampling::SamplingId; diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index fc91270c1dc..6f29ed41b32 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -1,10 +1,14 @@ //! Provides network functionality for the Syncing thread. This fundamentally wraps a network //! channel and stores a global RPC ID to perform requests. -use self::requests::{ActiveBlobsByRootRequest, ActiveBlocksByRootRequest}; -pub use self::requests::{BlobsByRootSingleBlockRequest, BlocksByRootSingleRequest}; +use self::requests::{ + ActiveBlobsByRootRequest, ActiveBlocksByRootRequest, ActiveDataColumnsByRootRequest, +}; +pub use self::requests::{ + BlobsByRootSingleBlockRequest, BlocksByRootSingleRequest, DataColumnsByRootSingleBlockRequest, +}; use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo; -use super::manager::{Id, RequestId as SyncRequestId}; +use super::manager::{DataColumnsByRootRequester, Id, RequestId as SyncRequestId}; use super::range_sync::{BatchId, ByRangeRequestType, ChainId}; use crate::network_beacon_processor::NetworkBeaconProcessor; use crate::service::{NetworkMessage, RequestId}; @@ -24,7 +28,10 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; use types::blob_sidecar::FixedBlobSidecarList; -use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; +use types::data_column_sidecar::ColumnIndex; +use types::{ + BlobSidecar, DataColumnSidecar, DataColumnSubnetId, Epoch, EthSpec, SignedBeaconBlock, +}; mod requests; @@ -52,7 +59,7 @@ pub enum RpcEvent { RPCError(RPCError), } -pub type RpcProcessingResult = Option>; +pub type RpcProcessingResult = Option<(ID, Result<(T, Duration), LookupFailure>)>; pub enum LookupFailure { RpcError(RPCError), @@ -93,6 +100,8 @@ pub struct SyncNetworkContext { /// A mapping of active BlobsByRoot requests, including both current slot and parent lookups. blobs_by_root_requests: FnvHashMap>, + data_columns_by_root_requests: + FnvHashMap>, /// BlocksByRange requests paired with BlobsByRange range_blocks_and_blobs_requests: @@ -142,6 +151,7 @@ impl SyncNetworkContext { request_id: 1, blocks_by_root_requests: <_>::default(), blobs_by_root_requests: <_>::default(), + data_columns_by_root_requests: <_>::default(), range_blocks_and_blobs_requests: FnvHashMap::default(), network_beacon_processor, chain, @@ -149,6 +159,33 @@ impl SyncNetworkContext { } } + // TODO(das): epoch argument left here in case custody rotation is implemented + pub fn get_custodial_peers(&self, _epoch: Epoch, column_index: ColumnIndex) -> Vec { + let mut peer_ids = vec![]; + + for (peer_id, peer_info) in self.network_globals().peers.read().peers() { + if let Some(enr) = peer_info.enr() { + // TODO(das): do not hardcode `custody_subnet_count` + let custody_subnet_count = 2; + // TODO(das): consider caching a map of subnet -> Vec and invalidating + // whenever a peer connected or disconnect event in received + let mut subnets = DataColumnSubnetId::compute_custody_subnets::( + enr.node_id().raw().into(), + custody_subnet_count, + ); + if subnets.any(|subnet| { + subnet + .columns::() + .any(|index| index == column_index) + }) { + peer_ids.push(*peer_id) + } + } + } + + peer_ids + } + pub fn network_globals(&self) -> &NetworkGlobals { &self.network_beacon_processor.network_globals } @@ -350,6 +387,37 @@ impl SyncNetworkContext { Ok(()) } + pub fn data_column_lookup_request( + &mut self, + requester: DataColumnsByRootRequester, + peer_id: PeerId, + request: DataColumnsByRootSingleBlockRequest, + ) -> Result<(), &'static str> { + let id = self.next_id(); + + debug!( + self.log, + "Sending DataColumnsByRoot Request"; + "method" => "DataColumnsByRoot", + "block_root" => ?request.block_root, + "indices" => ?request.indices, + "peer" => %peer_id, + "requester" => ?requester, + "id" => id, + ); + + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: Request::DataColumnsByRoot(request.clone().into_request(&self.chain.spec)), + request_id: RequestId::Sync(SyncRequestId::DataColumnsByRoot(id)), + })?; + + self.data_columns_by_root_requests + .insert(id, ActiveDataColumnsByRootRequest::new(request, requester)); + + Ok(()) + } + pub fn is_execution_engine_online(&self) -> bool { self.execution_engine_state == EngineState::Online } @@ -388,6 +456,31 @@ impl SyncNetworkContext { }); } + pub fn report_peer_on_rpc_error(&self, peer_id: &PeerId, error: &RPCError) { + // Note: logging the report event here with the full error display. The log inside + // `report_peer` only includes a smaller string, like "invalid_data" + debug!(self.log, "reporting peer for sync lookup error"; "error" => %error); + if let Some(action) = match error { + // Protocol errors are heavily penalized + RPCError::SSZDecodeError(..) + | RPCError::IoError(..) + | RPCError::ErrorResponse(..) + | RPCError::InvalidData(..) + | RPCError::HandlerRejected => Some(PeerAction::LowToleranceError), + // Timing / network errors are less penalized + // TODO: Is IoError a protocol error or network error? + RPCError::StreamTimeout | RPCError::IncompleteStream | RPCError::NegotiationTimeout => { + Some(PeerAction::MidToleranceError) + } + // Not supporting a specific protocol is tolerated. TODO: Are you sure? + RPCError::UnsupportedProtocol => None, + // Our fault, don't penalize peer + RPCError::InternalError(..) | RPCError::Disconnected => None, + } { + self.report_peer(*peer_id, action, error.into()); + } + } + /// Subscribes to core topics. pub fn subscribe_core_topics(&self) { self.network_send @@ -458,12 +551,12 @@ impl SyncNetworkContext { &mut self, request_id: SingleLookupReqId, block: RpcEvent>>, - ) -> RpcProcessingResult>> { + ) -> RpcProcessingResult>, ()> { let Entry::Occupied(mut request) = self.blocks_by_root_requests.entry(request_id) else { return None; }; - Some(match block { + let resp = match block { RpcEvent::Response(block, seen_timestamp) => { match request.get_mut().add_response(block) { Ok(block) => Ok((block, seen_timestamp)), @@ -482,19 +575,20 @@ impl SyncNetworkContext { request.remove(); Err(e.into()) } - }) + }; + Some(((), resp)) } pub fn on_single_blob_response( &mut self, request_id: SingleLookupReqId, blob: RpcEvent>>, - ) -> RpcProcessingResult> { + ) -> RpcProcessingResult, ()> { let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else { return None; }; - Some(match blob { + let resp = match blob { RpcEvent::Response(blob, _) => match request.get_mut().add_response(blob) { Ok(Some(blobs)) => to_fixed_blob_sidecar_list(blobs) .map(|blobs| (blobs, timestamp_now())) @@ -518,7 +612,45 @@ impl SyncNetworkContext { request.remove(); Err(e.into()) } - }) + }; + Some(((), resp)) + } + + pub fn on_data_columns_by_root_response( + &mut self, + id: Id, + item: RpcEvent>>, + ) -> RpcProcessingResult>>, DataColumnsByRootRequester> + { + let Entry::Occupied(mut request) = self.data_columns_by_root_requests.entry(id) else { + return None; + }; + + let requester = request.get().requester; + + let resp = match item { + RpcEvent::Response(item, _) => match request.get_mut().add_response(item) { + // TODO: Track last chunk timestamp + Ok(Some(items)) => Ok((items, timestamp_now())), + Ok(None) => return None, + Err(e) => { + request.remove(); + Err(e.into()) + } + }, + RpcEvent::StreamTermination => { + // Stream terminator + match request.remove().terminate() { + Some(items) => Ok((items, timestamp_now())), + None => return None, + } + } + RpcEvent::RPCError(e) => { + request.remove(); + Err(e.into()) + } + }; + Some((requester, resp)) } } diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 0522b7fa384..d9894d6f579 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -1,9 +1,13 @@ use beacon_chain::get_block_root; -use lighthouse_network::rpc::{methods::BlobsByRootRequest, BlocksByRootRequest}; +use lighthouse_network::rpc::{ + methods::{BlobsByRootRequest, DataColumnsByRootRequest}, + BlocksByRootRequest, RPCError, +}; use std::sync::Arc; use strum::IntoStaticStr; use types::{ - blob_sidecar::BlobIdentifier, BlobSidecar, ChainSpec, EthSpec, Hash256, SignedBeaconBlock, + blob_sidecar::BlobIdentifier, data_column_sidecar::DataColumnIdentifier, BlobSidecar, + ChainSpec, DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlock, }; #[derive(Debug, PartialEq, Eq, IntoStaticStr)] @@ -147,3 +151,90 @@ impl ActiveBlobsByRootRequest { } } } + +#[derive(Debug, Clone)] +pub struct DataColumnsByRootSingleBlockRequest { + pub block_root: Hash256, + pub indices: Vec, +} + +impl DataColumnsByRootSingleBlockRequest { + pub fn into_request(self, spec: &ChainSpec) -> DataColumnsByRootRequest { + DataColumnsByRootRequest::new( + self.indices + .into_iter() + .map(|index| DataColumnIdentifier { + block_root: self.block_root, + index, + }) + .collect(), + spec, + ) + } +} + +pub struct ActiveDataColumnsByRootRequest { + pub requester: T, + request: DataColumnsByRootSingleBlockRequest, + items: Vec>>, + resolved: bool, +} + +impl ActiveDataColumnsByRootRequest { + pub fn new(request: DataColumnsByRootSingleBlockRequest, requester: T) -> Self { + Self { + requester, + request, + items: vec![], + resolved: false, + } + } + + /// Appends a chunk to this multi-item request. If all expected chunks are received, this + /// method returns `Some`, resolving the request before the stream terminator. + /// The active request SHOULD be dropped after `add_response` returns an error + pub fn add_response( + &mut self, + blob: Arc>, + ) -> Result>>>, RPCError> { + if self.resolved { + return Err(RPCError::InvalidData("too many responses".to_string())); + } + + let block_root = blob.block_root(); + if self.request.block_root != block_root { + return Err(RPCError::InvalidData(format!( + "un-requested block root {block_root:?}" + ))); + } + if !blob.verify_inclusion_proof().unwrap_or(false) { + return Err(RPCError::InvalidData("invalid inclusion proof".to_string())); + } + if !self.request.indices.contains(&blob.index) { + return Err(RPCError::InvalidData(format!( + "un-requested index {}", + blob.index + ))); + } + if self.items.iter().any(|b| b.index == blob.index) { + return Err(RPCError::InvalidData("duplicated data".to_string())); + } + + self.items.push(blob); + if self.items.len() >= self.request.indices.len() { + // All expected chunks received, return result early + self.resolved = true; + Ok(Some(std::mem::take(&mut self.items))) + } else { + Ok(None) + } + } + + pub fn terminate(self) -> Option>>> { + if self.resolved { + None + } else { + Some(self.items) + } + } +} diff --git a/beacon_node/network/src/sync/sampling.rs b/beacon_node/network/src/sync/sampling.rs new file mode 100644 index 00000000000..1fa12f45171 --- /dev/null +++ b/beacon_node/network/src/sync/sampling.rs @@ -0,0 +1,486 @@ +use self::request::ActiveColumnSampleRequest; + +use super::network_context::{LookupFailure, SyncNetworkContext}; +use beacon_chain::BeaconChainTypes; +use fnv::FnvHashMap; +use lighthouse_network::{PeerAction, PeerId}; +use rand::{seq::SliceRandom, thread_rng}; +use slog::{debug, error, warn}; +use std::{ + collections::hash_map::Entry, collections::HashMap, marker::PhantomData, sync::Arc, + time::Duration, +}; +use types::{data_column_sidecar::ColumnIndex, DataColumnSidecar, Hash256, Slot}; + +pub type SamplingResult = Result<(), SamplingError>; + +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct SamplingId { + pub id: SamplingRequester, + pub column_index: ColumnIndex, +} + +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub enum SamplingRequester { + ImportedBlock(Hash256), +} + +type DataColumnSidecarList = Vec>>; + +pub struct Sampling { + // TODO(das): stalled sampling request are never cleaned up + requests: HashMap>, + sampling_config: SamplingConfig, + log: slog::Logger, +} + +impl Sampling { + pub fn new(sampling_config: SamplingConfig, log: slog::Logger) -> Self { + Self { + requests: <_>::default(), + sampling_config, + log, + } + } + + #[cfg(test)] + pub fn active_sampling_requests(&self) -> Vec { + self.requests.values().map(|r| r.block_root).collect() + } + + pub fn on_new_sample_request( + &mut self, + block_root: Hash256, + block_slot: Slot, + cx: &mut SyncNetworkContext, + ) -> Option<(SamplingRequester, SamplingResult)> { + let requester = SamplingRequester::ImportedBlock(block_root); + + let request = match self.requests.entry(requester) { + Entry::Vacant(e) => e.insert(ActiveSamplingRequest::new( + block_root, + block_slot, + requester, + &self.sampling_config, + self.log.clone(), + )), + Entry::Occupied(_) => { + warn!(self.log, "Ignoring duplicate sampling request"; "id" => ?requester); + return None; + } + }; + + debug!(self.log, "Created new sample request"; "id" => ?requester); + + // TOOD(das): If a node has very little peers, continue_sampling() will attempt to find enough + // to sample here, immediately failing the sampling request. There should be some grace + // period to allow the peer manager to find custody peers. + request + .continue_sampling(cx) + .transpose() + .map(|result| (requester, result)) + } + + pub fn on_sample_downloaded( + &mut self, + id: SamplingId, + peer_id: PeerId, + resp: Result<(DataColumnSidecarList, Duration), LookupFailure>, + cx: &mut SyncNetworkContext, + ) -> Option { + let Some(request) = self.requests.get_mut(&id.id) else { + // TOOD(das): This log can happen if the request is error'ed early and dropped + debug!(self.log, "Sample downloaded event for unknown request"; "id" => ?id); + return None; + }; + + let result = request.on_sample_downloaded(peer_id, id.column_index, resp, cx); + self.handle_sampling_result(result, &id.id) + } + + pub fn on_sample_verified( + &mut self, + id: SamplingId, + result: Result<(), String>, + cx: &mut SyncNetworkContext, + ) -> Option { + let Some(request) = self.requests.get_mut(&id.id) else { + // TOOD(das): This log can happen if the request is error'ed early and dropped + debug!(self.log, "Sample verified event for unknown request"; "id" => ?id); + return None; + }; + + let result = request.on_sample_verified(id.column_index, result, cx); + self.handle_sampling_result(result, &id.id) + } + + fn handle_sampling_result( + &mut self, + result: Result, SamplingError>, + id: &SamplingRequester, + ) -> Option { + let result = result.transpose(); + if result.is_some() { + debug!(self.log, "Removed sampling request"; "id" => ?id); + self.requests.remove(id); + } + result + } +} + +pub struct ActiveSamplingRequest { + block_root: Hash256, + block_slot: Slot, + requester_id: SamplingRequester, + column_requests: FnvHashMap, + column_shuffle: Vec, + required_successes: Vec, + /// Logger for the `SyncNetworkContext`. + pub log: slog::Logger, + _phantom: PhantomData, +} + +#[derive(Debug)] +pub enum SamplingError { + SendFailed(&'static str), + ProcessorUnavailable, + TooManyFailures, + BadState(String), +} + +/// Required success index by current failures, with p_target=5.00E-06 +/// Ref: https://colab.research.google.com/drive/18uUgT2i-m3CbzQ5TyP9XFKqTn1DImUJD#scrollTo=E82ITcgB5ATh +const REQUIRED_SUCCESSES: [usize; 11] = [16, 20, 23, 26, 29, 32, 34, 37, 39, 42, 44]; + +#[derive(Debug, Clone)] +pub enum SamplingConfig { + Default, + #[allow(dead_code)] + Custom { + required_successes: Vec, + }, +} + +impl ActiveSamplingRequest { + fn new( + block_root: Hash256, + block_slot: Slot, + requester_id: SamplingRequester, + sampling_config: &SamplingConfig, + log: slog::Logger, + ) -> Self { + // Select ahead of time the full list of to-sample columns + let mut column_shuffle = (0..64).collect::>(); + let mut rng = thread_rng(); + column_shuffle.shuffle(&mut rng); + + Self { + block_root, + block_slot, + requester_id, + column_requests: <_>::default(), + column_shuffle, + required_successes: match sampling_config { + SamplingConfig::Default => REQUIRED_SUCCESSES.to_vec(), + SamplingConfig::Custom { required_successes } => required_successes.clone(), + }, + log, + _phantom: PhantomData, + } + } + + // TODO: When is a fork and only a subset of your peers know about a block, sampling should only + // be queried on the peers on that fork. Should this case be handled? How to handle it? + fn on_sample_downloaded( + &mut self, + _peer_id: PeerId, + column_index: ColumnIndex, + resp: Result<(DataColumnSidecarList, Duration), LookupFailure>, + cx: &mut SyncNetworkContext, + ) -> Result, SamplingError> { + // Select columns to sample + // Create individual request per column + // Progress requests + // If request fails retry or expand search + // If all good return + let Some(request) = self.column_requests.get_mut(&column_index) else { + warn!( + self.log, + "Received sampling response for unrequested column index" + ); + return Ok(None); + }; + + match resp { + Ok((mut data_columns, seen_timestamp)) => { + debug!(self.log, "Sample download success"; "block_root" => %self.block_root, "column_index" => column_index, "count" => data_columns.len()); + + // No need to check data_columns has len > 1, as the SyncNetworkContext ensure that + // only requested is returned (or none); + if let Some(data_column) = data_columns.pop() { + // Peer has data column, send to verify + let Some(beacon_processor) = cx.beacon_processor_if_enabled() else { + // If processor is not available, error the entire sampling + debug!(self.log, "Dropping sampling"; "block" => %self.block_root, "reason" => "beacon processor unavailable"); + return Err(SamplingError::ProcessorUnavailable); + }; + + debug!(self.log, "Sending data_column for verification"; "block" => ?self.block_root, "column_index" => column_index); + if let Err(e) = beacon_processor.send_rpc_data_columns( + self.block_root, + vec![data_column], + seen_timestamp, + SamplingId { + id: self.requester_id, + column_index, + }, + ) { + // TODO(das): Beacon processor is overloaded, what should we do? + error!(self.log, "Dropping sampling"; "block" => %self.block_root, "reason" => e.to_string()); + return Err(SamplingError::SendFailed("beacon processor send failure")); + } + } else { + // Peer does not have the requested data. + // TODO(das) what to do? + debug!(self.log, "Sampling peer claims to not have the data"; "block_root" => %self.block_root, "column_index" => column_index); + request.on_sampling_error()?; + } + } + Err(err) => { + debug!(self.log, "Sample download error"; "block_root" => %self.block_root, "column_index" => column_index, "error" => %err); + + // Error downloading, maybe penalize peer and retry again. + // TODO(das) with different peer or different peer? + request.on_sampling_error()?; + } + }; + + self.continue_sampling(cx) + } + + pub(crate) fn on_sample_verified( + &mut self, + column_index: ColumnIndex, + result: Result<(), String>, + cx: &mut SyncNetworkContext, + ) -> Result, SamplingError> { + // Select columns to sample + // Create individual request per column + // Progress requests + // If request fails retry or expand search + // If all good return + let Some(request) = self.column_requests.get_mut(&column_index) else { + warn!( + self.log, + "Received sampling response for unrequested column index" + ); + return Ok(None); + }; + + match result { + Ok(_) => { + debug!(self.log, "Sample verification success"; "block_root" => %self.block_root, "column_index" => column_index); + + // Valid, continue_sampling will maybe consider sampling succees + request.on_sampling_success()?; + } + Err(err) => { + debug!(self.log, "Sample verification failure"; "block_root" => %self.block_root, "column_index" => column_index, "reason" => ?err); + + // TODO(das): Peer sent invalid data, penalize and try again from different peer + // TODO(das): Count individual failures + let peer_id = request.on_sampling_error()?; + cx.report_peer( + peer_id, + PeerAction::LowToleranceError, + "invalid data column", + ); + } + } + + self.continue_sampling(cx) + } + + fn continue_sampling( + &mut self, + cx: &mut SyncNetworkContext, + ) -> Result, SamplingError> { + // First check if sampling is completed, by computing `required_successes` + let mut successes = 0; + let mut failures = 0; + let mut ongoings = 0; + + for request in self.column_requests.values() { + if request.is_completed() { + successes += 1; + } + if request.is_failed() { + failures += 1; + } + if request.is_ongoing() { + ongoings += 1; + } + } + + // If there are too many failures, consider the sampling failed + let Some(required_successes) = self.required_successes.get(failures) else { + return Err(SamplingError::TooManyFailures); + }; + + // If there are enough successes, consider the sampling complete + if successes >= *required_successes { + return Ok(Some(())); + } + + let mut sent_requests = 0; + + // First, attempt to progress sampling by requesting more columns, so that request failures + // are accounted for below. + for idx in 0..*required_successes { + // Re-request columns + let column_index = self.column_shuffle[idx]; + let request = self + .column_requests + .entry(column_index) + .or_insert(ActiveColumnSampleRequest::new(column_index)); + + if request.request(self.block_root, self.block_slot, self.requester_id, cx)? { + sent_requests += 1 + } + } + + // Make sure that sampling doesn't stall, by ensuring that this sampling request will + // receive a new event of some type. If there are no ongoing requests, and no new + // request was sent, loop to increase the required_successes until the sampling fails if + // there are no peers. + if ongoings == 0 && sent_requests == 0 { + debug!(self.log, "Sampling request stalled"; "block_root" => %self.block_root); + } + + Ok(None) + } +} + +mod request { + use super::{SamplingError, SamplingId, SamplingRequester}; + use crate::sync::{ + manager::DataColumnsByRootRequester, + network_context::{DataColumnsByRootSingleBlockRequest, SyncNetworkContext}, + }; + use beacon_chain::BeaconChainTypes; + use lighthouse_network::PeerId; + use std::collections::HashSet; + use types::{data_column_sidecar::ColumnIndex, EthSpec, Hash256, Slot}; + + pub(crate) struct ActiveColumnSampleRequest { + column_index: ColumnIndex, + status: Status, + // TODO(das): Should downscore peers that claim to not have the sample? + #[allow(dead_code)] + peers_dont_have: HashSet, + } + + #[derive(Debug, Clone)] + enum Status { + NoPeers, + NotStarted, + Sampling(PeerId), + Verified, + } + + impl ActiveColumnSampleRequest { + pub(crate) fn new(column_index: ColumnIndex) -> Self { + Self { + column_index, + status: Status::NotStarted, + peers_dont_have: <_>::default(), + } + } + + pub(crate) fn is_completed(&self) -> bool { + match self.status { + Status::NoPeers | Status::NotStarted | Status::Sampling(_) => false, + Status::Verified => true, + } + } + + pub(crate) fn is_failed(&self) -> bool { + match self.status { + Status::NotStarted | Status::Sampling(_) | Status::Verified => false, + Status::NoPeers => true, + } + } + + pub(crate) fn is_ongoing(&self) -> bool { + match self.status { + Status::NotStarted | Status::NoPeers | Status::Verified => false, + Status::Sampling(_) => true, + } + } + + pub(crate) fn request( + &mut self, + block_root: Hash256, + block_slot: Slot, + requester: SamplingRequester, + cx: &mut SyncNetworkContext, + ) -> Result { + match &self.status { + Status::NoPeers | Status::NotStarted => {} // Ok to continue + Status::Sampling(_) => return Ok(false), // Already downloading + Status::Verified => return Ok(false), // Already completed + } + + let peer_ids = cx.get_custodial_peers( + block_slot.epoch(::slots_per_epoch()), + self.column_index, + ); + + // TODO(das) randomize custodial peer and avoid failing peers + if let Some(peer_id) = peer_ids.first().cloned() { + cx.data_column_lookup_request( + DataColumnsByRootRequester::Sampling(SamplingId { + id: requester, + column_index: self.column_index, + }), + peer_id, + DataColumnsByRootSingleBlockRequest { + block_root, + indices: vec![self.column_index], + }, + ) + .map_err(SamplingError::SendFailed)?; + + self.status = Status::Sampling(peer_id); + Ok(true) + } else { + self.status = Status::NoPeers; + Ok(false) + } + } + + pub(crate) fn on_sampling_error(&mut self) -> Result { + match self.status.clone() { + Status::Sampling(peer_id) => { + self.status = Status::NotStarted; + Ok(peer_id) + } + other => Err(SamplingError::BadState(format!( + "bad state on_sampling_error expected Sampling got {other:?}" + ))), + } + } + + pub(crate) fn on_sampling_success(&mut self) -> Result<(), SamplingError> { + match &self.status { + Status::Sampling(_) => { + self.status = Status::Verified; + Ok(()) + } + other => Err(SamplingError::BadState(format!( + "bad state on_sampling_success expected Sampling got {other:?}" + ))), + } + } + } +} diff --git a/consensus/types/src/data_column_sidecar.rs b/consensus/types/src/data_column_sidecar.rs index 885dc243cfd..a31ef887356 100644 --- a/consensus/types/src/data_column_sidecar.rs +++ b/consensus/types/src/data_column_sidecar.rs @@ -9,6 +9,7 @@ use bls::Signature; use derivative::Derivative; use kzg::{Blob as KzgBlob, Error as KzgError, Kzg}; use kzg::{KzgCommitment, KzgProof}; +use merkle_proof::MerkleTreeError; use safe_arith::ArithError; use serde::{Deserialize, Serialize}; use ssz::Encode; @@ -71,6 +72,12 @@ impl DataColumnSidecar { self.signed_block_header.message.tree_hash_root() } + /// Verifies the kzg commitment inclusion merkle proof. + pub fn verify_inclusion_proof(&self) -> Result { + // TODO(das): implement + Ok(true) + } + pub fn build_sidecars( blobs: &BlobSidecarList, block: &SignedBeaconBlock,