Skip to content

Commit

Permalink
Add subscribe-all-data-column-subnets and pass custody column count…
Browse files Browse the repository at this point in the history
… to `availability_cache`.
  • Loading branch information
jimmygchen committed Apr 16, 2024
1 parent cfab1a2 commit 898c249
Show file tree
Hide file tree
Showing 12 changed files with 99 additions and 29 deletions.
13 changes: 11 additions & 2 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ pub struct BeaconChainBuilder<T: BeaconChainTypes> {
kzg: Option<Arc<Kzg>>,
task_executor: Option<TaskExecutor>,
validator_monitor_config: Option<ValidatorMonitorConfig>,
import_all_data_columns: bool,
}

impl<TSlotClock, TEth1Backend, E, THotStore, TColdStore>
Expand Down Expand Up @@ -145,6 +146,7 @@ where
kzg: None,
task_executor: None,
validator_monitor_config: None,
import_all_data_columns: false,
}
}

Expand Down Expand Up @@ -968,8 +970,15 @@ where
validator_monitor: RwLock::new(validator_monitor),
genesis_backfill_slot,
data_availability_checker: Arc::new(
DataAvailabilityChecker::new(slot_clock, self.kzg.clone(), store, &log, self.spec)
.map_err(|e| format!("Error initializing DataAvailabiltyChecker: {:?}", e))?,
DataAvailabilityChecker::new(
slot_clock,
self.kzg.clone(),
store,
self.import_all_data_columns,
&log,
self.spec,
)
.map_err(|e| format!("Error initializing DataAvailabilityChecker: {:?}", e))?,
),
kzg: self.kzg.clone(),
block_production_state: Arc::new(Mutex::new(None)),
Expand Down
17 changes: 16 additions & 1 deletion beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,25 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
slot_clock: T::SlotClock,
kzg: Option<Arc<Kzg>>,
store: BeaconStore<T>,
import_all_data_columns: bool,
log: &Logger,
spec: ChainSpec,
) -> Result<Self, AvailabilityCheckError> {
let overflow_cache = OverflowLRUCache::new(OVERFLOW_LRU_CAPACITY, store, spec.clone())?;
let custody_subnet_count = if import_all_data_columns {
E::data_column_subnet_count()
} else {
E::min_custody_requirement()
};

let custody_column_count =
custody_subnet_count.saturating_mul(E::data_columns_per_subnet());
let overflow_cache = OverflowLRUCache::new(
OVERFLOW_LRU_CAPACITY,
store,
custody_column_count,
spec.clone(),
)?;

Ok(Self {
availability_cache: Arc::new(overflow_cache),
slot_clock,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
pub fn new(
capacity: NonZeroUsize,
beacon_store: BeaconStore<T>,
_custody_column_count: usize,
spec: ChainSpec,
) -> Result<Self, AvailabilityCheckError> {
let overflow_store = OverflowStore(beacon_store.clone());
Expand Down
4 changes: 4 additions & 0 deletions beacon_node/lighthouse_network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ pub struct Config {
/// Attempt to construct external port mappings with UPnP.
pub upnp_enabled: bool,

/// Subscribe to all data column subnets for the duration of the runtime.
pub subscribe_all_data_column_subnets: bool,

/// Subscribe to all subnets for the duration of the runtime.
pub subscribe_all_subnets: bool,

Expand Down Expand Up @@ -338,6 +341,7 @@ impl Default for Config {
upnp_enabled: true,
network_load: 4,
private: false,
subscribe_all_data_column_subnets: false,
subscribe_all_subnets: false,
import_all_attestations: false,
shutdown_after_sync: false,
Expand Down
7 changes: 5 additions & 2 deletions beacon_node/lighthouse_network/src/discovery/enr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,12 @@ pub fn build_enr<E: EthSpec>(
builder.add_value(SYNC_COMMITTEE_BITFIELD_ENR_KEY, &bitfield.as_ssz_bytes());

// set the "custody_subnet_count" field on our ENR
let custody_subnet_count = E::min_custody_requirement() as u64;
let custody_subnet_count = if config.subscribe_all_data_column_subnets {
E::data_column_subnet_count() as u64
} else {
E::min_custody_requirement() as u64
};

// TODO(das) read from cli
builder.add_value(
PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY,
&custody_subnet_count.as_ssz_bytes(),
Expand Down
5 changes: 4 additions & 1 deletion beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ pub struct NetworkService<T: BeaconChainTypes> {
next_fork_subscriptions: Pin<Box<OptionFuture<Sleep>>>,
/// A delay that expires when we need to unsubscribe from old fork topics.
next_unsubscribe: Pin<Box<OptionFuture<Sleep>>>,
/// Subscribe to all the data column subnets.
subscribe_all_data_column_subnets: bool,
/// Subscribe to all the subnets once synced.
subscribe_all_subnets: bool,
/// Shutdown beacon node after sync is complete.
Expand Down Expand Up @@ -357,6 +359,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
next_fork_update,
next_fork_subscriptions,
next_unsubscribe,
subscribe_all_data_column_subnets: config.subscribe_all_data_column_subnets,
subscribe_all_subnets: config.subscribe_all_subnets,
shutdown_after_sync: config.shutdown_after_sync,
metrics_enabled: config.metrics_enabled,
Expand Down Expand Up @@ -733,7 +736,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}

if !self.subscribe_all_subnets {
if !self.subscribe_all_data_column_subnets {
for column_subnet in DataColumnSubnetId::compute_custody_subnets::<T::EthSpec>(
self.network_globals.local_enr().node_id().raw().into(),
self.network_globals
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ mod tests {
HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal(), log.clone())
.expect("store");
let da_checker = Arc::new(
DataAvailabilityChecker::new(slot_clock, None, store.into(), &log, spec.clone())
DataAvailabilityChecker::new(slot_clock, None, store.into(), false, &log, spec.clone())
.expect("data availability checker"),
);
let mut sl = SingleBlockLookup::<TestLookup1, T>::new(
Expand Down Expand Up @@ -632,7 +632,7 @@ mod tests {
.expect("store");

let da_checker = Arc::new(
DataAvailabilityChecker::new(slot_clock, None, store.into(), &log, spec.clone())
DataAvailabilityChecker::new(slot_clock, None, store.into(), false, &log, spec.clone())
.expect("data availability checker"),
);

Expand Down
8 changes: 8 additions & 0 deletions beacon_node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
/*
* Network parameters.
*/
.arg(
Arg::with_name("subscribe-all-data-column-subnets")
.long("subscribe-all-data-column-subnets")
.help("Subscribe to all data column subnets and participate in data custody for \
all columns. This will also advertise the beacon node as being long-lived \
subscribed to all data column subnets.")
.takes_value(false),
)
.arg(
Arg::with_name("subscribe-all-subnets")
.long("subscribe-all-subnets")
Expand Down
4 changes: 4 additions & 0 deletions beacon_node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1129,6 +1129,10 @@ pub fn set_network_config(
config.network_dir = data_dir.join(DEFAULT_NETWORK_DIR);
};

if cli_args.is_present("subscribe-all-data-column-subnets") {
config.subscribe_all_data_column_subnets = true;
}

if cli_args.is_present("subscribe-all-subnets") {
config.subscribe_all_subnets = true;
}
Expand Down
20 changes: 10 additions & 10 deletions consensus/types/src/data_column_subnet_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ impl DataColumnSubnetId {
}

#[allow(clippy::arithmetic_side_effects)]
pub fn columns<E: EthSpec>(&self) -> impl Iterator<Item = u64> {
pub fn columns<E: EthSpec>(&self) -> impl Iterator<Item = ColumnIndex> {
let subnet = self.0;
let data_column_subnet_count = E::data_column_subnet_count() as u64;
let columns_per_subnet = (E::number_of_columns() as u64) / data_column_subnet_count;
let columns_per_subnet = E::data_columns_per_subnet() as u64;
(0..columns_per_subnet).map(move |i| data_column_subnet_count * i + subnet)
}

Expand Down Expand Up @@ -151,8 +151,10 @@ impl From<ArithError> for Error {
#[cfg(test)]
mod test {
use crate::data_column_subnet_id::DataColumnSubnetId;
use crate::ChainSpec;
use crate::EthSpec;
use crate::{ChainSpec, MainnetEthSpec};

type E = MainnetEthSpec;

#[test]
fn test_compute_subnets_for_data_column() {
Expand All @@ -175,20 +177,18 @@ mod test {
let spec = ChainSpec::mainnet();

for node_id in node_ids {
let computed_subnets = DataColumnSubnetId::compute_custody_subnets::<
crate::MainnetEthSpec,
>(node_id, spec.custody_requirement);
let computed_subnets =
DataColumnSubnetId::compute_custody_subnets::<E>(node_id, spec.custody_requirement);
let computed_subnets: Vec<_> = computed_subnets.collect();

// the number of subnets is equal to the custody requirement
assert_eq!(computed_subnets.len() as u64, spec.custody_requirement);

let subnet_count = crate::MainnetEthSpec::data_column_subnet_count();
let columns_per_subnet = crate::MainnetEthSpec::number_of_columns() / subnet_count;
let subnet_count = E::data_column_subnet_count();
for subnet in computed_subnets {
let columns: Vec<_> = subnet.columns::<crate::MainnetEthSpec>().collect();
let columns: Vec<_> = subnet.columns::<E>().collect();
// the number of columns is equal to the specified number of columns per subnet
assert_eq!(columns.len(), columns_per_subnet);
assert_eq!(columns.len(), E::data_columns_per_subnet());

for pair in columns.windows(2) {
// each successive column index is offset by the number of subnets
Expand Down
38 changes: 27 additions & 11 deletions consensus/types/src/eth_spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ pub trait EthSpec:
type MinCustodyRequirement: Unsigned + Clone + Sync + Send + Debug + PartialEq;
type DataColumnSubnetCount: Unsigned + Clone + Sync + Send + Debug + PartialEq;
type DataColumnCount: Unsigned + Clone + Sync + Send + Debug + PartialEq;
type MaxBytesPerColumn: Unsigned + Clone + Sync + Send + Debug + PartialEq;
type DataColumnsPerSubnet: Unsigned + Clone + Sync + Send + Debug + PartialEq;
type KzgCommitmentsInclusionProofDepth: Unsigned + Clone + Sync + Send + Debug + PartialEq;
/*
* Derived values (set these CAREFULLY)
Expand Down Expand Up @@ -306,6 +306,10 @@ pub trait EthSpec:
Self::DataColumnCount::to_usize()
}

fn data_columns_per_subnet() -> usize {
Self::DataColumnsPerSubnet::to_usize()
}

fn min_custody_requirement() -> usize {
Self::MinCustodyRequirement::to_usize()
}
Expand All @@ -314,10 +318,6 @@ pub trait EthSpec:
Self::DataColumnSubnetCount::to_usize()
}

fn max_bytes_per_column() -> usize {
Self::MaxBytesPerColumn::to_usize()
}

fn kzg_commitments_inclusion_proof_depth() -> usize {
Self::KzgCommitmentsInclusionProofDepth::to_usize()
}
Expand Down Expand Up @@ -370,10 +370,7 @@ impl EthSpec for MainnetEthSpec {
type MinCustodyRequirement = U1;
type DataColumnSubnetCount = U32;
type DataColumnCount = U128;
// Column samples are entire columns in 1D DAS.
// max data size = extended_blob_bytes * max_blobs_per_block / num_of_columns
// 256kb * 32 / 128 = 64kb
type MaxBytesPerColumn = U65536;
type DataColumnsPerSubnet = U4;
type KzgCommitmentsInclusionProofDepth = U4; // inclusion of the whole list of commitments
type SyncSubcommitteeSize = U128; // 512 committee size / 4 sync committee subnet count
type MaxPendingAttestations = U4096; // 128 max attestations * 32 slots per epoch
Expand Down Expand Up @@ -415,7 +412,7 @@ impl EthSpec for MinimalEthSpec {
type MinCustodyRequirement = U1;
type DataColumnSubnetCount = U32;
type DataColumnCount = U128;
type MaxBytesPerColumn = U65536;
type DataColumnsPerSubnet = U4;
type KzgCommitmentsInclusionProofDepth = U4;

params_from_eth_spec!(MainnetEthSpec {
Expand Down Expand Up @@ -498,7 +495,7 @@ impl EthSpec for GnosisEthSpec {
type MinCustodyRequirement = U1;
type DataColumnSubnetCount = U32;
type DataColumnCount = U128;
type MaxBytesPerColumn = U65536;
type DataColumnsPerSubnet = U4;
type KzgCommitmentsInclusionProofDepth = U4;

fn default_spec() -> ChainSpec {
Expand All @@ -509,3 +506,22 @@ impl EthSpec for GnosisEthSpec {
EthSpecId::Gnosis
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_peerdas_config_all_specs() {
test_peerdas_config::<MainnetEthSpec>();
test_peerdas_config::<GnosisEthSpec>();
test_peerdas_config::<MinimalEthSpec>();
}

fn test_peerdas_config<E: EthSpec>() {
assert_eq!(
E::data_columns_per_subnet(),
E::number_of_columns() / E::data_column_subnet_count()
);
}
}
7 changes: 7 additions & 0 deletions lighthouse/tests/beacon_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,13 @@ fn network_target_peers_flag() {
});
}
#[test]
fn network_subscribe_all_data_column_subnets_flag() {
CommandLineTest::new()
.flag("subscribe-all-data-column-subnets", None)
.run_with_zero_port()
.with_config(|config| assert!(config.network.subscribe_all_data_column_subnets));
}
#[test]
fn network_subscribe_all_subnets_flag() {
CommandLineTest::new()
.flag("subscribe-all-subnets", None)
Expand Down

0 comments on commit 898c249

Please sign in to comment.