Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add plumbing for PeerDAS supernodes (#5050, #5409, #5570, #5966) #6216

Merged
merged 9 commits into from
Aug 12, 2024
19 changes: 17 additions & 2 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ pub struct BeaconChainBuilder<T: BeaconChainTypes> {
kzg: Option<Arc<Kzg>>,
task_executor: Option<TaskExecutor>,
validator_monitor_config: Option<ValidatorMonitorConfig>,
import_all_data_columns: bool,
}

impl<TSlotClock, TEth1Backend, E, THotStore, TColdStore>
Expand Down Expand Up @@ -145,6 +146,7 @@ where
kzg: None,
task_executor: None,
validator_monitor_config: None,
import_all_data_columns: false,
}
}

Expand Down Expand Up @@ -615,6 +617,12 @@ where
self
}

/// Sets whether to require and import all data columns when importing block.
pub fn import_all_data_columns(mut self, import_all_data_columns: bool) -> Self {
self.import_all_data_columns = import_all_data_columns;
self
}

/// Sets the `BeaconChain` event handler backend.
///
/// For example, provide `ServerSentEventHandler` as a `handler`.
Expand Down Expand Up @@ -965,8 +973,15 @@ where
validator_monitor: RwLock::new(validator_monitor),
genesis_backfill_slot,
data_availability_checker: Arc::new(
DataAvailabilityChecker::new(slot_clock, self.kzg.clone(), store, &log, self.spec)
.map_err(|e| format!("Error initializing DataAvailabiltyChecker: {:?}", e))?,
DataAvailabilityChecker::new(
slot_clock,
self.kzg.clone(),
store,
self.import_all_data_columns,
&log,
self.spec,
)
.map_err(|e| format!("Error initializing DataAvailabilityChecker: {:?}", e))?,
),
kzg: self.kzg.clone(),
};
Expand Down
11 changes: 8 additions & 3 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,16 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
slot_clock: T::SlotClock,
kzg: Option<Arc<Kzg>>,
store: BeaconStore<T>,
import_all_data_columns: bool,
log: &Logger,
spec: ChainSpec,
) -> Result<Self, AvailabilityCheckError> {
// TODO(das): support supernode or custom custody requirement
let custody_subnet_count = spec.custody_requirement as usize;
let custody_subnet_count = if import_all_data_columns {
spec.data_column_sidecar_subnet_count as usize
} else {
spec.custody_requirement as usize
};

let custody_column_count =
custody_subnet_count.saturating_mul(spec.data_columns_per_subnet());

Expand All @@ -112,8 +117,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
Ok(Self {
availability_cache: Arc::new(overflow_cache),
slot_clock,
log: log.clone(),
kzg,
log: log.clone(),
spec,
})
}
Expand Down
1 change: 1 addition & 0 deletions beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ where
.beacon_graffiti(beacon_graffiti)
.event_handler(event_handler)
.execution_layer(execution_layer)
.import_all_data_columns(config.network.subscribe_all_data_column_subnets)
.validator_monitor_config(config.validator_monitor.clone());

let builder = if let Some(slasher) = self.slasher.clone() {
Expand Down
6 changes: 5 additions & 1 deletion beacon_node/lighthouse_network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct Config {
pub network_dir: PathBuf,

/// IP addresses to listen on.
listen_addresses: ListenAddress,
pub(crate) listen_addresses: ListenAddress,

/// The address to broadcast to peers about which address we are listening on. None indicates
/// that no discovery address has been set in the CLI args.
Expand Down Expand Up @@ -100,6 +100,9 @@ pub struct Config {
/// Attempt to construct external port mappings with UPnP.
pub upnp_enabled: bool,

/// Subscribe to all data column subnets for the duration of the runtime.
pub subscribe_all_data_column_subnets: bool,

/// Subscribe to all subnets for the duration of the runtime.
pub subscribe_all_subnets: bool,

Expand Down Expand Up @@ -338,6 +341,7 @@ impl Default for Config {
upnp_enabled: true,
network_load: 4,
private: false,
subscribe_all_data_column_subnets: false,
subscribe_all_subnets: false,
import_all_attestations: false,
shutdown_after_sync: false,
Expand Down
111 changes: 107 additions & 4 deletions beacon_node/lighthouse_network/src/discovery/enr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::fs::File;
use std::io::prelude::*;
use std::path::Path;
use std::str::FromStr;
use types::{EnrForkId, EthSpec};
use types::{ChainSpec, EnrForkId, EthSpec};

use super::enr_ext::{EnrExt, QUIC6_ENR_KEY, QUIC_ENR_KEY};

Expand All @@ -24,6 +24,8 @@ pub const ETH2_ENR_KEY: &str = "eth2";
pub const ATTESTATION_BITFIELD_ENR_KEY: &str = "attnets";
/// The ENR field specifying the sync committee subnet bitfield.
pub const SYNC_COMMITTEE_BITFIELD_ENR_KEY: &str = "syncnets";
/// The ENR field specifying the peerdas custody subnet count.
pub const PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY: &str = "csc";

/// Extension trait for ENR's within Eth2.
pub trait Eth2Enr {
Expand All @@ -35,6 +37,9 @@ pub trait Eth2Enr {
&self,
) -> Result<EnrSyncCommitteeBitfield<E>, &'static str>;

/// The peerdas custody subnet count associated with the ENR.
fn custody_subnet_count<E: EthSpec>(&self, spec: &ChainSpec) -> u64;

fn eth2(&self) -> Result<EnrForkId, &'static str>;
}

Expand All @@ -59,6 +64,16 @@ impl Eth2Enr for Enr {
.map_err(|_| "Could not decode the ENR syncnets bitfield")
}

/// if the custody value is non-existent in the ENR, then we assume the minimum custody value
/// defined in the spec.
fn custody_subnet_count<E: EthSpec>(&self, spec: &ChainSpec) -> u64 {
self.get_decodable::<u64>(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY)
.and_then(|r| r.ok())
// If value supplied in ENR is invalid, fallback to `custody_requirement`
.filter(|csc| csc <= &spec.data_column_sidecar_subnet_count)
.unwrap_or(spec.custody_requirement)
}

fn eth2(&self) -> Result<EnrForkId, &'static str> {
let eth2_bytes = self.get(ETH2_ENR_KEY).ok_or("ENR has no eth2 field")?;

Expand Down Expand Up @@ -126,12 +141,13 @@ pub fn build_or_load_enr<E: EthSpec>(
config: &NetworkConfig,
enr_fork_id: &EnrForkId,
log: &slog::Logger,
spec: &ChainSpec,
) -> Result<Enr, String> {
// Build the local ENR.
// Note: Discovery should update the ENR record's IP to the external IP as seen by the
// majority of our peers, if the CLI doesn't expressly forbid it.
let enr_key = CombinedKey::from_libp2p(local_key)?;
let mut local_enr = build_enr::<E>(&enr_key, config, enr_fork_id)?;
let mut local_enr = build_enr::<E>(&enr_key, config, enr_fork_id, spec)?;

use_or_load_enr(&enr_key, &mut local_enr, config, log)?;
Ok(local_enr)
Expand All @@ -142,6 +158,7 @@ pub fn build_enr<E: EthSpec>(
enr_key: &CombinedKey,
config: &NetworkConfig,
enr_fork_id: &EnrForkId,
spec: &ChainSpec,
) -> Result<Enr, String> {
let mut builder = discv5::enr::Enr::builder();
let (maybe_ipv4_address, maybe_ipv6_address) = &config.enr_address;
Expand Down Expand Up @@ -221,6 +238,16 @@ pub fn build_enr<E: EthSpec>(

builder.add_value(SYNC_COMMITTEE_BITFIELD_ENR_KEY, &bitfield.as_ssz_bytes());

// only set `csc` if PeerDAS fork epoch has been scheduled
if spec.is_peer_das_scheduled() {
let custody_subnet_count = if config.subscribe_all_data_column_subnets {
spec.data_column_sidecar_subnet_count
} else {
spec.custody_requirement
};
builder.add_value(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY, &custody_subnet_count);
}

builder
.build(enr_key)
.map_err(|e| format!("Could not build Local ENR: {:?}", e))
Expand All @@ -244,10 +271,12 @@ fn compare_enr(local_enr: &Enr, disk_enr: &Enr) -> bool {
// take preference over disk udp port if one is not specified
&& (local_enr.udp4().is_none() || local_enr.udp4() == disk_enr.udp4())
&& (local_enr.udp6().is_none() || local_enr.udp6() == disk_enr.udp6())
// we need the ATTESTATION_BITFIELD_ENR_KEY and SYNC_COMMITTEE_BITFIELD_ENR_KEY key to match,
// otherwise we use a new ENR. This will likely only be true for non-validating nodes
// we need the ATTESTATION_BITFIELD_ENR_KEY and SYNC_COMMITTEE_BITFIELD_ENR_KEY and
// PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY key to match, otherwise we use a new ENR. This will
// likely only be true for non-validating nodes.
&& local_enr.get(ATTESTATION_BITFIELD_ENR_KEY) == disk_enr.get(ATTESTATION_BITFIELD_ENR_KEY)
&& local_enr.get(SYNC_COMMITTEE_BITFIELD_ENR_KEY) == disk_enr.get(SYNC_COMMITTEE_BITFIELD_ENR_KEY)
&& local_enr.get(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY) == disk_enr.get(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY)
}

/// Loads enr from the given directory
Expand Down Expand Up @@ -280,3 +309,77 @@ pub fn save_enr_to_disk(dir: &Path, enr: &Enr, log: &slog::Logger) {
}
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::config::Config as NetworkConfig;
use types::{Epoch, MainnetEthSpec};

type E = MainnetEthSpec;

fn make_eip7594_spec() -> ChainSpec {
let mut spec = E::default_spec();
spec.eip7594_fork_epoch = Some(Epoch::new(10));
spec
}

#[test]
fn custody_subnet_count_default() {
let config = NetworkConfig {
subscribe_all_data_column_subnets: false,
..NetworkConfig::default()
};
let spec = make_eip7594_spec();

let enr = build_enr_with_config(config, &spec).0;

assert_eq!(
enr.custody_subnet_count::<E>(&spec),
spec.custody_requirement,
);
}

#[test]
fn custody_subnet_count_all() {
let config = NetworkConfig {
subscribe_all_data_column_subnets: true,
..NetworkConfig::default()
};
let spec = make_eip7594_spec();
let enr = build_enr_with_config(config, &spec).0;

assert_eq!(
enr.custody_subnet_count::<E>(&spec),
spec.data_column_sidecar_subnet_count,
);
}

#[test]
fn custody_subnet_count_fallback_default() {
let config = NetworkConfig::default();
let spec = make_eip7594_spec();
let (mut enr, enr_key) = build_enr_with_config(config, &spec);
let invalid_subnet_count = 99u64;

enr.insert(
PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY,
&invalid_subnet_count,
&enr_key,
)
.unwrap();

assert_eq!(
enr.custody_subnet_count::<E>(&spec),
spec.custody_requirement,
);
}

fn build_enr_with_config(config: NetworkConfig, spec: &ChainSpec) -> (Enr, CombinedKey) {
let keypair = libp2p::identity::secp256k1::Keypair::generate();
let enr_key = CombinedKey::from_secp256k1(&keypair);
let enr_fork_id = EnrForkId::default();
let enr = build_enr::<E>(&enr_key, &config, &enr_fork_id, spec).unwrap();
(enr, enr_key)
}
}
2 changes: 1 addition & 1 deletion beacon_node/lighthouse_network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1220,7 +1220,7 @@ mod tests {
let mut config = NetworkConfig::default();
config.set_listening_addr(crate::ListenAddress::unused_v4_ports());
let enr_key: CombinedKey = CombinedKey::from_secp256k1(&keypair);
let enr: Enr = build_enr::<E>(&enr_key, &config, &EnrForkId::default()).unwrap();
let enr: Enr = build_enr::<E>(&enr_key, &config, &EnrForkId::default(), &spec).unwrap();
let log = build_log(slog::Level::Debug, false);
let globals = NetworkGlobals::new(
enr,
Expand Down
1 change: 1 addition & 0 deletions beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ impl<E: EthSpec> Network<E> {
&config,
&ctx.enr_fork_id,
&log,
ctx.chain_spec,
)?;
// Construct the metadata
let meta_data = utils::load_or_build_metadata(&config.network_dir, &log);
Expand Down
Loading