Skip to content

Commit

Permalink
DAS sampling on sync (#5616)
Browse files Browse the repository at this point in the history
* Data availability sampling on sync

* Address @jimmygchen review

* Trigger sampling

* Address some review comments and only send `SamplingBlock` sync message after PEER_DAS_EPOCH.

---------

Co-authored-by: Jimmy Chen <jchen.tc@gmail.com>
  • Loading branch information
dapplion and jimmygchen authored Apr 29, 2024
1 parent c5bab04 commit 75eab79
Show file tree
Hide file tree
Showing 17 changed files with 1,402 additions and 48 deletions.
11 changes: 11 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2855,6 +2855,17 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
ChainSegmentResult::Successful { imported_blocks }
}

/// Updates fork-choice node into a permanent `available` state so it can become a viable head.
/// Only completed sampling results are received. Blocks are unavailable by default and should
/// be pruned on finalization, on a timeout or by a max count.
pub async fn process_sampling_completed(self: &Arc<Self>, block_root: Hash256) {
// TODO(das): update fork-choice
// NOTE: It is possible that sampling complets before block is imported into fork choice,
// in that case we may need to update availability cache.
// TODO(das): These log levels are too high, reduce once DAS matures
info!(self.log, "Sampling completed"; "block_root" => %block_root);
}

/// Returns `Ok(GossipVerifiedBlock)` if the supplied `block` should be forwarded onto the
/// gossip network. The block is not imported into the chain, it is just partially verified.
///
Expand Down
30 changes: 30 additions & 0 deletions beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2584,3 +2584,33 @@ pub fn generate_rand_block_and_blobs<E: EthSpec>(
}
(block, blob_sidecars)
}

pub fn generate_rand_block_and_data_columns<E: EthSpec>(
fork_name: ForkName,
num_blobs: NumBlobs,
rng: &mut impl Rng,
) -> (
SignedBeaconBlock<E, FullPayload<E>>,
Vec<DataColumnSidecar<E>>,
) {
let (block, blobs) = generate_rand_block_and_blobs(fork_name, num_blobs, rng);
let blob = blobs.first().expect("should have at least 1 blob");

let data_columns = (0..E::number_of_columns())
.map(|index| DataColumnSidecar {
index: index as u64,
column: <_>::default(),
kzg_commitments: block
.message()
.body()
.blob_kzg_commitments()
.unwrap()
.clone(),
kzg_proofs: (vec![]).into(),
signed_block_header: blob.signed_block_header.clone(),
kzg_commitments_inclusion_proof: <_>::default(),
})
.collect::<Vec<_>>();

(block, data_columns)
}
38 changes: 35 additions & 3 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024;
/// will be stored before we start dropping them.
const MAX_RPC_BLOB_QUEUE_LEN: usize = 1_024;

/// TODO(das): Placeholder number
const MAX_RPC_VERIFY_DATA_COLUMN_QUEUE_LEN: usize = 1000;
const MAX_SAMPLING_RESULT_QUEUE_LEN: usize = 1000;

/// The maximum number of queued `Vec<SignedBeaconBlock>` objects received during syncing that will
/// be stored before we start dropping them.
const MAX_CHAIN_SEGMENT_QUEUE_LEN: usize = 64;
Expand Down Expand Up @@ -252,6 +256,8 @@ pub const GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic
pub const RPC_BLOCK: &str = "rpc_block";
pub const IGNORED_RPC_BLOCK: &str = "ignored_rpc_block";
pub const RPC_BLOBS: &str = "rpc_blob";
pub const RPC_VERIFY_DATA_COLUMNS: &str = "rpc_verify_data_columns";
pub const SAMPLING_RESULT: &str = "sampling_result";
pub const CHAIN_SEGMENT: &str = "chain_segment";
pub const CHAIN_SEGMENT_BACKFILL: &str = "chain_segment_backfill";
pub const STATUS_PROCESSING: &str = "status_processing";
Expand Down Expand Up @@ -629,6 +635,8 @@ pub enum Work<E: EthSpec> {
RpcBlobs {
process_fn: AsyncFn,
},
RpcVerifyDataColumn(AsyncFn),
SamplingResult(AsyncFn),
IgnoredRpcBlock {
process_fn: BlockingFn,
},
Expand Down Expand Up @@ -675,6 +683,8 @@ impl<E: EthSpec> Work<E> {
Work::GossipLightClientOptimisticUpdate(_) => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE,
Work::RpcBlock { .. } => RPC_BLOCK,
Work::RpcBlobs { .. } => RPC_BLOBS,
Work::RpcVerifyDataColumn(_) => RPC_VERIFY_DATA_COLUMNS,
Work::SamplingResult(_) => SAMPLING_RESULT,
Work::IgnoredRpcBlock { .. } => IGNORED_RPC_BLOCK,
Work::ChainSegment { .. } => CHAIN_SEGMENT,
Work::ChainSegmentBackfill(_) => CHAIN_SEGMENT_BACKFILL,
Expand Down Expand Up @@ -833,6 +843,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
// Using a FIFO queue since blocks need to be imported sequentially.
let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
let mut rpc_blob_queue = FifoQueue::new(MAX_RPC_BLOB_QUEUE_LEN);
let mut rpc_verify_data_column_queue = FifoQueue::new(MAX_RPC_VERIFY_DATA_COLUMN_QUEUE_LEN);
let mut sampling_result_queue = FifoQueue::new(MAX_SAMPLING_RESULT_QUEUE_LEN);
let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN);
Expand Down Expand Up @@ -988,6 +1000,11 @@ impl<E: EthSpec> BeaconProcessor<E> {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = rpc_blob_queue.pop() {
self.spawn_worker(item, idle_tx);
// TODO(das): decide proper priorization for sampling columns
} else if let Some(item) = rpc_verify_data_column_queue.pop() {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = sampling_result_queue.pop() {
self.spawn_worker(item, idle_tx);
// Check delayed blocks before gossip blocks, the gossip blocks might rely
// on the delayed ones.
} else if let Some(item) = delayed_block_queue.pop() {
Expand Down Expand Up @@ -1278,6 +1295,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
rpc_block_queue.push(work, work_id, &self.log)
}
Work::RpcBlobs { .. } => rpc_blob_queue.push(work, work_id, &self.log),
Work::RpcVerifyDataColumn(_) => {
rpc_verify_data_column_queue.push(work, work_id, &self.log)
}
Work::SamplingResult(_) => {
sampling_result_queue.push(work, work_id, &self.log)
}
Work::ChainSegment { .. } => {
chain_segment_queue.push(work, work_id, &self.log)
}
Expand Down Expand Up @@ -1371,6 +1394,14 @@ impl<E: EthSpec> BeaconProcessor<E> {
&metrics::BEACON_PROCESSOR_RPC_BLOB_QUEUE_TOTAL,
rpc_blob_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_RPC_VERIFY_DATA_COLUMN_QUEUE_TOTAL,
rpc_verify_data_column_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_SAMPLING_RESULT_QUEUE_TOTAL,
sampling_result_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL,
chain_segment_queue.len() as i64,
Expand Down Expand Up @@ -1510,9 +1541,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
beacon_block_root: _,
process_fn,
} => task_spawner.spawn_async(process_fn),
Work::RpcBlock { process_fn } | Work::RpcBlobs { process_fn } => {
task_spawner.spawn_async(process_fn)
}
Work::RpcBlock { process_fn }
| Work::RpcBlobs { process_fn }
| Work::RpcVerifyDataColumn(process_fn)
| Work::SamplingResult(process_fn) => task_spawner.spawn_async(process_fn),
Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn),
Work::GossipBlock(work)
| Work::GossipBlobSidecar(work)
Expand Down
10 changes: 10 additions & 0 deletions beacon_node/beacon_processor/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ lazy_static::lazy_static! {
"beacon_processor_rpc_blob_queue_total",
"Count of blobs from the rpc waiting to be verified."
);
// Rpc verify data columns
pub static ref BEACON_PROCESSOR_RPC_VERIFY_DATA_COLUMN_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_rpc_verify_data_column_queue_total",
"Count of data columns from the rpc waiting to be verified."
);
// Sampling result
pub static ref BEACON_PROCESSOR_SAMPLING_RESULT_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_sampling_result_queue_total",
"Count of sampling results waiting to be processed."
);
// Chain segments.
pub static ref BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_chain_segment_queue_total",
Expand Down
10 changes: 10 additions & 0 deletions beacon_node/lighthouse_network/src/rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,16 @@ pub struct DataColumnsByRootRequest {
pub data_column_ids: RuntimeVariableList<DataColumnIdentifier>,
}

impl DataColumnsByRootRequest {
pub fn new(data_column_ids: Vec<DataColumnIdentifier>, spec: &ChainSpec) -> Self {
let data_column_ids = RuntimeVariableList::from_vec(
data_column_ids,
spec.max_request_data_column_sidecars as usize,
);
Self { data_column_ids }
}
}

/* RPC Handling and Grouping */
// Collection of enums and structs used by the Codecs to encode/decode RPC messages

Expand Down
29 changes: 29 additions & 0 deletions beacon_node/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ use std::sync::Arc;
use strum::IntoEnumIterator;
use types::EthSpec;

pub const SUCCESS: &str = "SUCCESS";
pub const FAILURE: &str = "FAILURE";

lazy_static! {

pub static ref BEACON_BLOCK_MESH_PEERS_PER_CLIENT: Result<IntGaugeVec> =
Expand Down Expand Up @@ -344,6 +347,25 @@ lazy_static! {
"beacon_processor_reprocessing_queue_sent_optimistic_updates",
"Number of queued light client optimistic updates where as matching block has been imported."
);

/*
* Sampling
*/
pub static ref SAMPLE_DOWNLOAD_RESULT: Result<IntCounterVec> = try_create_int_counter_vec(
"beacon_sampling_sample_verify_result_total",
"Total count of individual sample download results",
&["result"]
);
pub static ref SAMPLE_VERIFY_RESULT: Result<IntCounterVec> = try_create_int_counter_vec(
"beacon_sampling_sample_verify_result_total",
"Total count of individual sample verify results",
&["result"]
);
pub static ref SAMPLING_REQUEST_RESULT: Result<IntCounterVec> = try_create_int_counter_vec(
"beacon_sampling_request_result_total",
"Total count of sample request results",
&["result"]
);
}

pub fn register_finality_update_error(error: &LightClientFinalityUpdateError) {
Expand All @@ -362,6 +384,13 @@ pub fn register_sync_committee_error(error: &SyncCommitteeError) {
inc_counter_vec(&GOSSIP_SYNC_COMMITTEE_ERRORS_PER_TYPE, &[error.as_ref()]);
}

pub fn from_result<T, E>(result: &std::result::Result<T, E>) -> &str {
match result {
Ok(_) => SUCCESS,
Err(_) => FAILURE,
}
}

pub fn update_gossip_metrics<E: EthSpec>(
gossipsub: &Gossipsub,
network_globals: &Arc<NetworkGlobals<E>>,
Expand Down
20 changes: 18 additions & 2 deletions beacon_node/network/src/network_beacon_processor/gossip_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
seen_duration: Duration,
) {
let slot = column_sidecar.slot();
let root = column_sidecar.block_root();
let block_root = column_sidecar.block_root();
let index = column_sidecar.index;
let delay = get_slot_delay_ms(seen_duration, slot, &self.chain.slot_clock);
// Log metrics to track delay from other nodes on the network.
Expand All @@ -635,7 +635,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self.log,
"Successfully verified gossip data column sidecar";
"slot" => %slot,
"root" => %root,
"block_root" => %block_root,
"index" => %index,
);

Expand Down Expand Up @@ -1264,6 +1264,22 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let block = verified_block.block.block_cloned();
let block_root = verified_block.block_root;

if block.num_expected_blobs() > 0 {
// Trigger sampling for block not yet execution valid. At this point column custodials are
// unlikely to have received their columns. Triggering sampling so early is only viable with
// either:
// - Sync delaying sampling until some latter window
// - Re-processing early sampling requests: https://github.com/sigp/lighthouse/pull/5569
if self
.chain
.spec
.peer_das_epoch
.map_or(false, |peer_das_epoch| block.epoch() >= peer_das_epoch)
{
self.send_sync_message(SyncMessage::SampleBlock(block_root, block.slot()));
}
}

let result = self
.chain
.process_block_with_early_caching(block_root, verified_block, NotifyExecutionLayer::Yes)
Expand Down
38 changes: 37 additions & 1 deletion beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
service::NetworkMessage,
sync::{manager::BlockProcessType, SyncMessage},
sync::{manager::BlockProcessType, SamplingId, SyncMessage},
};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::{builder::Witness, eth1_chain::CachingEth1Backend, BeaconChain};
Expand Down Expand Up @@ -478,6 +478,42 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
})
}

/// Create a new `Work` event for some data_columns from ReqResp
pub fn send_rpc_data_columns(
self: &Arc<Self>,
block_root: Hash256,
data_columns: Vec<Arc<DataColumnSidecar<T::EthSpec>>>,
seen_timestamp: Duration,
id: SamplingId,
) -> Result<(), Error<T::EthSpec>> {
let s = self.clone();
self.try_send(BeaconWorkEvent {
drop_during_sync: false,
work: Work::RpcVerifyDataColumn(Box::pin(async move {
let result = s
.clone()
.validate_rpc_data_columns(block_root, data_columns, seen_timestamp)
.await;
// Sync handles these results
s.send_sync_message(SyncMessage::SampleVerified { id, result });
})),
})
}

/// Create a new `Work` event with a block sampling completed result
pub fn send_sampling_completed(
self: &Arc<Self>,
block_root: Hash256,
) -> Result<(), Error<T::EthSpec>> {
let nbp = self.clone();
self.try_send(BeaconWorkEvent {
drop_during_sync: false,
work: Work::SamplingResult(Box::pin(async move {
nbp.process_sampling_completed(block_root).await;
})),
})
}

/// Create a new work event to import `blocks` as a beacon chain segment.
pub fn send_chain_segment(
self: &Arc<Self>,
Expand Down
21 changes: 20 additions & 1 deletion beacon_node/network/src/network_beacon_processor/sync_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use store::KzgCommitment;
use tokio::sync::mpsc;
use types::beacon_block_body::format_kzg_commitments;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{Epoch, Hash256};
use types::{DataColumnSidecar, Epoch, Hash256};

/// Id associated to a batch processing request, either a sync batch or a parent lookup.
#[derive(Clone, Debug, PartialEq)]
Expand Down Expand Up @@ -305,6 +305,25 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
});
}

/// Validate a list of data columns received from RPC requests
pub async fn validate_rpc_data_columns(
self: Arc<NetworkBeaconProcessor<T>>,
_block_root: Hash256,
_data_columns: Vec<Arc<DataColumnSidecar<T::EthSpec>>>,
_seen_timestamp: Duration,
) -> Result<(), String> {
// TODO(das): validate data column sidecar KZG commitment
Ok(())
}

/// Process a sampling completed event, inserting it into fork-choice
pub async fn process_sampling_completed(
self: Arc<NetworkBeaconProcessor<T>>,
block_root: Hash256,
) {
self.chain.process_sampling_completed(block_root).await;
}

/// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync
/// thread if more blocks are needed to process it.
pub async fn process_chain_segment(
Expand Down
Loading

0 comments on commit 75eab79

Please sign in to comment.