diff --git a/Cargo.lock b/Cargo.lock index 9f68e6dcf7..bbe26f4bb0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8282,6 +8282,7 @@ version = "0.2.0" dependencies = [ "beacon_chain", "bls", + "criterion", "db-key", "directory", "ethereum_ssz", @@ -8292,6 +8293,7 @@ dependencies = [ "lru", "metrics", "parking_lot 0.12.3", + "rand 0.8.5", "safe_arith", "serde", "slog", diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index 61cde61916..cd7379b11d 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -7,6 +7,8 @@ edition = { workspace = true } [dev-dependencies] tempfile = { workspace = true } beacon_chain = { workspace = true } +criterion = { workspace = true } +rand = { workspace = true } [dependencies] db-key = "0.0.5" @@ -31,3 +33,7 @@ zstd = { workspace = true } bls = { workspace = true } smallvec = { workspace = true } logging = { workspace = true } + +[[bench]] +name = "hdiff" +harness = false diff --git a/beacon_node/store/benches/hdiff.rs b/beacon_node/store/benches/hdiff.rs new file mode 100644 index 0000000000..81bbbd11c3 --- /dev/null +++ b/beacon_node/store/benches/hdiff.rs @@ -0,0 +1,97 @@ +use bls::PublicKeyBytes; +use criterion::{criterion_group, criterion_main, Criterion}; +use rand::Rng; +use ssz::Decode; +use store::{ + hdiff::{HDiff, HDiffBuffer}, + StoreConfig, +}; +use types::{BeaconState, Epoch, Eth1Data, EthSpec, ForkName, MainnetEthSpec as E, Validator}; + +pub fn all_benches(c: &mut Criterion) { + let fork = ForkName::Base; + let spec = fork.make_genesis_spec(E::default_spec()); + let genesis_time = 0; + let eth1_data = Eth1Data::default(); + let config = StoreConfig::default(); + let mut rng = rand::thread_rng(); + let validator_mutations = 1000; + let validator_additions = 100; + + for n in [1_000_000, 1_500_000, 2_000_000] { + let mut source_state = BeaconState::::new(genesis_time, eth1_data.clone(), &spec); + + for _ in 0..n { + append_validator(&mut source_state, &mut rng); + } + + let mut target_state = source_state.clone(); + // Change all balances + for i in 0..n { + let balance = target_state.balances_mut().get_mut(i).unwrap(); + *balance += rng.gen_range(1..=1_000_000); + } + // And some validator records + for _ in 0..validator_mutations { + let index = rng.gen_range(1..n); + // TODO: Only change a few things, and not the pubkey + *target_state.validators_mut().get_mut(index).unwrap() = rand_validator(&mut rng); + } + for _ in 0..validator_additions { + append_validator(&mut target_state, &mut rng); + } + + let source = HDiffBuffer::from_state(source_state); + let target = HDiffBuffer::from_state(target_state); + let diff = HDiff::compute(&source, &target, &config).unwrap(); + + c.bench_function( + &format!("apply hdiff n={n} v_mut={validator_mutations} v_add={validator_additions}"), + |b| { + let mut source = source.clone(); + b.iter(|| { + diff.apply(&mut source, &config).unwrap(); + println!("diff size {}", diff.size()); + }) + }, + ); + } +} + +fn rand_validator(mut rng: impl Rng) -> Validator { + let mut pubkey = [0u8; 48]; + rng.fill_bytes(&mut pubkey); + let withdrawal_credentials: [u8; 32] = rng.gen(); + + Validator { + pubkey: PublicKeyBytes::from_ssz_bytes(&pubkey).unwrap(), + withdrawal_credentials: withdrawal_credentials.into(), + slashed: false, + effective_balance: 32_000_000_000, + activation_eligibility_epoch: Epoch::max_value(), + activation_epoch: Epoch::max_value(), + exit_epoch: Epoch::max_value(), + withdrawable_epoch: Epoch::max_value(), + } +} + +fn append_validator(state: &mut BeaconState, mut rng: impl Rng) { + state + .balances_mut() + .push(32_000_000_000 + rng.gen_range(1..=1_000_000_000)) + .unwrap(); + if let Ok(inactivity_scores) = state.inactivity_scores_mut() { + inactivity_scores.push(0).unwrap(); + } + state + .validators_mut() + .push(rand_validator(&mut rng)) + .unwrap(); +} + +criterion_group! { + name = benches; + config = Criterion::default().sample_size(10); + targets = all_benches +} +criterion_main!(benches); diff --git a/beacon_node/store/src/hdiff.rs b/beacon_node/store/src/hdiff.rs index 49b29de28e..59f4747763 100644 --- a/beacon_node/store/src/hdiff.rs +++ b/beacon_node/store/src/hdiff.rs @@ -1,5 +1,6 @@ //! Hierarchical diff implementation. use crate::{metrics, DBColumn, StoreConfig, StoreItem}; +use bls::PublicKeyBytes; use itertools::Itertools; use serde::{Deserialize, Serialize}; use ssz::{Decode, Encode}; @@ -7,7 +8,7 @@ use ssz_derive::{Decode, Encode}; use std::io::{Read, Write}; use std::ops::RangeInclusive; use std::str::FromStr; -use types::{BeaconState, ChainSpec, EthSpec, List, Slot}; +use types::{BeaconState, ChainSpec, Epoch, EthSpec, Hash256, List, Slot, Validator}; use zstd::{Decoder, Encoder}; #[derive(Debug)] @@ -81,6 +82,8 @@ pub enum StorageStrategy { pub struct HDiffBuffer { state: Vec, balances: Vec, + inactivity_scores: Vec, + validators: Vec, } /// Hierarchical state diff. @@ -101,6 +104,8 @@ pub struct HDiffBuffer { pub struct HDiff { state_diff: BytesDiff, balances_diff: CompressedU64Diff, + inactivity_scores_diff: CompressedU64Diff, + validators_diff: ValidatorsDiff, } #[derive(Debug, Encode, Decode)] @@ -113,30 +118,57 @@ pub struct CompressedU64Diff { bytes: Vec, } +#[derive(Debug, Encode, Decode)] +pub struct ValidatorsDiff { + bytes: Vec, +} + impl HDiffBuffer { pub fn from_state(mut beacon_state: BeaconState) -> Self { let _t = metrics::start_timer(&metrics::STORE_BEACON_HDIFF_BUFFER_FROM_STATE_TIME); // Set state.balances to empty list, and then serialize state as ssz let balances_list = std::mem::take(beacon_state.balances_mut()); + let inactivity_scores = if let Ok(inactivity_scores) = beacon_state.inactivity_scores_mut() + { + std::mem::take(inactivity_scores).to_vec() + } else { + // TODO: What to set the diff list before altair? + vec![] + }; + let validators = std::mem::take(beacon_state.validators_mut()).to_vec(); let state = beacon_state.as_ssz_bytes(); let balances = balances_list.to_vec(); - HDiffBuffer { state, balances } + HDiffBuffer { + state, + balances, + inactivity_scores, + validators, + } } pub fn as_state(&self, spec: &ChainSpec) -> Result, Error> { let _t = metrics::start_timer(&metrics::STORE_BEACON_HDIFF_BUFFER_INTO_STATE_TIME); let mut state = BeaconState::from_ssz_bytes(&self.state, spec).map_err(Error::InvalidSszState)?; + *state.balances_mut() = List::try_from_iter(self.balances.iter().copied()) .map_err(|_| Error::InvalidBalancesLength)?; + if let Ok(inactivity_scores) = state.inactivity_scores_mut() { + *inactivity_scores = List::try_from_iter(self.inactivity_scores.iter().copied()) + .map_err(|_| Error::InvalidBalancesLength)?; + } + Ok(state) } /// Byte size of this instance pub fn size(&self) -> usize { - self.state.len() + self.balances.len() * std::mem::size_of::() + self.state.len() + + self.balances.len() * std::mem::size_of::() + + self.inactivity_scores.len() * std::mem::size_of::() + + self.validators.len() * std::mem::size_of::() } } @@ -148,27 +180,39 @@ impl HDiff { ) -> Result { let state_diff = BytesDiff::compute(&source.state, &target.state)?; let balances_diff = CompressedU64Diff::compute(&source.balances, &target.balances, config)?; + let inactivity_scores_diff = CompressedU64Diff::compute( + &source.inactivity_scores, + &target.inactivity_scores, + config, + )?; + let validators_diff = + ValidatorsDiff::compute(&source.validators, &target.validators, config)?; Ok(Self { state_diff, balances_diff, + inactivity_scores_diff, + validators_diff, }) } pub fn apply(&self, source: &mut HDiffBuffer, config: &StoreConfig) -> Result<(), Error> { let source_state = std::mem::take(&mut source.state); self.state_diff.apply(&source_state, &mut source.state)?; - self.balances_diff.apply(&mut source.balances, config)?; - Ok(()) - } + self.inactivity_scores_diff + .apply(&mut source.inactivity_scores, config)?; + self.validators_diff.apply(&mut source.validators, config)?; - pub fn state_diff_len(&self) -> usize { - self.state_diff.bytes.len() + Ok(()) } - pub fn balances_diff_len(&self) -> usize { - self.balances_diff.bytes.len() + /// Byte size of this instance + pub fn size(&self) -> usize { + self.state_diff.size() + + self.balances_diff.size() + + self.inactivity_scores_diff.size() + + self.validators_diff.size() } } @@ -205,6 +249,11 @@ impl BytesDiff { *target = xdelta3::decode(&self.bytes, source).ok_or(Error::UnableToApplyDiff)?; Ok(()) } + + /// Byte size of this instance + pub fn size(&self) -> usize { + self.bytes.len() + } } impl CompressedU64Diff { @@ -223,29 +272,14 @@ impl CompressedU64Diff { }) .collect(); - let compression_level = config.compression_level; - let mut compressed_bytes = - Vec::with_capacity(config.estimate_compressed_size(uncompressed_bytes.len())); - let mut encoder = - Encoder::new(&mut compressed_bytes, compression_level).map_err(Error::Compression)?; - encoder - .write_all(&uncompressed_bytes) - .map_err(Error::Compression)?; - encoder.finish().map_err(Error::Compression)?; - Ok(CompressedU64Diff { - bytes: compressed_bytes, + bytes: compress_bytes(&uncompressed_bytes, config)?, }) } pub fn apply(&self, xs: &mut Vec, config: &StoreConfig) -> Result<(), Error> { // Decompress balances diff. - let mut balances_diff_bytes = - Vec::with_capacity(config.estimate_decompressed_size(self.bytes.len())); - let mut decoder = Decoder::new(&*self.bytes).map_err(Error::Compression)?; - decoder - .read_to_end(&mut balances_diff_bytes) - .map_err(Error::Compression)?; + let balances_diff_bytes = uncompress_bytes(&self.bytes, config)?; for (i, diff_bytes) in balances_diff_bytes .chunks(u64::BITS as usize / 8) @@ -265,6 +299,184 @@ impl CompressedU64Diff { Ok(()) } + + /// Byte size of this instance + pub fn size(&self) -> usize { + self.bytes.len() + } +} + +fn compress_bytes(input: &[u8], config: &StoreConfig) -> Result, Error> { + let compression_level = config.compression_level; + let mut out = Vec::with_capacity(config.estimate_compressed_size(input.len())); + let mut encoder = Encoder::new(&mut out, compression_level).map_err(Error::Compression)?; + encoder.write_all(input).map_err(Error::Compression)?; + encoder.finish().map_err(Error::Compression)?; + Ok(out) +} + +fn uncompress_bytes(input: &[u8], config: &StoreConfig) -> Result, Error> { + // Decompress balances diff. + let mut out = Vec::with_capacity(config.estimate_decompressed_size(input.len())); + let mut decoder = Decoder::new(input).map_err(Error::Compression)?; + decoder.read_to_end(&mut out).map_err(Error::Compression)?; + Ok(out) +} + +impl ValidatorsDiff { + pub fn compute( + xs: &[Validator], + ys: &[Validator], + config: &StoreConfig, + ) -> Result { + if xs.len() > ys.len() { + return Err(Error::U64DiffDeletionsNotSupported); + } + + let uncompressed_bytes = ys + .iter() + .enumerate() + .filter_map(|(i, y)| { + let validator_diff = if let Some(x) = xs.get(i) { + // How expensive are comparisions? + // TODO: Is it worth it to optimize the comparision and do them only once here? + if y == x { + return None; + } else { + let pubkey_changed = y.pubkey != x.pubkey; + // Note: If researchers attempt to change the Validator container, go quickly to + // All Core Devs and push hard to add another List in the BeaconState instead. + Validator { + // The pubkey can be changed on index re-use + pubkey: if pubkey_changed { + y.pubkey + } else { + PublicKeyBytes::empty() + }, + // withdrawal_credentials can be set to zero initially but can never be + // changed INTO zero. On index re-use it can be set to zero, but in that + // case the pubkey will also change. + withdrawal_credentials: if pubkey_changed + || y.withdrawal_credentials != x.withdrawal_credentials + { + y.withdrawal_credentials + } else { + Hash256::ZERO + }, + // effective_balance can increase and decrease + effective_balance: y.effective_balance - x.effective_balance, + // slashed can only change from false into true. In an index re-use it can + // switch back to false, but in that case the pubkey will also change. + slashed: y.slashed, + // activation_eligibility_epoch can never be zero under any case. It's + // set to either FAR_FUTURE_EPOCH or get_current_epoch(state) + 1 + activation_eligibility_epoch: if y.activation_eligibility_epoch + != x.activation_eligibility_epoch + { + y.activation_eligibility_epoch + } else { + Epoch::new(0) + }, + // activation_epoch can never be zero under any case. It's + // set to either FAR_FUTURE_EPOCH or epoch + 1 + MAX_SEED_LOOKAHEAD + activation_epoch: if y.activation_epoch != x.activation_epoch { + y.activation_epoch + } else { + Epoch::new(0) + }, + // exit_epoch can never be zero under any case. It's set to either + // FAR_FUTURE_EPOCH or > epoch + 1 + MAX_SEED_LOOKAHEAD + exit_epoch: if y.exit_epoch != x.exit_epoch { + y.exit_epoch + } else { + Epoch::new(0) + }, + // withdrawable_epoch can never be zero under any case. It's set to + // either FAR_FUTURE_EPOCH or > epoch + 1 + MAX_SEED_LOOKAHEAD + withdrawable_epoch: if y.withdrawable_epoch != x.withdrawable_epoch { + y.withdrawable_epoch + } else { + Epoch::new(0) + }, + } + } + } else { + y.clone() + }; + + Some(ValidatorDiffEntry { + index: i as u64, + validator_diff, + }) + }) + .flat_map(|v_diff| v_diff.as_ssz_bytes()) + .collect::>(); + + Ok(Self { + bytes: compress_bytes(&uncompressed_bytes, config)?, + }) + } + + pub fn apply(&self, xs: &mut Vec, config: &StoreConfig) -> Result<(), Error> { + let validator_diff_bytes = uncompress_bytes(&self.bytes, config)?; + + for diff_bytes in + validator_diff_bytes.chunks(::ssz_fixed_len()) + { + let ValidatorDiffEntry { + index, + validator_diff: diff, + } = ValidatorDiffEntry::from_ssz_bytes(diff_bytes) + .map_err(|_| Error::BalancesIncompleteChunk)?; + + if let Some(x) = xs.get_mut(index as usize) { + // TODO: Precompute empty + // Note: a pubkey change implies index re-use. In that case over-write + // withdrawal_credentials and slashed inconditionally as their default values + // are valid values. + let pubkey_changed = diff.pubkey != PublicKeyBytes::empty(); + if pubkey_changed { + x.pubkey = diff.pubkey; + } + if pubkey_changed || diff.withdrawal_credentials != Hash256::ZERO { + x.withdrawal_credentials = diff.withdrawal_credentials; + } + if diff.effective_balance != 0 { + x.effective_balance = x.effective_balance.wrapping_add(diff.effective_balance); + } + if pubkey_changed || diff.slashed { + x.slashed = diff.slashed; + } + if diff.activation_eligibility_epoch != Epoch::new(0) { + x.activation_eligibility_epoch = diff.activation_eligibility_epoch; + } + if diff.activation_epoch != Epoch::new(0) { + x.activation_epoch = diff.activation_epoch; + } + if diff.exit_epoch != Epoch::new(0) { + x.exit_epoch = diff.exit_epoch; + } + if diff.withdrawable_epoch != Epoch::new(0) { + x.withdrawable_epoch = diff.withdrawable_epoch; + } + } else { + xs.push(diff) + } + } + + Ok(()) + } + + /// Byte size of this instance + pub fn size(&self) -> usize { + self.bytes.len() + } +} + +#[derive(Debug, Encode, Decode)] +struct ValidatorDiffEntry { + index: u64, + validator_diff: Validator, } impl Default for HierarchyConfig { @@ -390,6 +602,7 @@ impl StorageStrategy { #[cfg(test)] mod tests { use super::*; + use rand::{thread_rng, Rng}; #[test] fn default_storage_strategy() { @@ -486,4 +699,40 @@ mod tests { // U64 diff wins by more than a factor of 3 assert!(u64_diff.bytes.len() < 3 * bytes_diff.bytes.len()); } + + #[test] + fn compressed_validators_diff() { + assert_eq!(::ssz_fixed_len(), 129); + + let mut rng = thread_rng(); + let config = &StoreConfig::default(); + let xs = (0..10) + .map(|_| rand_validator(&mut rng)) + .collect::>(); + let mut ys = xs.clone(); + ys[5] = rand_validator(&mut rng); + ys.push(rand_validator(&mut rng)); + let diff = ValidatorsDiff::compute(&xs, &ys, config).unwrap(); + + let mut xs_out = xs.clone(); + diff.apply(&mut xs_out, config).unwrap(); + assert_eq!(xs_out, ys); + } + + fn rand_validator(mut rng: impl Rng) -> Validator { + let mut pubkey = [0u8; 48]; + rng.fill_bytes(&mut pubkey); + let withdrawal_credentials: [u8; 32] = rng.gen(); + + Validator { + pubkey: PublicKeyBytes::from_ssz_bytes(&pubkey).unwrap(), + withdrawal_credentials: withdrawal_credentials.into(), + slashed: false, + effective_balance: 32_000_000_000, + activation_eligibility_epoch: Epoch::max_value(), + activation_epoch: Epoch::max_value(), + exit_epoch: Epoch::max_value(), + withdrawable_epoch: Epoch::max_value(), + } + } } diff --git a/consensus/types/src/validator.rs b/consensus/types/src/validator.rs index 8cf118eea5..275101ddbe 100644 --- a/consensus/types/src/validator.rs +++ b/consensus/types/src/validator.rs @@ -15,6 +15,7 @@ use tree_hash_derive::TreeHash; Debug, Clone, PartialEq, + Eq, Serialize, Deserialize, Encode,