Skip to content

Commit

Permalink
Data availability sampling on sync
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Apr 22, 2024
1 parent f9b4202 commit 828794d
Show file tree
Hide file tree
Showing 15 changed files with 1,225 additions and 77 deletions.
14 changes: 14 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2855,6 +2855,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
ChainSegmentResult::Successful { imported_blocks }
}

/// Record the outcome of a data-availability sampling run for `block_root`.
///
/// Currently only logs the result; wiring the outcome into fork-choice is
/// still outstanding (see TODO below).
pub async fn process_sampling_result(
    self: &Arc<Self>,
    block_root: Hash256,
    sampling_successful: bool,
) {
    // TODO(das): update fork-choice
    // TODO(das): these log levels are too high; demote to debug once the
    // feature stabilizes — kept loud for now to surface important events
    if !sampling_successful {
        warn!(self.log, "Sampling failed"; "block_root" => %block_root);
    } else {
        info!(self.log, "Sampling successful"; "block_root" => %block_root);
    }
}

/// Returns `Ok(GossipVerifiedBlock)` if the supplied `block` should be forwarded onto the
/// gossip network. The block is not imported into the chain, it is just partially verified.
///
Expand Down
31 changes: 31 additions & 0 deletions beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2588,3 +2588,34 @@ pub fn generate_rand_block_and_blobs<E: EthSpec>(
}
(block, blob_sidecars)
}

/// Test helper: generate a random signed block plus a full set of
/// `DataColumnSidecar`s derived from it.
///
/// Panics if `num_blobs` yields zero blobs, since the first blob's signed
/// block header is reused for every column.
pub fn generate_rand_block_and_data_columns<E: EthSpec>(
    fork_name: ForkName,
    num_blobs: NumBlobs,
    rng: &mut impl Rng,
) -> (
    SignedBeaconBlock<E, FullPayload<E>>,
    Vec<DataColumnSidecar<E>>,
) {
    let (block, blobs) = generate_rand_block_and_blobs(fork_name, num_blobs, rng);
    let first_blob = blobs.first().expect("should have at least 1 blob");

    // Hoisted out of the loop: every column carries the same commitments.
    let kzg_commitments = block
        .message()
        .body()
        .blob_kzg_commitments()
        .unwrap()
        .clone();

    // TODO(das): do not hardcode
    let column_count = 64;
    let mut data_columns = Vec::with_capacity(column_count as usize);
    for index in 0..column_count {
        data_columns.push(DataColumnSidecar {
            index,
            column: <_>::default(),
            kzg_commitments: kzg_commitments.clone(),
            kzg_proofs: Vec::new().into(),
            signed_block_header: first_blob.signed_block_header.clone(),
            kzg_commitments_inclusion_proof: <_>::default(),
        });
    }

    (block, data_columns)
}
25 changes: 22 additions & 3 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024;
/// will be stored before we start dropping them.
const MAX_RPC_BLOB_QUEUE_LEN: usize = 1_024;

/// TODO(das): Placeholder number
const MAX_RPC_VERIFY_DATA_COLUMN_QUEUE_LEN: usize = 1000;
const MAX_SAMPLING_RESULT_QUEUE_LEN: usize = 1000;

/// The maximum number of queued `Vec<SignedBeaconBlock>` objects received during syncing that will
/// be stored before we start dropping them.
const MAX_CHAIN_SEGMENT_QUEUE_LEN: usize = 64;
Expand Down Expand Up @@ -252,6 +256,8 @@ pub const GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic
pub const RPC_BLOCK: &str = "rpc_block";
pub const IGNORED_RPC_BLOCK: &str = "ignored_rpc_block";
pub const RPC_BLOBS: &str = "rpc_blob";
pub const RPC_VERIFY_DATA_COLUMNS: &str = "rpc_verify_data_columns";
pub const SAMPLING_RESULT: &str = "sampling_result";
pub const CHAIN_SEGMENT: &str = "chain_segment";
pub const CHAIN_SEGMENT_BACKFILL: &str = "chain_segment_backfill";
pub const STATUS_PROCESSING: &str = "status_processing";
Expand Down Expand Up @@ -629,6 +635,8 @@ pub enum Work<E: EthSpec> {
RpcBlobs {
process_fn: AsyncFn,
},
RpcVerifyDataColumn(AsyncFn),
SamplingResult(AsyncFn),
IgnoredRpcBlock {
process_fn: BlockingFn,
},
Expand Down Expand Up @@ -675,6 +683,8 @@ impl<E: EthSpec> Work<E> {
Work::GossipLightClientOptimisticUpdate(_) => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE,
Work::RpcBlock { .. } => RPC_BLOCK,
Work::RpcBlobs { .. } => RPC_BLOBS,
Work::RpcVerifyDataColumn(_) => RPC_VERIFY_DATA_COLUMNS,
Work::SamplingResult(_) => SAMPLING_RESULT,
Work::IgnoredRpcBlock { .. } => IGNORED_RPC_BLOCK,
Work::ChainSegment { .. } => CHAIN_SEGMENT,
Work::ChainSegmentBackfill(_) => CHAIN_SEGMENT_BACKFILL,
Expand Down Expand Up @@ -833,6 +843,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
// Using a FIFO queue since blocks need to be imported sequentially.
let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
let mut rpc_blob_queue = FifoQueue::new(MAX_RPC_BLOB_QUEUE_LEN);
let mut rpc_verify_data_column_queue = FifoQueue::new(MAX_RPC_VERIFY_DATA_COLUMN_QUEUE_LEN);
let mut sampling_result_queue = FifoQueue::new(MAX_SAMPLING_RESULT_QUEUE_LEN);
let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN);
Expand Down Expand Up @@ -1278,6 +1290,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
rpc_block_queue.push(work, work_id, &self.log)
}
Work::RpcBlobs { .. } => rpc_blob_queue.push(work, work_id, &self.log),
Work::RpcVerifyDataColumn(_) => {
rpc_verify_data_column_queue.push(work, work_id, &self.log)
}
Work::SamplingResult(_) => {
sampling_result_queue.push(work, work_id, &self.log)
}
Work::ChainSegment { .. } => {
chain_segment_queue.push(work, work_id, &self.log)
}
Expand Down Expand Up @@ -1510,9 +1528,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
beacon_block_root: _,
process_fn,
} => task_spawner.spawn_async(process_fn),
Work::RpcBlock { process_fn } | Work::RpcBlobs { process_fn } => {
task_spawner.spawn_async(process_fn)
}
Work::RpcBlock { process_fn }
| Work::RpcBlobs { process_fn }
| Work::RpcVerifyDataColumn(process_fn)
| Work::SamplingResult(process_fn) => task_spawner.spawn_async(process_fn),
Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn),
Work::GossipBlock(work)
| Work::GossipBlobSidecar(work)
Expand Down
8 changes: 8 additions & 0 deletions beacon_node/lighthouse_network/src/rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,14 @@ pub struct DataColumnsByRootRequest {
pub data_column_ids: RuntimeVariableList<DataColumnIdentifier>,
}

impl DataColumnsByRootRequest {
    /// Builds a request for the given data column identifiers.
    ///
    /// The identifiers are wrapped in a `RuntimeVariableList` bounded by
    /// `spec.max_request_data_column_sidecars` (cast to `usize`), the
    /// protocol cap for a single DataColumnsByRoot request.
    /// NOTE(review): whether `from_vec` truncates oversized input or merely
    /// records the cap — confirm against `RuntimeVariableList::from_vec`.
    pub fn new(data_column_ids: Vec<DataColumnIdentifier>, spec: &ChainSpec) -> Self {
        // Parameter renamed from `blob_ids`: these are data column
        // identifiers, not blob identifiers (copy-paste leftover from the
        // analogous blobs-by-root request type).
        let data_column_ids = RuntimeVariableList::from_vec(
            data_column_ids,
            spec.max_request_data_column_sidecars as usize,
        );
        Self { data_column_ids }
    }
}

/* RPC Handling and Grouping */
// Collection of enums and structs used by the Codecs to encode/decode RPC messages

Expand Down
39 changes: 38 additions & 1 deletion beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
service::NetworkMessage,
sync::{manager::BlockProcessType, SyncMessage},
sync::{manager::BlockProcessType, SamplingId, SyncMessage},
};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::{builder::Witness, eth1_chain::CachingEth1Backend, BeaconChain};
Expand Down Expand Up @@ -478,6 +478,43 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
})
}

/// Create a new `Work` event for some data_columns received over ReqResp.
///
/// The spawned task validates the columns and then forwards the outcome to
/// sync as a `SampleVerified` message tagged with `id`.
pub fn send_rpc_data_columns(
    self: &Arc<Self>,
    block_root: Hash256,
    data_columns: Vec<Arc<DataColumnSidecar<T::EthSpec>>>,
    seen_timestamp: Duration,
    id: SamplingId,
) -> Result<(), Error<T::EthSpec>> {
    let processor = self.clone();
    let process_fn = async move {
        let result = processor
            .clone()
            .validate_rpc_data_columns(block_root, data_columns, seen_timestamp)
            .await;
        // Sync handles these results
        processor.send_sync_message(SyncMessage::SampleVerified { id, result });
    };
    self.try_send(BeaconWorkEvent {
        drop_during_sync: false,
        work: Work::RpcVerifyDataColumn(Box::pin(process_fn)),
    })
}

/// Queue an async work event that records the outcome of a sampling run
/// for `block_root` on the beacon chain.
pub fn send_sampling_result(
    self: &Arc<Self>,
    block_root: Hash256,
    sampling_result: Result<(), String>,
) -> Result<(), Error<T::EthSpec>> {
    let processor = self.clone();
    let process_fn = async move {
        processor
            .process_sampling_result(block_root, sampling_result)
            .await;
    };
    self.try_send(BeaconWorkEvent {
        drop_during_sync: false,
        work: Work::SamplingResult(Box::pin(process_fn)),
    })
}

/// Create a new work event to import `blocks` as a beacon chain segment.
pub fn send_chain_segment(
self: &Arc<Self>,
Expand Down
24 changes: 23 additions & 1 deletion beacon_node/network/src/network_beacon_processor/sync_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use store::KzgCommitment;
use tokio::sync::mpsc;
use types::beacon_block_body::format_kzg_commitments;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{Epoch, Hash256};
use types::{DataColumnSidecar, Epoch, Hash256};

/// Id associated to a batch processing request, either a sync batch or a parent lookup.
#[derive(Clone, Debug, PartialEq)]
Expand Down Expand Up @@ -305,6 +305,28 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
});
}

/// Validate a list of data columns received from RPC requests.
///
/// NOTE(review): currently a stub — it unconditionally returns `Ok(())`
/// without inspecting the columns; KZG verification is still a TODO below.
/// All parameters are intentionally unused (underscore-prefixed) until the
/// real validation lands.
pub async fn validate_rpc_data_columns(
    self: Arc<NetworkBeaconProcessor<T>>,
    _block_root: Hash256,
    _data_columns: Vec<Arc<DataColumnSidecar<T::EthSpec>>>,
    _seen_timestamp: Duration,
) -> Result<(), String> {
    // TODO(das): validate data column sidecar KZG commitment
    Ok(())
}

/// Process a sampling result, inserting it into fork-choice.
///
/// Thin adapter: collapses the detailed `Result<(), String>` into a
/// success/failure boolean before delegating to the beacon chain.
pub async fn process_sampling_result(
    self: Arc<NetworkBeaconProcessor<T>>,
    block_root: Hash256,
    sampling_result: Result<(), String>,
) {
    let sampling_successful = sampling_result.is_ok();
    self.chain
        .process_sampling_result(block_root, sampling_successful)
        .await;
}

/// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync
/// thread if more blocks are needed to process it.
pub async fn process_chain_segment(
Expand Down
22 changes: 7 additions & 15 deletions beacon_node/network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,11 +514,11 @@ impl<T: BeaconChainTypes> Router<T> {
) {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
SyncId::SingleBlock { .. } | SyncId::SingleBlob { .. } => {
crit!(self.log, "Block lookups do not request BBRange requests"; "peer_id" => %peer_id);
id @ SyncId::RangeBlockAndBlobs { .. } => id,
other => {
crit!(self.log, "BlocksByRange response on incorrect request"; "request" => ?other);
return;
}
id @ SyncId::RangeBlockAndBlobs { .. } => id,
},
RequestId::Router => {
crit!(self.log, "All BBRange requests belong to sync"; "peer_id" => %peer_id);
Expand Down Expand Up @@ -577,12 +577,8 @@ impl<T: BeaconChainTypes> Router<T> {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
id @ SyncId::SingleBlock { .. } => id,
SyncId::RangeBlockAndBlobs { .. } => {
crit!(self.log, "Batch syncing do not request BBRoot requests"; "peer_id" => %peer_id);
return;
}
SyncId::SingleBlob { .. } => {
crit!(self.log, "Blob response to block by roots request"; "peer_id" => %peer_id);
other => {
crit!(self.log, "BlocksByRoot response on incorrect request"; "request" => ?other);
return;
}
},
Expand Down Expand Up @@ -615,12 +611,8 @@ impl<T: BeaconChainTypes> Router<T> {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
id @ SyncId::SingleBlob { .. } => id,
SyncId::SingleBlock { .. } => {
crit!(self.log, "Block response to blobs by roots request"; "peer_id" => %peer_id);
return;
}
SyncId::RangeBlockAndBlobs { .. } => {
crit!(self.log, "Batch syncing does not request BBRoot requests"; "peer_id" => %peer_id);
other => {
crit!(self.log, "BlobsByRoot response on incorrect request"; "request" => ?other);
return;
}
},
Expand Down
34 changes: 2 additions & 32 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
error: RPCError,
) {
// Downscore peer even if lookup is not known
self.downscore_on_rpc_error(peer_id, &error, cx);
cx.report_peer_on_rpc_error(peer_id, &error);

let Some(mut parent_lookup) = self.get_parent_lookup::<R>(id) else {
debug!(self.log,
Expand Down Expand Up @@ -674,7 +674,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
error: RPCError,
) {
// Downscore peer even if lookup is not known
self.downscore_on_rpc_error(peer_id, &error, cx);
cx.report_peer_on_rpc_error(peer_id, &error);

let log = self.log.clone();
let Some(mut lookup) = self.get_single_lookup::<R>(id) else {
Expand Down Expand Up @@ -1330,34 +1330,4 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.parent_lookups.len() as i64,
);
}

/// Penalize `peer_id` according to the severity of the RPC failure it
/// caused during a sync lookup.
///
/// Maps each `RPCError` variant to an optional `PeerAction` — protocol
/// violations score a low-tolerance penalty, timing/stream failures a
/// mid-tolerance one, and failures that are our own fault (or a tolerated
/// capability gap) produce no report. Any resulting action is forwarded to
/// the network via `cx.report_peer`.
pub fn downscore_on_rpc_error(
    &self,
    peer_id: &PeerId,
    error: &RPCError,
    cx: &SyncNetworkContext<T>,
) {
    // Note: logging the report event here with the full error display. The log inside
    // `report_peer` only includes a smaller string, like "invalid_data"
    debug!(self.log, "reporting peer for sync lookup error"; "error" => %error);
    if let Some(action) = match error {
        // Protocol errors are heavily penalized
        RPCError::SSZDecodeError(..)
        | RPCError::IoError(..)
        | RPCError::ErrorResponse(..)
        | RPCError::InvalidData(..)
        | RPCError::HandlerRejected => Some(PeerAction::LowToleranceError),
        // Timing / network errors are less penalized
        // TODO: Is IoError a protocol error or network error?
        RPCError::StreamTimeout | RPCError::IncompleteStream | RPCError::NegotiationTimeout => {
            Some(PeerAction::MidToleranceError)
        }
        // Not supporting a specific protocol is tolerated. TODO: Are you sure?
        RPCError::UnsupportedProtocol => None,
        // Our fault, don't penalize peer
        RPCError::InternalError(..) | RPCError::Disconnected => None,
    } {
        cx.report_peer(*peer_id, action, error.into());
    }
}
}
Loading

0 comments on commit 828794d

Please sign in to comment.