Add PeerDAS RPC import boilerplate #6238

Merged
merged 3 commits · Aug 15, 2024
Changes from 2 commits
76 changes: 76 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -3033,6 +3033,41 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.remove_notified(&block_root, r)
}

/// Cache the columns in the processing cache, process them, then evict them from the cache
/// once they are imported or an error occurs.
pub async fn process_rpc_custody_columns(
self: &Arc<Self>,
custody_columns: DataColumnSidecarList<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let Ok((slot, block_root)) = custody_columns
.iter()
.map(|c| (c.slot(), c.block_root()))
.unique()
.exactly_one()
else {
return Err(BlockError::InternalError(
"Columns should be from the same block".to_string(),
));
};

// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its columns again.
if self
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::BlockIsAlreadyKnown(block_root));
}

// TODO(das): custody column SSE event

let r = self
.check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns)
.await;
self.remove_notified(&block_root, r)
}

/// Remove any block components from the *processing cache* if we no longer require them. If the
/// block was fully imported or errored, we no longer require them.
fn remove_notified(
@@ -3369,6 +3404,47 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.process_availability(slot, availability).await
}

/// Checks if the provided columns can make any cached blocks available, and imports immediately
/// if so, otherwise caches the columns in the data availability checker.
async fn check_rpc_custody_columns_availability_and_import(
self: &Arc<Self>,
slot: Slot,
block_root: Hash256,
custody_columns: DataColumnSidecarList<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
// Need to scope this to ensure the lock is dropped before calling `process_availability`
// Even an explicit drop is not enough to convince the borrow checker.
{
let mut slashable_cache = self.observed_slashable.write();
// Assumes all items in custody_columns are for the same block_root
if let Some(column) = custody_columns.first() {
let header = &column.signed_block_header;
if verify_header_signature::<T, BlockError<T::EthSpec>>(self, header).is_ok() {
slashable_cache
.observe_slashable(
header.message.slot,
header.message.proposer_index,
block_root,
)
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
if let Some(slasher) = self.slasher.as_ref() {
slasher.accept_block_header(header.clone());
}
}
}
}

// This slot value is purely informative for the consumers of
// `AvailabilityProcessingStatus::MissingComponents` to log an error with a slot.
let availability = self.data_availability_checker.put_rpc_custody_columns(
block_root,
slot.epoch(T::EthSpec::slots_per_epoch()),
custody_columns,
)?;

self.process_availability(slot, availability).await
}

/// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents`
///
/// An error is returned if the block was unable to be imported. It may be partially imported
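A note on the `(slot, block_root)` validation at the top of `process_rpc_custody_columns`: `unique().exactly_one()` from `itertools` accepts the batch only when every column maps to the same key, and rejects both mixed batches and empty ones. A minimal standalone sketch of that check, using plain tuples rather than Lighthouse's sidecar types:

```rust
// Minimal sketch (plain tuples, not Lighthouse types) of the uniqueness
// check in `process_rpc_custody_columns`: the batch is valid only if every
// column carries the same (slot, block_root) pair. Assumes the `itertools`
// crate is available.
use itertools::Itertools;

type Key = (u64, [u8; 32]); // (slot, block_root)

fn single_block_key(columns: &[Key]) -> Result<Key, String> {
    columns
        .iter()
        .copied()
        .unique()       // deduplicate (slot, root) pairs
        .exactly_one()  // Ok only if exactly one distinct pair remains
        .map_err(|_| "Columns should be from the same block".to_string())
}

fn main() {
    let root = [0u8; 32];
    // Three columns of one block: one distinct key, accepted.
    assert!(single_block_key(&[(5, root), (5, root), (5, root)]).is_ok());
    // Columns spanning two slots: rejected.
    assert!(single_block_key(&[(5, root), (6, root)]).is_err());
    // An empty batch is also rejected (zero distinct keys).
    assert!(single_block_key(&[]).is_err());
}
```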
36 changes: 35 additions & 1 deletion beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -23,7 +23,9 @@ mod error;
mod overflow_lru_cache;
mod state_lru_cache;

use crate::data_column_verification::{GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn};
use crate::data_column_verification::{
GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn,
};
pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory};
use types::non_zero_usize::new_non_zero_usize;

@@ -181,6 +183,38 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.put_kzg_verified_blobs(block_root, epoch, verified_blobs)
}

/// Put a list of custody columns received via RPC into the availability cache. This performs KZG
/// verification on the columns in the list.
#[allow(clippy::type_complexity)]
Member commented (suggested change: remove `#[allow(clippy::type_complexity)]`):
i think this is not necessary

pub fn put_rpc_custody_columns(
&self,
block_root: Hash256,
epoch: Epoch,
custody_columns: DataColumnSidecarList<T::EthSpec>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let Some(kzg) = self.kzg.as_ref() else {
return Err(AvailabilityCheckError::KzgNotInitialized);
};

// TODO(das): report which column is invalid for proper peer scoring
// TODO(das): batch KZG verification here
let verified_custody_columns = custody_columns
.iter()
.map(|column| {
Ok(KzgVerifiedCustodyDataColumn::from_asserted_custody(
KzgVerifiedDataColumn::new(column.clone(), kzg)
.map_err(AvailabilityCheckError::Kzg)?,
))
})
.collect::<Result<Vec<_>, AvailabilityCheckError>>()?;

self.availability_cache.put_kzg_verified_data_columns(
block_root,
epoch,
verified_custody_columns,
)
}

/// Check if we've cached other blobs for this block. If it completes a set and we also
/// have a block cached, return the `Availability` variant triggering block import.
/// Otherwise cache the blob sidecar.
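The per-column verification above uses the standard short-circuiting collect: mapping each column to a `Result` and collecting into `Result<Vec<_>, _>` aborts at the first KZG failure, which is also why the TODO notes that the offending column index is not yet reported for peer scoring. A self-contained sketch of the pattern, with a toy verifier in place of the real KZG check:

```rust
// Sketch of the short-circuiting collect used in `put_rpc_custody_columns`
// (toy verifier, not the real KZG check): the first Err aborts the whole
// batch, and the error does not say *which* item failed — matching the
// TODO about peer scoring above.
fn verify_batch(columns: &[u64]) -> Result<Vec<u64>, String> {
    columns
        .iter()
        .map(|&c| {
            if c % 2 == 0 {
                Ok(c) // stand-in for a passing KZG proof
            } else {
                Err("kzg verification failed".to_string())
            }
        })
        .collect::<Result<Vec<_>, String>>()
}

fn main() {
    assert_eq!(verify_batch(&[2, 4, 6]), Ok(vec![2, 4, 6]));
    assert!(verify_batch(&[2, 3, 4]).is_err()); // stops at 3; 4 is never checked
}
```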
@@ -430,8 +430,6 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
}
}

// TODO(das): rpc code paths to be implemented.
#[allow(dead_code)]
pub fn put_kzg_verified_data_columns<
I: IntoIterator<Item = KzgVerifiedCustodyDataColumn<T::EthSpec>>,
>(
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/src/data_column_verification.rs
@@ -177,7 +177,7 @@ impl<T: BeaconChainTypes> GossipVerifiedDataColumn<T> {
pub fn id(&self) -> DataColumnIdentifier {
DataColumnIdentifier {
block_root: self.block_root,
index: self.data_column.data_column_index(),
index: self.data_column.index(),
}
}

@@ -221,7 +221,7 @@ impl<E: EthSpec> KzgVerifiedDataColumn<E> {
self.data.clone()
}

pub fn data_column_index(&self) -> u64 {
pub fn index(&self) -> ColumnIndex {
self.data.index
}
}
17 changes: 14 additions & 3 deletions beacon_node/beacon_processor/src/lib.rs
@@ -108,6 +108,7 @@ pub struct BeaconProcessorQueueLengths {
unknown_light_client_update_queue: usize,
rpc_block_queue: usize,
rpc_blob_queue: usize,
rpc_custody_column_queue: usize,
chain_segment_queue: usize,
backfill_chain_segment: usize,
gossip_block_queue: usize,
@@ -163,6 +164,7 @@ impl BeaconProcessorQueueLengths {
unknown_light_client_update_queue: 128,
rpc_block_queue: 1024,
rpc_blob_queue: 1024,
rpc_custody_column_queue: 1024,
chain_segment_queue: 64,
backfill_chain_segment: 64,
gossip_block_queue: 1024,
@@ -228,6 +230,7 @@ pub const GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic
pub const RPC_BLOCK: &str = "rpc_block";
pub const IGNORED_RPC_BLOCK: &str = "ignored_rpc_block";
pub const RPC_BLOBS: &str = "rpc_blob";
pub const RPC_CUSTODY_COLUMN: &str = "rpc_custody_column";
pub const CHAIN_SEGMENT: &str = "chain_segment";
pub const CHAIN_SEGMENT_BACKFILL: &str = "chain_segment_backfill";
pub const STATUS_PROCESSING: &str = "status_processing";
@@ -606,6 +609,7 @@ pub enum Work<E: EthSpec> {
RpcBlobs {
process_fn: AsyncFn,
},
RpcCustodyColumn(AsyncFn),
IgnoredRpcBlock {
process_fn: BlockingFn,
},
@@ -653,6 +657,7 @@ impl<E: EthSpec> Work<E> {
Work::GossipLightClientOptimisticUpdate(_) => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE,
Work::RpcBlock { .. } => RPC_BLOCK,
Work::RpcBlobs { .. } => RPC_BLOBS,
Work::RpcCustodyColumn { .. } => RPC_CUSTODY_COLUMN,
Work::IgnoredRpcBlock { .. } => IGNORED_RPC_BLOCK,
Work::ChainSegment { .. } => CHAIN_SEGMENT,
Work::ChainSegmentBackfill(_) => CHAIN_SEGMENT_BACKFILL,
@@ -815,6 +820,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
// Using a FIFO queue since blocks need to be imported sequentially.
let mut rpc_block_queue = FifoQueue::new(queue_lengths.rpc_block_queue);
let mut rpc_blob_queue = FifoQueue::new(queue_lengths.rpc_blob_queue);
let mut rpc_custody_column_queue = FifoQueue::new(queue_lengths.rpc_custody_column_queue);
let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue);
let mut backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment);
let mut gossip_block_queue = FifoQueue::new(queue_lengths.gossip_block_queue);
@@ -970,6 +976,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = rpc_blob_queue.pop() {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = rpc_custody_column_queue.pop() {
self.spawn_worker(item, idle_tx);
// Check delayed blocks before gossip blocks, the gossip blocks might rely
// on the delayed ones.
} else if let Some(item) = delayed_block_queue.pop() {
@@ -1262,6 +1270,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
rpc_block_queue.push(work, work_id, &self.log)
}
Work::RpcBlobs { .. } => rpc_blob_queue.push(work, work_id, &self.log),
Work::RpcCustodyColumn { .. } => {
rpc_custody_column_queue.push(work, work_id, &self.log)
}
Work::ChainSegment { .. } => {
chain_segment_queue.push(work, work_id, &self.log)
}
@@ -1497,9 +1508,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
beacon_block_root: _,
process_fn,
} => task_spawner.spawn_async(process_fn),
Work::RpcBlock { process_fn } | Work::RpcBlobs { process_fn } => {
task_spawner.spawn_async(process_fn)
}
Work::RpcBlock { process_fn }
| Work::RpcBlobs { process_fn }
| Work::RpcCustodyColumn(process_fn) => task_spawner.spawn_async(process_fn),
Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn),
Work::GossipBlock(work)
| Work::GossipBlobSidecar(work)
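For readers unfamiliar with the queue plumbing this file extends: each work type gets its own bounded FIFO queue, and the event loop drains queues in priority order, with RPC blocks first, then blobs, then the new custody columns. A reduced, hypothetical sketch of that structure (names invented; this is not the real `FifoQueue`, which also logs when it drops work):

```rust
// Reduced, hypothetical sketch of the processor's bounded FIFO queues and
// priority-ordered draining. The real implementation lives in
// `beacon_processor` and carries metrics and logging.
use std::collections::VecDeque;

struct FifoQueue<T> {
    queue: VecDeque<T>,
    max_len: usize,
}

impl<T> FifoQueue<T> {
    fn new(max_len: usize) -> Self {
        Self { queue: VecDeque::new(), max_len }
    }

    /// Push work, dropping it if the queue is at capacity.
    fn push(&mut self, item: T) {
        if self.queue.len() < self.max_len {
            self.queue.push_back(item);
        }
    }

    fn pop(&mut self) -> Option<T> {
        self.queue.pop_front()
    }
}

fn next_work(
    rpc_blocks: &mut FifoQueue<String>,
    rpc_blobs: &mut FifoQueue<String>,
    rpc_custody_columns: &mut FifoQueue<String>,
) -> Option<String> {
    // Same ordering as the diff: blocks, then blobs, then custody columns.
    rpc_blocks
        .pop()
        .or_else(|| rpc_blobs.pop())
        .or_else(|| rpc_custody_columns.pop())
}
```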
24 changes: 24 additions & 0 deletions beacon_node/network/src/network_beacon_processor/mod.rs
@@ -476,6 +476,30 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
})
}

/// Create a new `Work` event for some custody columns. `process_rpc_custody_columns` reports
/// the result back to sync.
pub fn send_rpc_custody_columns(
Collaborator (Author) commented:
Future PR touching sync will consume this function

self: &Arc<Self>,
block_root: Hash256,
custody_columns: DataColumnSidecarList<T::EthSpec>,
seen_timestamp: Duration,
process_type: BlockProcessType,
) -> Result<(), Error<T::EthSpec>> {
let s = self.clone();
self.try_send(BeaconWorkEvent {
drop_during_sync: false,
work: Work::RpcCustodyColumn(Box::pin(async move {
s.process_rpc_custody_columns(
block_root,
custody_columns,
seen_timestamp,
process_type,
)
.await;
})),
})
}

/// Create a new work event to import `blocks` as a beacon chain segment.
pub fn send_chain_segment(
self: &Arc<Self>,
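The `Box::pin(async move { ... })` above is the `AsyncFn` shape the processor expects for `Work::RpcCustodyColumn`: a pinned, boxed future that owns its inputs and is executed later by `spawn_async`. A minimal sketch of the pattern; the `AsyncFn` alias below is an assumption about its definition in `beacon_processor`, and the closure body just prints where the real one awaits `process_rpc_custody_columns`:

```rust
// Minimal sketch of the deferred-work pattern used by
// `send_rpc_custody_columns`. The alias below is an assumption, not
// necessarily the exact definition in `beacon_processor`.
use std::future::Future;
use std::pin::Pin;

type AsyncFn = Pin<Box<dyn Future<Output = ()> + Send + Sync>>;

fn make_custody_column_work(block_root: [u8; 32]) -> AsyncFn {
    Box::pin(async move {
        // Stand-in for `s.process_rpc_custody_columns(...).await`.
        println!("processing custody columns for block {:02x?}", &block_root[..4]);
    })
}

#[tokio::main]
async fn main() {
    // The processor queues the boxed future and awaits it later.
    let work = make_custody_column_work([0u8; 32]);
    work.await;
}
```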
56 changes: 55 additions & 1 deletion beacon_node/network/src/network_beacon_processor/sync_methods.rs
@@ -24,7 +24,7 @@ use store::KzgCommitment;
use tokio::sync::mpsc;
use types::beacon_block_body::format_kzg_commitments;
use types::blob_sidecar::FixedBlobSidecarList;
use types::BlockImportSource;
use types::{BlockImportSource, DataColumnSidecarList};
use types::{Epoch, Hash256};

/// Id associated to a batch processing request, either a sync batch or a parent lookup.
@@ -307,6 +307,60 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
});
}

pub async fn process_rpc_custody_columns(
self: Arc<NetworkBeaconProcessor<T>>,
block_root: Hash256,
custody_columns: DataColumnSidecarList<T::EthSpec>,
_seen_timestamp: Duration,
process_type: BlockProcessType,
) {
let result = self
.chain
.process_rpc_custody_columns(custody_columns)
.await;

match &result {
Ok(availability) => match availability {
AvailabilityProcessingStatus::Imported(hash) => {
debug!(
self.log,
"Block components retrieved";
"result" => "imported block and custody columns",
"block_hash" => %hash,
);
self.chain.recompute_head_at_current_slot().await;
}
AvailabilityProcessingStatus::MissingComponents(_, _) => {
debug!(
self.log,
"Missing components over rpc";
"block_hash" => %block_root,
);
}
},
Err(BlockError::BlockIsAlreadyKnown(_)) => {
debug!(
self.log,
"Custody columns have already been imported";
"block_hash" => %block_root,
);
}
Err(e) => {
warn!(
self.log,
"Error when importing rpc custody columns";
"error" => ?e,
"block_hash" => %block_root,
);
}
}

self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type,
result: result.into(),
});
}

/// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync
/// thread if more blocks are needed to process it.
pub async fn process_chain_segment(