Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAS sampling on sync #5616

Merged
merged 5 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2855,6 +2855,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
ChainSegmentResult::Successful { imported_blocks }
}

/// Updates fork-choice node into a permanent `available` state so it can become a viable head.
/// Only completed sampling results are received. Blocks are unavailable by default and should
/// be pruned on finalization, on a timeout or by a max count.
pub async fn process_sampling_completed(self: &Arc<Self>, block_root: Hash256) {
// TODO(das): update fork-choice
jimmygchen marked this conversation as resolved.
Show resolved Hide resolved
// TODO(das): These log levels are too high, reduce once DAS matures
info!(self.log, "Sampling completed"; "block_root" => %block_root);
}

/// Returns `Ok(GossipVerifiedBlock)` if the supplied `block` should be forwarded onto the
/// gossip network. The block is not imported into the chain, it is just partially verified.
///
Expand Down
30 changes: 30 additions & 0 deletions beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2584,3 +2584,33 @@ pub fn generate_rand_block_and_blobs<E: EthSpec>(
}
(block, blob_sidecars)
}

pub fn generate_rand_block_and_data_columns<E: EthSpec>(
fork_name: ForkName,
num_blobs: NumBlobs,
rng: &mut impl Rng,
) -> (
SignedBeaconBlock<E, FullPayload<E>>,
Vec<DataColumnSidecar<E>>,
) {
let (block, blobs) = generate_rand_block_and_blobs(fork_name, num_blobs, rng);
let blob = blobs.first().expect("should have at least 1 blob");

let data_columns = (0..E::number_of_columns())
.map(|index| DataColumnSidecar {
index: index as u64,
column: <_>::default(),
kzg_commitments: block
.message()
.body()
.blob_kzg_commitments()
.unwrap()
.clone(),
kzg_proofs: (vec![]).into(),
signed_block_header: blob.signed_block_header.clone(),
kzg_commitments_inclusion_proof: <_>::default(),
})
.collect::<Vec<_>>();

(block, data_columns)
}
38 changes: 35 additions & 3 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024;
/// will be stored before we start dropping them.
const MAX_RPC_BLOB_QUEUE_LEN: usize = 1_024;

/// TODO(das): Placeholder number
const MAX_RPC_VERIFY_DATA_COLUMN_QUEUE_LEN: usize = 1000;
const MAX_SAMPLING_RESULT_QUEUE_LEN: usize = 1000;

/// The maximum number of queued `Vec<SignedBeaconBlock>` objects received during syncing that will
/// be stored before we start dropping them.
const MAX_CHAIN_SEGMENT_QUEUE_LEN: usize = 64;
Expand Down Expand Up @@ -252,6 +256,8 @@ pub const GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic
pub const RPC_BLOCK: &str = "rpc_block";
pub const IGNORED_RPC_BLOCK: &str = "ignored_rpc_block";
pub const RPC_BLOBS: &str = "rpc_blob";
pub const RPC_VERIFY_DATA_COLUMNS: &str = "rpc_verify_data_columns";
pub const SAMPLING_RESULT: &str = "sampling_result";
pub const CHAIN_SEGMENT: &str = "chain_segment";
pub const CHAIN_SEGMENT_BACKFILL: &str = "chain_segment_backfill";
pub const STATUS_PROCESSING: &str = "status_processing";
Expand Down Expand Up @@ -629,6 +635,8 @@ pub enum Work<E: EthSpec> {
RpcBlobs {
process_fn: AsyncFn,
},
RpcVerifyDataColumn(AsyncFn),
SamplingResult(AsyncFn),
IgnoredRpcBlock {
process_fn: BlockingFn,
},
Expand Down Expand Up @@ -675,6 +683,8 @@ impl<E: EthSpec> Work<E> {
Work::GossipLightClientOptimisticUpdate(_) => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE,
Work::RpcBlock { .. } => RPC_BLOCK,
Work::RpcBlobs { .. } => RPC_BLOBS,
Work::RpcVerifyDataColumn(_) => RPC_VERIFY_DATA_COLUMNS,
Work::SamplingResult(_) => SAMPLING_RESULT,
Work::IgnoredRpcBlock { .. } => IGNORED_RPC_BLOCK,
Work::ChainSegment { .. } => CHAIN_SEGMENT,
Work::ChainSegmentBackfill(_) => CHAIN_SEGMENT_BACKFILL,
Expand Down Expand Up @@ -833,6 +843,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
// Using a FIFO queue since blocks need to be imported sequentially.
let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
let mut rpc_blob_queue = FifoQueue::new(MAX_RPC_BLOB_QUEUE_LEN);
let mut rpc_verify_data_column_queue = FifoQueue::new(MAX_RPC_VERIFY_DATA_COLUMN_QUEUE_LEN);
let mut sampling_result_queue = FifoQueue::new(MAX_SAMPLING_RESULT_QUEUE_LEN);
dapplion marked this conversation as resolved.
Show resolved Hide resolved
let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN);
Expand Down Expand Up @@ -988,6 +1000,11 @@ impl<E: EthSpec> BeaconProcessor<E> {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = rpc_blob_queue.pop() {
self.spawn_worker(item, idle_tx);
// TODO(das): decide proper priorization for sampling columns
} else if let Some(item) = rpc_verify_data_column_queue.pop() {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = sampling_result_queue.pop() {
self.spawn_worker(item, idle_tx);
jimmygchen marked this conversation as resolved.
Show resolved Hide resolved
// Check delayed blocks before gossip blocks, the gossip blocks might rely
// on the delayed ones.
} else if let Some(item) = delayed_block_queue.pop() {
Expand Down Expand Up @@ -1278,6 +1295,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
rpc_block_queue.push(work, work_id, &self.log)
}
Work::RpcBlobs { .. } => rpc_blob_queue.push(work, work_id, &self.log),
Work::RpcVerifyDataColumn(_) => {
rpc_verify_data_column_queue.push(work, work_id, &self.log)
}
Work::SamplingResult(_) => {
sampling_result_queue.push(work, work_id, &self.log)
}
Work::ChainSegment { .. } => {
chain_segment_queue.push(work, work_id, &self.log)
}
Expand Down Expand Up @@ -1371,6 +1394,14 @@ impl<E: EthSpec> BeaconProcessor<E> {
&metrics::BEACON_PROCESSOR_RPC_BLOB_QUEUE_TOTAL,
rpc_blob_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_RPC_VERIFY_DATA_COLUMN_QUEUE_TOTAL,
rpc_blob_queue.len() as i64,
jimmygchen marked this conversation as resolved.
Show resolved Hide resolved
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_SAMPLING_RESULT_QUEUE_TOTAL,
rpc_blob_queue.len() as i64,
jimmygchen marked this conversation as resolved.
Show resolved Hide resolved
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL,
chain_segment_queue.len() as i64,
Expand Down Expand Up @@ -1510,9 +1541,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
beacon_block_root: _,
process_fn,
} => task_spawner.spawn_async(process_fn),
Work::RpcBlock { process_fn } | Work::RpcBlobs { process_fn } => {
task_spawner.spawn_async(process_fn)
}
Work::RpcBlock { process_fn }
| Work::RpcBlobs { process_fn }
| Work::RpcVerifyDataColumn(process_fn)
| Work::SamplingResult(process_fn) => task_spawner.spawn_async(process_fn),
Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn),
Work::GossipBlock(work)
| Work::GossipBlobSidecar(work)
Expand Down
10 changes: 10 additions & 0 deletions beacon_node/beacon_processor/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ lazy_static::lazy_static! {
"beacon_processor_rpc_blob_queue_total",
"Count of blobs from the rpc waiting to be verified."
);
// Rpc verify data columns
pub static ref BEACON_PROCESSOR_RPC_VERIFY_DATA_COLUMN_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_rpc_verify_data_column_queue_total",
"Count of data columns from the rpc waiting to be verified."
);
// Sampling result
pub static ref BEACON_PROCESSOR_SAMPLING_RESULT_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_rpc_blob_queue_total",
jimmygchen marked this conversation as resolved.
Show resolved Hide resolved
"Count of sampling results waiting to be processed."
);
// Chain segments.
pub static ref BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_chain_segment_queue_total",
Expand Down
10 changes: 10 additions & 0 deletions beacon_node/lighthouse_network/src/rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,16 @@ pub struct DataColumnsByRootRequest {
pub data_column_ids: RuntimeVariableList<DataColumnIdentifier>,
}

impl DataColumnsByRootRequest {
pub fn new(data_column_ids: Vec<DataColumnIdentifier>, spec: &ChainSpec) -> Self {
let data_column_ids = RuntimeVariableList::from_vec(
data_column_ids,
spec.max_request_data_column_sidecars as usize,
);
Self { data_column_ids }
}
}

/* RPC Handling and Grouping */
// Collection of enums and structs used by the Codecs to encode/decode RPC messages

Expand Down
38 changes: 37 additions & 1 deletion beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
service::NetworkMessage,
sync::{manager::BlockProcessType, SyncMessage},
sync::{manager::BlockProcessType, SamplingId, SyncMessage},
};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::{builder::Witness, eth1_chain::CachingEth1Backend, BeaconChain};
Expand Down Expand Up @@ -478,6 +478,42 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
})
}

/// Create a new `Work` event for some data_columns from ReqResp
pub fn send_rpc_data_columns(
self: &Arc<Self>,
block_root: Hash256,
data_columns: Vec<Arc<DataColumnSidecar<T::EthSpec>>>,
seen_timestamp: Duration,
id: SamplingId,
) -> Result<(), Error<T::EthSpec>> {
let s = self.clone();
self.try_send(BeaconWorkEvent {
drop_during_sync: false,
work: Work::RpcVerifyDataColumn(Box::pin(async move {
let result = s
.clone()
.validate_rpc_data_columns(block_root, data_columns, seen_timestamp)
.await;
// Sync handles these results
s.send_sync_message(SyncMessage::SampleVerified { id, result });
})),
})
}

/// Create a new `Work` event with a block sampling completed result
pub fn send_sampling_completed(
self: &Arc<Self>,
block_root: Hash256,
) -> Result<(), Error<T::EthSpec>> {
let nbp = self.clone();
self.try_send(BeaconWorkEvent {
drop_during_sync: false,
work: Work::SamplingResult(Box::pin(async move {
nbp.process_sampling_completed(block_root).await;
})),
})
}

/// Create a new work event to import `blocks` as a beacon chain segment.
pub fn send_chain_segment(
self: &Arc<Self>,
Expand Down
21 changes: 20 additions & 1 deletion beacon_node/network/src/network_beacon_processor/sync_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use store::KzgCommitment;
use tokio::sync::mpsc;
use types::beacon_block_body::format_kzg_commitments;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{Epoch, Hash256};
use types::{DataColumnSidecar, Epoch, Hash256};

/// Id associated to a batch processing request, either a sync batch or a parent lookup.
#[derive(Clone, Debug, PartialEq)]
Expand Down Expand Up @@ -305,6 +305,25 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
});
}

/// Validate a list of data columns received from RPC requests
pub async fn validate_rpc_data_columns(
self: Arc<NetworkBeaconProcessor<T>>,
_block_root: Hash256,
_data_columns: Vec<Arc<DataColumnSidecar<T::EthSpec>>>,
_seen_timestamp: Duration,
) -> Result<(), String> {
// TODO(das): validate data column sidecar KZG commitment
Ok(())
}

/// Process a sampling completed event, inserting it into fork-choice
pub async fn process_sampling_completed(
self: Arc<NetworkBeaconProcessor<T>>,
block_root: Hash256,
) {
self.chain.process_sampling_completed(block_root).await;
}

/// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync
/// thread if more blocks are needed to process it.
pub async fn process_chain_segment(
Expand Down
22 changes: 7 additions & 15 deletions beacon_node/network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,11 +514,11 @@ impl<T: BeaconChainTypes> Router<T> {
) {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
SyncId::SingleBlock { .. } | SyncId::SingleBlob { .. } => {
crit!(self.log, "Block lookups do not request BBRange requests"; "peer_id" => %peer_id);
id @ SyncId::RangeBlockAndBlobs { .. } => id,
other => {
crit!(self.log, "BlocksByRange response on incorrect request"; "request" => ?other);
jimmygchen marked this conversation as resolved.
Show resolved Hide resolved
return;
}
id @ SyncId::RangeBlockAndBlobs { .. } => id,
},
RequestId::Router => {
crit!(self.log, "All BBRange requests belong to sync"; "peer_id" => %peer_id);
Expand Down Expand Up @@ -577,12 +577,8 @@ impl<T: BeaconChainTypes> Router<T> {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
id @ SyncId::SingleBlock { .. } => id,
SyncId::RangeBlockAndBlobs { .. } => {
crit!(self.log, "Batch syncing do not request BBRoot requests"; "peer_id" => %peer_id);
return;
}
SyncId::SingleBlob { .. } => {
crit!(self.log, "Blob response to block by roots request"; "peer_id" => %peer_id);
other => {
crit!(self.log, "BlocksByRoot response on incorrect request"; "request" => ?other);
jimmygchen marked this conversation as resolved.
Show resolved Hide resolved
return;
}
},
Expand Down Expand Up @@ -615,12 +611,8 @@ impl<T: BeaconChainTypes> Router<T> {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
id @ SyncId::SingleBlob { .. } => id,
SyncId::SingleBlock { .. } => {
crit!(self.log, "Block response to blobs by roots request"; "peer_id" => %peer_id);
return;
}
SyncId::RangeBlockAndBlobs { .. } => {
crit!(self.log, "Batch syncing does not request BBRoot requests"; "peer_id" => %peer_id);
other => {
crit!(self.log, "BlobsByRoot response on incorrect request"; "request" => ?other);
return;
}
},
Expand Down
Loading