Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP Das custody reconstruct/multiple column requests #30

Draft
wants to merge 5 commits into
base: das
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,13 @@ impl TryInto<Hash256> for AvailabilityProcessingStatus {
/// The result of a chain segment processing.
pub enum ChainSegmentResult<E: EthSpec> {
/// Processing this chain segment finished successfully.
Successful { imported_blocks: usize },
Successful {
imported_blocks: Vec<(Hash256, Slot)>,
},
/// There was an error processing this chain segment. Before the error, some blocks could
/// have been imported.
Failed {
imported_blocks: usize,
imported_blocks: Vec<(Hash256, Slot)>,
error: BlockError<E>,
},
}
Expand Down Expand Up @@ -2709,7 +2711,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
chain_segment: Vec<RpcBlock<T::EthSpec>>,
) -> Result<Vec<HashBlockTuple<T::EthSpec>>, ChainSegmentResult<T::EthSpec>> {
// This function will never import any blocks.
let imported_blocks = 0;
let imported_blocks = vec![];
let mut filtered_chain_segment = Vec::with_capacity(chain_segment.len());

// Produce a list of the parent root and slot of the child of each block.
Expand Down Expand Up @@ -2815,7 +2817,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
chain_segment: Vec<RpcBlock<T::EthSpec>>,
notify_execution_layer: NotifyExecutionLayer,
) -> ChainSegmentResult<T::EthSpec> {
let mut imported_blocks = 0;
let mut imported_blocks = vec![];

// Filter uninteresting blocks from the chain segment in a blocking task.
let chain = self.clone();
Expand Down Expand Up @@ -2875,6 +2877,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// Import the blocks into the chain.
for signature_verified_block in signature_verified_blocks {
let block_slot = signature_verified_block.slot();

match self
.process_block(
signature_verified_block.block_root(),
Expand All @@ -2886,9 +2890,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
{
Ok(status) => {
match status {
AvailabilityProcessingStatus::Imported(_) => {
AvailabilityProcessingStatus::Imported(block_root) => {
// The block was imported successfully.
imported_blocks += 1;
imported_blocks.push((block_root, block_slot));
}
AvailabilityProcessingStatus::MissingComponents(slot, block_root) => {
warn!(self.log, "Blobs missing in response to range request";
Expand Down Expand Up @@ -6823,6 +6827,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.data_availability_checker.data_availability_boundary()
}

/// Returns true if we should issue a sampling request for this block
/// TODO(das): check if the block is still within the da_window
pub fn should_sample_slot(&self, slot: Slot) -> bool {
// Sampling only applies once the PeerDAS (EIP-7594) fork is scheduled and the
// slot's epoch has reached the activation epoch; with no fork scheduled we
// never sample.
match self.spec.eip7594_fork_epoch {
Some(fork_epoch) => slot.epoch(T::EthSpec::slots_per_epoch()) >= fork_epoch,
None => false,
}
}

pub fn logger(&self) -> &Logger {
&self.log
}
Expand Down
4 changes: 4 additions & 0 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1225,6 +1225,10 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {
/// Returns the cached root of the signature-verified block.
pub fn block_root(&self) -> Hash256 {
self.block_root
}

/// Returns the slot of the wrapped block.
pub fn slot(&self) -> Slot {
self.block.slot()
}
}

impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for SignatureVerifiedBlock<T> {
Expand Down
20 changes: 18 additions & 2 deletions beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::discovery::enr::PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY;
use crate::discovery::CombinedKey;
use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, Gossipsub, PeerId};
use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo};
use rand::seq::SliceRandom;
use score::{PeerAction, ReportSource, Score, ScoreState};
use slog::{crit, debug, error, trace, warn};
use ssz::Encode;
use std::net::IpAddr;
use std::time::Instant;
use std::{cmp::Ordering, fmt::Display};
Expand Down Expand Up @@ -673,9 +675,23 @@ impl<E: EthSpec> PeerDB<E> {
}

/// Updates the connection state. MUST ONLY BE USED IN TESTS.
pub fn __add_connected_peer_testing_only(&mut self, peer_id: &PeerId) -> Option<BanOperation> {
pub fn __add_connected_peer_testing_only(
&mut self,
peer_id: &PeerId,
supernode: bool,
) -> Option<BanOperation> {
let enr_key = CombinedKey::generate_secp256k1();
let enr = Enr::builder().build(&enr_key).unwrap();
let mut enr = Enr::builder().build(&enr_key).unwrap();

if supernode {
enr.insert(
PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY,
&(E::data_column_subnet_count() as u64).as_ssz_bytes(),
&enr_key,
)
.expect("u64 can be encoded");
}

self.update_connection_state(
peer_id,
NewConnectionState::Connected {
Expand Down
18 changes: 18 additions & 0 deletions beacon_node/lighthouse_network/src/types/globals.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
//! A collection of variables that are accessible outside of the network thread itself.
use crate::discovery::enr::PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY;
use crate::peer_manager::peerdb::PeerDB;
use crate::rpc::{MetaData, MetaDataV2};
use crate::types::{BackFillState, SyncState};
use crate::EnrExt;
use crate::{Client, Eth2Enr};
use crate::{Enr, GossipTopic, Multiaddr, PeerId};
use parking_lot::RwLock;
use ssz::Encode;
use std::collections::HashSet;
use types::data_column_sidecar::ColumnIndex;
use types::{DataColumnSubnetId, Epoch, EthSpec};
Expand Down Expand Up @@ -141,6 +143,22 @@ impl<E: EthSpec> NetworkGlobals<E> {
log,
)
}

/// TESTING ONLY. Set a custody_subnet_count value
///
/// Overwrites the PeerDAS custody-subnet-count entry in the local ENR with the
/// SSZ-encoded `value`.
pub fn test_mutate_custody_subnet_count(&mut self, value: u64) {
use crate::CombinedKeyExt;
// For test: use a random key. WARNING: changes ENR NodeID
// NOTE(review): this freshly generated key does not match the ENR's original
// identity key; `insert` presumably re-signs the record with it, which is why
// the NodeID changes — confirm against discv5's `Enr::insert` semantics.
let keypair = libp2p::identity::secp256k1::Keypair::generate();
let enr_key = discv5::enr::CombinedKey::from_secp256k1(&keypair);
self.local_enr
.write()
.insert(
PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY,
&value.as_ssz_bytes(),
&enr_key,
)
.expect("u64 can be serialized");
}
}

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1278,20 +1278,16 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let block = verified_block.block.block_cloned();
let block_root = verified_block.block_root;

// TODO(das) Might be too early to issue a request here. We haven't checked that the block
// actually includes blob transactions and thus has data. A peer could send a block with
// garbage commitments, and make us trigger sampling for a block that does not have data.
if block.num_expected_blobs() > 0 {
// Trigger sampling for block not yet execution valid. At this point column custodials are
// unlikely to have received their columns. Triggering sampling so early is only viable with
// either:
// - Sync delaying sampling until some later window
// - Re-processing early sampling requests: https://github.com/sigp/lighthouse/pull/5569
if self
.chain
.spec
.eip7594_fork_epoch
.map_or(false, |eip7594_fork_epoch| {
block.epoch() >= eip7594_fork_epoch
})
{
if self.chain.should_sample_slot(block.slot()) {
self.send_sync_message(SyncMessage::SampleBlock(block_root, block.slot()));
}
}
Expand Down
29 changes: 25 additions & 4 deletions beacon_node/network/src/network_beacon_processor/sync_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
};

let slot = block.slot();
let block_has_data = block.as_block().num_expected_blobs() > 0;
let parent_root = block.message().parent_root();
let commitments_formatted = block.as_block().commitments_formatted();

Expand All @@ -160,6 +161,17 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {

metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);

// RPC block imported or execution validated. If the block was already imported by gossip we
// receive Err(BlockError::AlreadyKnown).
if result.is_ok() &&
// Block has at least one blob, so it produced columns
block_has_data &&
// Block slot is within the DA boundary (should always be the case) and PeerDAS is activated
self.chain.should_sample_slot(slot)
{
self.send_sync_message(SyncMessage::SampleBlock(block_root, slot));
}

// RPC block imported, regardless of process type
if let &Ok(AvailabilityProcessingStatus::Imported(hash)) = &result {
info!(self.log, "New RPC block received"; "slot" => slot, "hash" => %hash);
Expand Down Expand Up @@ -491,21 +503,30 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
{
ChainSegmentResult::Successful { imported_blocks } => {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL);
if imported_blocks > 0 {
if !imported_blocks.is_empty() {
self.chain.recompute_head_at_current_slot().await;

for (block_root, block_slot) in &imported_blocks {
if self.chain.should_sample_slot(*block_slot) {
self.send_sync_message(SyncMessage::SampleBlock(
*block_root,
*block_slot,
));
}
}
}
(imported_blocks, Ok(()))
(imported_blocks.len(), Ok(()))
}
ChainSegmentResult::Failed {
imported_blocks,
error,
} => {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_FAILED_TOTAL);
let r = self.handle_failed_chain_segment(error);
if imported_blocks > 0 {
if !imported_blocks.is_empty() {
self.chain.recompute_head_at_current_slot().await;
}
(imported_blocks, r)
(imported_blocks.len(), r)
}
}
}
Expand Down
Loading
Loading