Skip to content

Commit

Permalink
Address some review comments and only send SamplingBlock sync messa…
Browse files Browse the repository at this point in the history
…ge after PEER_DAS_EPOCH.
  • Loading branch information
jimmygchen committed Apr 29, 2024
1 parent fccda79 commit 2127917
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 20 deletions.
2 changes: 2 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2860,6 +2860,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// be pruned on finalization, on a timeout or by a max count.
pub async fn process_sampling_completed(self: &Arc<Self>, block_root: Hash256) {
    // TODO(das): update fork-choice
    // NOTE: It is possible that sampling completes before the block is imported into
    // fork choice; in that case we may need to update the availability cache.
    // TODO(das): These log levels are too high, reduce once DAS matures
    info!(self.log, "Sampling completed"; "block_root" => %block_root);
}
Expand Down
20 changes: 8 additions & 12 deletions beacon_node/network/src/network_beacon_processor/gossip_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,17 +639,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"index" => %index,
);

// We have observed a data column sidecar with valid inclusion proof, such that
// `block_root` must have data. The column may or may not be imported yet.
// TODO(das): Sampling should check that sampling is not completed already.
//
// Trigger sampling early, potentially before processing the block. At this point column
// custodials may not have received all their columns. Triggering sampling so early is
// only viable with either:
// - Sync delaying sampling until some later window
// - Re-processing early sampling requests: https://github.com/sigp/lighthouse/pull/5569
self.send_sync_message(SyncMessage::SampleBlock(block_root, slot));

self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);

// Log metrics to keep track of propagation delay times.
Expand Down Expand Up @@ -1281,7 +1270,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// either:
// - Sync delaying sampling until some later window
// - Re-processing early sampling requests: https://github.com/sigp/lighthouse/pull/5569
self.send_sync_message(SyncMessage::SampleBlock(block_root, block.slot()));
if self
.chain
.spec
.peer_das_epoch
.map_or(false, |peer_das_epoch| block.epoch() >= peer_das_epoch)
{
self.send_sync_message(SyncMessage::SampleBlock(block_root, block.slot()));
}
}

let result = self
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ pub enum RequestId {
SingleBlock { id: SingleLookupReqId },
/// Request searching for a set of blobs given a hash.
SingleBlob { id: SingleLookupReqId },
/// TODO
/// Request searching for a set of data columns given a hash and list of column indices.
DataColumnsByRoot(Id),
/// Range request that is composed by both a block range request and a blob range request.
RangeBlockAndBlobs { id: Id },
Expand Down
14 changes: 7 additions & 7 deletions beacon_node/network/src/sync/network_context/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,32 +195,32 @@ impl<E: EthSpec, T: Copy> ActiveDataColumnsByRootRequest<E, T> {
/// The active request SHOULD be dropped after `add_response` returns an error
pub fn add_response(
&mut self,
blob: Arc<DataColumnSidecar<E>>,
data_column: Arc<DataColumnSidecar<E>>,
) -> Result<Option<Vec<Arc<DataColumnSidecar<E>>>>, RPCError> {
if self.resolved {
return Err(RPCError::InvalidData("too many responses".to_string()));
}

let block_root = blob.block_root();
let block_root = data_column.block_root();
if self.request.block_root != block_root {
return Err(RPCError::InvalidData(format!(
"un-requested block root {block_root:?}"
)));
}
if !blob.verify_inclusion_proof().unwrap_or(false) {
if !data_column.verify_inclusion_proof().unwrap_or(false) {
return Err(RPCError::InvalidData("invalid inclusion proof".to_string()));
}
if !self.request.indices.contains(&blob.index) {
if !self.request.indices.contains(&data_column.index) {
return Err(RPCError::InvalidData(format!(
"un-requested index {}",
blob.index
data_column.index
)));
}
if self.items.iter().any(|b| b.index == blob.index) {
if self.items.iter().any(|b| b.index == data_column.index) {
return Err(RPCError::InvalidData("duplicated data".to_string()));
}

self.items.push(blob);
self.items.push(data_column);
if self.items.len() >= self.request.indices.len() {
// All expected chunks received, return result early
self.resolved = true;
Expand Down

0 comments on commit 2127917

Please sign in to comment.