Skip to content

Commit

Permalink
Data availability sampling on sync
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Apr 24, 2024
1 parent 254bb6e commit 9de3178
Show file tree
Hide file tree
Showing 14 changed files with 1,268 additions and 45 deletions.
9 changes: 9 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2855,6 +2855,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
ChainSegmentResult::Successful { imported_blocks }
}

/// Updates fork-choice node into a permanent `available` state so it can become a viable head.
/// Only completed sampling results are received. Blocks are unavailable by default and should
/// be pruned on finalization, on a timeout or by a max count.
///
/// Called by the beacon processor once sampling for `block_root` has finished successfully.
/// Currently only logs the event; the fork-choice update is still pending (see TODO below).
pub async fn process_sampling_completed(self: &Arc<Self>, block_root: Hash256) {
    // TODO(das): update fork-choice
    // TODO(das): These log levels are too high, reduce once DAS matures
    info!(self.log, "Sampling completed"; "block_root" => %block_root);
}

/// Returns `Ok(GossipVerifiedBlock)` if the supplied `block` should be forwarded onto the
/// gossip network. The block is not imported into the chain, it is just partially verified.
///
Expand Down
30 changes: 30 additions & 0 deletions beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2584,3 +2584,33 @@ pub fn generate_rand_block_and_blobs<E: EthSpec>(
}
(block, blob_sidecars)
}

/// Generates a random signed block together with one placeholder `DataColumnSidecar`
/// per column (`E::number_of_columns()` in total). The sidecars copy their KZG
/// commitments and signed block header from the generated block/blobs; column data,
/// proofs and inclusion proofs are left as defaults.
///
/// Panics if `num_blobs` produces zero blobs, since the header is taken from the
/// first blob sidecar.
pub fn generate_rand_block_and_data_columns<E: EthSpec>(
    fork_name: ForkName,
    num_blobs: NumBlobs,
    rng: &mut impl Rng,
) -> (
    SignedBeaconBlock<E, FullPayload<E>>,
    Vec<DataColumnSidecar<E>>,
) {
    let (block, blobs) = generate_rand_block_and_blobs(fork_name, num_blobs, rng);
    let first_blob = blobs.first().expect("should have at least 1 blob");

    let mut data_columns = Vec::with_capacity(E::number_of_columns());
    for column_index in 0..E::number_of_columns() {
        data_columns.push(DataColumnSidecar {
            index: column_index as u64,
            column: <_>::default(),
            kzg_commitments: block
                .message()
                .body()
                .blob_kzg_commitments()
                .unwrap()
                .clone(),
            kzg_proofs: (vec![]).into(),
            signed_block_header: first_blob.signed_block_header.clone(),
            kzg_commitments_inclusion_proof: <_>::default(),
        });
    }

    (block, data_columns)
}
25 changes: 22 additions & 3 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024;
/// will be stored before we start dropping them.
const MAX_RPC_BLOB_QUEUE_LEN: usize = 1_024;

/// The maximum number of queued data-column verification and sampling-result work
/// events that will be stored before we start dropping them.
/// TODO(das): Placeholder number
const MAX_RPC_VERIFY_DATA_COLUMN_QUEUE_LEN: usize = 1000;
const MAX_SAMPLING_RESULT_QUEUE_LEN: usize = 1000;

/// The maximum number of queued `Vec<SignedBeaconBlock>` objects received during syncing that will
/// be stored before we start dropping them.
const MAX_CHAIN_SEGMENT_QUEUE_LEN: usize = 64;
Expand Down Expand Up @@ -252,6 +256,8 @@ pub const GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic
pub const RPC_BLOCK: &str = "rpc_block";
pub const IGNORED_RPC_BLOCK: &str = "ignored_rpc_block";
pub const RPC_BLOBS: &str = "rpc_blob";
pub const RPC_VERIFY_DATA_COLUMNS: &str = "rpc_verify_data_columns";
pub const SAMPLING_RESULT: &str = "sampling_result";
pub const CHAIN_SEGMENT: &str = "chain_segment";
pub const CHAIN_SEGMENT_BACKFILL: &str = "chain_segment_backfill";
pub const STATUS_PROCESSING: &str = "status_processing";
Expand Down Expand Up @@ -629,6 +635,8 @@ pub enum Work<E: EthSpec> {
RpcBlobs {
process_fn: AsyncFn,
},
RpcVerifyDataColumn(AsyncFn),
SamplingResult(AsyncFn),
IgnoredRpcBlock {
process_fn: BlockingFn,
},
Expand Down Expand Up @@ -675,6 +683,8 @@ impl<E: EthSpec> Work<E> {
Work::GossipLightClientOptimisticUpdate(_) => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE,
Work::RpcBlock { .. } => RPC_BLOCK,
Work::RpcBlobs { .. } => RPC_BLOBS,
Work::RpcVerifyDataColumn(_) => RPC_VERIFY_DATA_COLUMNS,
Work::SamplingResult(_) => SAMPLING_RESULT,
Work::IgnoredRpcBlock { .. } => IGNORED_RPC_BLOCK,
Work::ChainSegment { .. } => CHAIN_SEGMENT,
Work::ChainSegmentBackfill(_) => CHAIN_SEGMENT_BACKFILL,
Expand Down Expand Up @@ -833,6 +843,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
// Using a FIFO queue since blocks need to be imported sequentially.
let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
let mut rpc_blob_queue = FifoQueue::new(MAX_RPC_BLOB_QUEUE_LEN);
let mut rpc_verify_data_column_queue = FifoQueue::new(MAX_RPC_VERIFY_DATA_COLUMN_QUEUE_LEN);
let mut sampling_result_queue = FifoQueue::new(MAX_SAMPLING_RESULT_QUEUE_LEN);
let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN);
Expand Down Expand Up @@ -1278,6 +1290,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
rpc_block_queue.push(work, work_id, &self.log)
}
Work::RpcBlobs { .. } => rpc_blob_queue.push(work, work_id, &self.log),
Work::RpcVerifyDataColumn(_) => {
rpc_verify_data_column_queue.push(work, work_id, &self.log)
}
Work::SamplingResult(_) => {
sampling_result_queue.push(work, work_id, &self.log)
}
Work::ChainSegment { .. } => {
chain_segment_queue.push(work, work_id, &self.log)
}
Expand Down Expand Up @@ -1510,9 +1528,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
beacon_block_root: _,
process_fn,
} => task_spawner.spawn_async(process_fn),
Work::RpcBlock { process_fn } | Work::RpcBlobs { process_fn } => {
task_spawner.spawn_async(process_fn)
}
Work::RpcBlock { process_fn }
| Work::RpcBlobs { process_fn }
| Work::RpcVerifyDataColumn(process_fn)
| Work::SamplingResult(process_fn) => task_spawner.spawn_async(process_fn),
Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn),
Work::GossipBlock(work)
| Work::GossipBlobSidecar(work)
Expand Down
8 changes: 8 additions & 0 deletions beacon_node/lighthouse_network/src/rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,14 @@ pub struct DataColumnsByRootRequest {
pub data_column_ids: RuntimeVariableList<DataColumnIdentifier>,
}

impl DataColumnsByRootRequest {
    /// Builds a request from the given data column identifiers, bounded by the
    /// spec's `max_request_data_column_sidecars` limit.
    ///
    /// Note: the parameter is named `data_column_ids` (the original `blob_ids`
    /// was a copy-paste from the blobs-by-root request); Rust parameter names are
    /// not part of the call ABI, so callers are unaffected.
    pub fn new(data_column_ids: Vec<DataColumnIdentifier>, spec: &ChainSpec) -> Self {
        let data_column_ids = RuntimeVariableList::from_vec(
            data_column_ids,
            spec.max_request_data_column_sidecars as usize,
        );
        Self { data_column_ids }
    }
}

/* RPC Handling and Grouping */
// Collection of enums and structs used by the Codecs to encode/decode RPC messages

Expand Down
38 changes: 37 additions & 1 deletion beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
service::NetworkMessage,
sync::{manager::BlockProcessType, SyncMessage},
sync::{manager::BlockProcessType, SamplingId, SyncMessage},
};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::{builder::Witness, eth1_chain::CachingEth1Backend, BeaconChain};
Expand Down Expand Up @@ -478,6 +478,42 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
})
}

/// Create a new `Work` event for data columns received over ReqResp.
///
/// The columns are validated on the beacon-processor pool and the outcome is
/// reported back to sync as a `SyncMessage::SampleVerified` carrying the
/// sampling `id`.
pub fn send_rpc_data_columns(
    self: &Arc<Self>,
    block_root: Hash256,
    data_columns: Vec<Arc<DataColumnSidecar<T::EthSpec>>>,
    seen_timestamp: Duration,
    id: SamplingId,
) -> Result<(), Error<T::EthSpec>> {
    let processor = self.clone();
    let process_fn = async move {
        let result = processor
            .clone()
            .validate_rpc_data_columns(block_root, data_columns, seen_timestamp)
            .await;
        // Sync handles these results
        processor.send_sync_message(SyncMessage::SampleVerified { id, result });
    };

    self.try_send(BeaconWorkEvent {
        drop_during_sync: false,
        work: Work::RpcVerifyDataColumn(Box::pin(process_fn)),
    })
}

/// Create a new `Work` event signalling that sampling of `block_root` completed.
///
/// The event is forwarded to the beacon chain's sampling-completed handler on
/// the processor pool.
pub fn send_sampling_completed(
    self: &Arc<Self>,
    block_root: Hash256,
) -> Result<(), Error<T::EthSpec>> {
    let processor = self.clone();
    let process_fn = async move { processor.process_sampling_completed(block_root).await };

    self.try_send(BeaconWorkEvent {
        drop_during_sync: false,
        work: Work::SamplingResult(Box::pin(process_fn)),
    })
}

/// Create a new work event to import `blocks` as a beacon chain segment.
pub fn send_chain_segment(
self: &Arc<Self>,
Expand Down
21 changes: 20 additions & 1 deletion beacon_node/network/src/network_beacon_processor/sync_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use store::KzgCommitment;
use tokio::sync::mpsc;
use types::beacon_block_body::format_kzg_commitments;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{Epoch, Hash256};
use types::{DataColumnSidecar, Epoch, Hash256};

/// Id associated to a batch processing request, either a sync batch or a parent lookup.
#[derive(Clone, Debug, PartialEq)]
Expand Down Expand Up @@ -305,6 +305,25 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
});
}

/// Validate a list of data columns received from RPC requests.
///
/// Currently a stub that always returns `Ok(())`; KZG verification of the data
/// column sidecars is still to be implemented (see TODO). All parameters are
/// intentionally unused until then.
pub async fn validate_rpc_data_columns(
    self: Arc<NetworkBeaconProcessor<T>>,
    _block_root: Hash256,
    _data_columns: Vec<Arc<DataColumnSidecar<T::EthSpec>>>,
    _seen_timestamp: Duration,
) -> Result<(), String> {
    // TODO(das): validate data column sidecar KZG commitment
    Ok(())
}

/// Process a sampling completed event, inserting it into fork-choice.
///
/// Thin wrapper that delegates to `BeaconChain::process_sampling_completed`
/// (which, at this stage, only logs the event — see TODOs there).
pub async fn process_sampling_completed(
    self: Arc<NetworkBeaconProcessor<T>>,
    block_root: Hash256,
) {
    self.chain.process_sampling_completed(block_root).await;
}

/// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync
/// thread if more blocks are needed to process it.
pub async fn process_chain_segment(
Expand Down
22 changes: 7 additions & 15 deletions beacon_node/network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,11 +514,11 @@ impl<T: BeaconChainTypes> Router<T> {
) {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
SyncId::SingleBlock { .. } | SyncId::SingleBlob { .. } => {
crit!(self.log, "Block lookups do not request BBRange requests"; "peer_id" => %peer_id);
id @ SyncId::RangeBlockAndBlobs { .. } => id,
other => {
crit!(self.log, "BlocksByRange response on incorrect request"; "request" => ?other);
return;
}
id @ SyncId::RangeBlockAndBlobs { .. } => id,
},
RequestId::Router => {
crit!(self.log, "All BBRange requests belong to sync"; "peer_id" => %peer_id);
Expand Down Expand Up @@ -577,12 +577,8 @@ impl<T: BeaconChainTypes> Router<T> {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
id @ SyncId::SingleBlock { .. } => id,
SyncId::RangeBlockAndBlobs { .. } => {
crit!(self.log, "Batch syncing do not request BBRoot requests"; "peer_id" => %peer_id);
return;
}
SyncId::SingleBlob { .. } => {
crit!(self.log, "Blob response to block by roots request"; "peer_id" => %peer_id);
other => {
crit!(self.log, "BlocksByRoot response on incorrect request"; "request" => ?other);
return;
}
},
Expand Down Expand Up @@ -615,12 +611,8 @@ impl<T: BeaconChainTypes> Router<T> {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
id @ SyncId::SingleBlob { .. } => id,
SyncId::SingleBlock { .. } => {
crit!(self.log, "Block response to blobs by roots request"; "peer_id" => %peer_id);
return;
}
SyncId::RangeBlockAndBlobs { .. } => {
crit!(self.log, "Batch syncing does not request BBRoot requests"; "peer_id" => %peer_id);
other => {
crit!(self.log, "BlobsByRoot response on incorrect request"; "request" => ?other);
return;
}
},
Expand Down
Loading

0 comments on commit 9de3178

Please sign in to comment.