Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAS sampling on sync #5616

Merged
merged 5 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1396,11 +1396,11 @@ impl<E: EthSpec> BeaconProcessor<E> {
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_RPC_VERIFY_DATA_COLUMN_QUEUE_TOTAL,
rpc_blob_queue.len() as i64,
rpc_verify_data_column_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_SAMPLING_RESULT_QUEUE_TOTAL,
rpc_blob_queue.len() as i64,
sampling_result_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_QUEUE_TOTAL,
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_processor/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ lazy_static::lazy_static! {
);
// Sampling result
pub static ref BEACON_PROCESSOR_SAMPLING_RESULT_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_rpc_blob_queue_total",
"beacon_processor_sampling_result_queue_total",
"Count of sampling results waiting to be processed."
);
// Chain segments.
Expand Down
24 changes: 22 additions & 2 deletions beacon_node/network/src/network_beacon_processor/gossip_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
seen_duration: Duration,
) {
let slot = column_sidecar.slot();
let root = column_sidecar.block_root();
let block_root = column_sidecar.block_root();
let index = column_sidecar.index;
let delay = get_slot_delay_ms(seen_duration, slot, &self.chain.slot_clock);
// Log metrics to track delay from other nodes on the network.
Expand All @@ -635,10 +635,21 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self.log,
"Successfully verified gossip data column sidecar";
"slot" => %slot,
"root" => %root,
"block_root" => %block_root,
"index" => %index,
);

// We have observed a data column sidecar with valid inclusion proof, such that
// `block_root` must have data. The column may or not be imported yet.
// TODO(das): Sampling should check that sampling is not completed already.
//
// Trigger sampling early, potentially before processing the block. At this point column
// custodials may not have received all their columns. Triggering sampling so early is
// only viable with either:
// - Sync delaying sampling until some latter window
// - Re-processing early sampling requests: https://github.com/sigp/lighthouse/pull/5569
self.send_sync_message(SyncMessage::SampleBlock(block_root, slot));
jimmygchen marked this conversation as resolved.
Show resolved Hide resolved

self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);

// Log metrics to keep track of propagation delay times.
Expand Down Expand Up @@ -1264,6 +1275,15 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let block = verified_block.block.block_cloned();
let block_root = verified_block.block_root;

if block.num_expected_blobs() > 0 {
// Trigger sampling for block not yet execution valid. At this point column custodials are
// unlikely to have received their columns. Triggering sampling so early is only viable with
// either:
// - Sync delaying sampling until some latter window
// - Re-processing early sampling requests: https://github.com/sigp/lighthouse/pull/5569
self.send_sync_message(SyncMessage::SampleBlock(block_root, block.slot()));
}

let result = self
.chain
.process_block_with_early_caching(block_root, verified_block, NotifyExecutionLayer::Yes)
Expand Down
7 changes: 6 additions & 1 deletion beacon_node/network/src/sync/sampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,12 @@ impl<T: BeaconChainTypes> Sampling<T> {
self.log.clone(),
)),
Entry::Occupied(_) => {
warn!(self.log, "Ignoring duplicate sampling request"; "id" => ?id);
// Sampling is triggered from multiple sources, duplicate sampling requests are
// likely (gossip block + gossip data column)
// TODO(das): Should track failed sampling request for some time? Otherwise there's
// a risk of a loop with multiple triggers creating the request, then failing,
jimmygchen marked this conversation as resolved.
Show resolved Hide resolved
// and repeat.
debug!(self.log, "Ignoring duplicate sampling request"; "id" => ?id);
return None;
}
};
Expand Down
Loading