From aaeffccb5310261b4a95a9a8addfd66c875018b5 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 31 Oct 2024 19:13:22 +0200 Subject: [PATCH 1/4] Split hdiff computation --- Cargo.lock | 2 + beacon_node/store/Cargo.toml | 6 + beacon_node/store/benches/hdiff.rs | 97 +++++++++ beacon_node/store/src/hdiff.rs | 303 ++++++++++++++++++++++++++--- consensus/types/src/validator.rs | 1 + 5 files changed, 382 insertions(+), 27 deletions(-) create mode 100644 beacon_node/store/benches/hdiff.rs diff --git a/Cargo.lock b/Cargo.lock index 9f68e6dcf7..bbe26f4bb0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8282,6 +8282,7 @@ version = "0.2.0" dependencies = [ "beacon_chain", "bls", + "criterion", "db-key", "directory", "ethereum_ssz", @@ -8292,6 +8293,7 @@ dependencies = [ "lru", "metrics", "parking_lot 0.12.3", + "rand 0.8.5", "safe_arith", "serde", "slog", diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index 61cde61916..cd7379b11d 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -7,6 +7,8 @@ edition = { workspace = true } [dev-dependencies] tempfile = { workspace = true } beacon_chain = { workspace = true } +criterion = { workspace = true } +rand = { workspace = true } [dependencies] db-key = "0.0.5" @@ -31,3 +33,7 @@ zstd = { workspace = true } bls = { workspace = true } smallvec = { workspace = true } logging = { workspace = true } + +[[bench]] +name = "hdiff" +harness = false diff --git a/beacon_node/store/benches/hdiff.rs b/beacon_node/store/benches/hdiff.rs new file mode 100644 index 0000000000..81bbbd11c3 --- /dev/null +++ b/beacon_node/store/benches/hdiff.rs @@ -0,0 +1,97 @@ +use bls::PublicKeyBytes; +use criterion::{criterion_group, criterion_main, Criterion}; +use rand::Rng; +use ssz::Decode; +use store::{ + hdiff::{HDiff, HDiffBuffer}, + StoreConfig, +}; +use types::{BeaconState, Epoch, Eth1Data, EthSpec, ForkName, MainnetEthSpec as E, Validator}; + +pub fn all_benches(c: &mut Criterion) { + let fork = ForkName::Base; + let spec = fork.make_genesis_spec(E::default_spec()); + let genesis_time = 0; + let eth1_data = Eth1Data::default(); + let config = StoreConfig::default(); + let mut rng = rand::thread_rng(); + let validator_mutations = 1000; + let validator_additions = 100; + + for n in [1_000_000, 1_500_000, 2_000_000] { + let mut source_state = BeaconState::::new(genesis_time, eth1_data.clone(), &spec); + + for _ in 0..n { + append_validator(&mut source_state, &mut rng); + } + + let mut target_state = source_state.clone(); + // Change all balances + for i in 0..n { + let balance = target_state.balances_mut().get_mut(i).unwrap(); + *balance += rng.gen_range(1..=1_000_000); + } + // And some validator records + for _ in 0..validator_mutations { + let index = rng.gen_range(1..n); + // TODO: Only change a few things, and not the pubkey + *target_state.validators_mut().get_mut(index).unwrap() = rand_validator(&mut rng); + } + for _ in 0..validator_additions { + append_validator(&mut target_state, &mut rng); + } + + let source = HDiffBuffer::from_state(source_state); + let target = HDiffBuffer::from_state(target_state); + let diff = HDiff::compute(&source, &target, &config).unwrap(); + + c.bench_function( + &format!("apply hdiff n={n} v_mut={validator_mutations} v_add={validator_additions}"), + |b| { + let mut source = source.clone(); + b.iter(|| { + diff.apply(&mut source, &config).unwrap(); + println!("diff size {}", diff.size()); + }) + }, + ); + } +} + +fn rand_validator(mut rng: impl Rng) -> 
Validator { + let mut pubkey = [0u8; 48]; + rng.fill_bytes(&mut pubkey); + let withdrawal_credentials: [u8; 32] = rng.gen(); + + Validator { + pubkey: PublicKeyBytes::from_ssz_bytes(&pubkey).unwrap(), + withdrawal_credentials: withdrawal_credentials.into(), + slashed: false, + effective_balance: 32_000_000_000, + activation_eligibility_epoch: Epoch::max_value(), + activation_epoch: Epoch::max_value(), + exit_epoch: Epoch::max_value(), + withdrawable_epoch: Epoch::max_value(), + } +} + +fn append_validator(state: &mut BeaconState, mut rng: impl Rng) { + state + .balances_mut() + .push(32_000_000_000 + rng.gen_range(1..=1_000_000_000)) + .unwrap(); + if let Ok(inactivity_scores) = state.inactivity_scores_mut() { + inactivity_scores.push(0).unwrap(); + } + state + .validators_mut() + .push(rand_validator(&mut rng)) + .unwrap(); +} + +criterion_group! { + name = benches; + config = Criterion::default().sample_size(10); + targets = all_benches +} +criterion_main!(benches); diff --git a/beacon_node/store/src/hdiff.rs b/beacon_node/store/src/hdiff.rs index 49b29de28e..59f4747763 100644 --- a/beacon_node/store/src/hdiff.rs +++ b/beacon_node/store/src/hdiff.rs @@ -1,5 +1,6 @@ //! Hierarchical diff implementation. use crate::{metrics, DBColumn, StoreConfig, StoreItem}; +use bls::PublicKeyBytes; use itertools::Itertools; use serde::{Deserialize, Serialize}; use ssz::{Decode, Encode}; @@ -7,7 +8,7 @@ use ssz_derive::{Decode, Encode}; use std::io::{Read, Write}; use std::ops::RangeInclusive; use std::str::FromStr; -use types::{BeaconState, ChainSpec, EthSpec, List, Slot}; +use types::{BeaconState, ChainSpec, Epoch, EthSpec, Hash256, List, Slot, Validator}; use zstd::{Decoder, Encoder}; #[derive(Debug)] @@ -81,6 +82,8 @@ pub enum StorageStrategy { pub struct HDiffBuffer { state: Vec, balances: Vec, + inactivity_scores: Vec, + validators: Vec, } /// Hierarchical state diff. @@ -101,6 +104,8 @@ pub struct HDiffBuffer { pub struct HDiff { state_diff: BytesDiff, balances_diff: CompressedU64Diff, + inactivity_scores_diff: CompressedU64Diff, + validators_diff: ValidatorsDiff, } #[derive(Debug, Encode, Decode)] @@ -113,30 +118,57 @@ pub struct CompressedU64Diff { bytes: Vec, } +#[derive(Debug, Encode, Decode)] +pub struct ValidatorsDiff { + bytes: Vec, +} + impl HDiffBuffer { pub fn from_state(mut beacon_state: BeaconState) -> Self { let _t = metrics::start_timer(&metrics::STORE_BEACON_HDIFF_BUFFER_FROM_STATE_TIME); // Set state.balances to empty list, and then serialize state as ssz let balances_list = std::mem::take(beacon_state.balances_mut()); + let inactivity_scores = if let Ok(inactivity_scores) = beacon_state.inactivity_scores_mut() + { + std::mem::take(inactivity_scores).to_vec() + } else { + // TODO: What to set the diff list before altair? 
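+            // Editorial note (assumption, not part of the original patch): pre-Altair states have
+            // no inactivity_scores field and `as_state` only restores the field for Altair and
+            // later forks, so an empty Vec here diffs cleanly and is never written back.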
+ vec![] + }; + let validators = std::mem::take(beacon_state.validators_mut()).to_vec(); let state = beacon_state.as_ssz_bytes(); let balances = balances_list.to_vec(); - HDiffBuffer { state, balances } + HDiffBuffer { + state, + balances, + inactivity_scores, + validators, + } } pub fn as_state(&self, spec: &ChainSpec) -> Result, Error> { let _t = metrics::start_timer(&metrics::STORE_BEACON_HDIFF_BUFFER_INTO_STATE_TIME); let mut state = BeaconState::from_ssz_bytes(&self.state, spec).map_err(Error::InvalidSszState)?; + *state.balances_mut() = List::try_from_iter(self.balances.iter().copied()) .map_err(|_| Error::InvalidBalancesLength)?; + if let Ok(inactivity_scores) = state.inactivity_scores_mut() { + *inactivity_scores = List::try_from_iter(self.inactivity_scores.iter().copied()) + .map_err(|_| Error::InvalidBalancesLength)?; + } + Ok(state) } /// Byte size of this instance pub fn size(&self) -> usize { - self.state.len() + self.balances.len() * std::mem::size_of::() + self.state.len() + + self.balances.len() * std::mem::size_of::() + + self.inactivity_scores.len() * std::mem::size_of::() + + self.validators.len() * std::mem::size_of::() } } @@ -148,27 +180,39 @@ impl HDiff { ) -> Result { let state_diff = BytesDiff::compute(&source.state, &target.state)?; let balances_diff = CompressedU64Diff::compute(&source.balances, &target.balances, config)?; + let inactivity_scores_diff = CompressedU64Diff::compute( + &source.inactivity_scores, + &target.inactivity_scores, + config, + )?; + let validators_diff = + ValidatorsDiff::compute(&source.validators, &target.validators, config)?; Ok(Self { state_diff, balances_diff, + inactivity_scores_diff, + validators_diff, }) } pub fn apply(&self, source: &mut HDiffBuffer, config: &StoreConfig) -> Result<(), Error> { let source_state = std::mem::take(&mut source.state); self.state_diff.apply(&source_state, &mut source.state)?; - self.balances_diff.apply(&mut source.balances, config)?; - Ok(()) - } + self.inactivity_scores_diff + .apply(&mut source.inactivity_scores, config)?; + self.validators_diff.apply(&mut source.validators, config)?; - pub fn state_diff_len(&self) -> usize { - self.state_diff.bytes.len() + Ok(()) } - pub fn balances_diff_len(&self) -> usize { - self.balances_diff.bytes.len() + /// Byte size of this instance + pub fn size(&self) -> usize { + self.state_diff.size() + + self.balances_diff.size() + + self.inactivity_scores_diff.size() + + self.validators_diff.size() } } @@ -205,6 +249,11 @@ impl BytesDiff { *target = xdelta3::decode(&self.bytes, source).ok_or(Error::UnableToApplyDiff)?; Ok(()) } + + /// Byte size of this instance + pub fn size(&self) -> usize { + self.bytes.len() + } } impl CompressedU64Diff { @@ -223,29 +272,14 @@ impl CompressedU64Diff { }) .collect(); - let compression_level = config.compression_level; - let mut compressed_bytes = - Vec::with_capacity(config.estimate_compressed_size(uncompressed_bytes.len())); - let mut encoder = - Encoder::new(&mut compressed_bytes, compression_level).map_err(Error::Compression)?; - encoder - .write_all(&uncompressed_bytes) - .map_err(Error::Compression)?; - encoder.finish().map_err(Error::Compression)?; - Ok(CompressedU64Diff { - bytes: compressed_bytes, + bytes: compress_bytes(&uncompressed_bytes, config)?, }) } pub fn apply(&self, xs: &mut Vec, config: &StoreConfig) -> Result<(), Error> { // Decompress balances diff. 
-        let mut balances_diff_bytes =
-            Vec::with_capacity(config.estimate_decompressed_size(self.bytes.len()));
-        let mut decoder = Decoder::new(&*self.bytes).map_err(Error::Compression)?;
-        decoder
-            .read_to_end(&mut balances_diff_bytes)
-            .map_err(Error::Compression)?;
+        let balances_diff_bytes = uncompress_bytes(&self.bytes, config)?;
 
         for (i, diff_bytes) in balances_diff_bytes
             .chunks(u64::BITS as usize / 8)
@@ -265,6 +299,184 @@ impl CompressedU64Diff {
 
         Ok(())
     }
+
+    /// Byte size of this instance
+    pub fn size(&self) -> usize {
+        self.bytes.len()
+    }
+}
+
+fn compress_bytes(input: &[u8], config: &StoreConfig) -> Result<Vec<u8>, Error> {
+    let compression_level = config.compression_level;
+    let mut out = Vec::with_capacity(config.estimate_compressed_size(input.len()));
+    let mut encoder = Encoder::new(&mut out, compression_level).map_err(Error::Compression)?;
+    encoder.write_all(input).map_err(Error::Compression)?;
+    encoder.finish().map_err(Error::Compression)?;
+    Ok(out)
+}
+
+fn uncompress_bytes(input: &[u8], config: &StoreConfig) -> Result<Vec<u8>, Error> {
+    // Decompress the zstd-compressed diff bytes.
+    let mut out = Vec::with_capacity(config.estimate_decompressed_size(input.len()));
+    let mut decoder = Decoder::new(input).map_err(Error::Compression)?;
+    decoder.read_to_end(&mut out).map_err(Error::Compression)?;
+    Ok(out)
+}
+
+impl ValidatorsDiff {
+    pub fn compute(
+        xs: &[Validator],
+        ys: &[Validator],
+        config: &StoreConfig,
+    ) -> Result<Self, Error> {
+        if xs.len() > ys.len() {
+            return Err(Error::U64DiffDeletionsNotSupported);
+        }
+
+        let uncompressed_bytes = ys
+            .iter()
+            .enumerate()
+            .filter_map(|(i, y)| {
+                let validator_diff = if let Some(x) = xs.get(i) {
+                    // How expensive are comparisons?
+                    // TODO: Is it worth optimizing the comparisons so they are done only once here?
+                    if y == x {
+                        return None;
+                    } else {
+                        let pubkey_changed = y.pubkey != x.pubkey;
+                        // Note: If researchers attempt to change the Validator container, go quickly to
+                        // All Core Devs and push hard to add another List in the BeaconState instead.
+                        Validator {
+                            // The pubkey can be changed on index re-use
+                            pubkey: if pubkey_changed {
+                                y.pubkey
+                            } else {
+                                PublicKeyBytes::empty()
+                            },
+                            // withdrawal_credentials can be set to zero initially but can never be
+                            // changed INTO zero. On index re-use it can be set to zero, but in that
+                            // case the pubkey will also change.
+                            withdrawal_credentials: if pubkey_changed
+                                || y.withdrawal_credentials != x.withdrawal_credentials
+                            {
+                                y.withdrawal_credentials
+                            } else {
+                                Hash256::ZERO
+                            },
+                            // effective_balance can increase and decrease, so store the wrapping
+                            // difference (reversed with wrapping_add on apply)
+                            effective_balance: y.effective_balance.wrapping_sub(x.effective_balance),
+                            // slashed can only change from false into true. On index re-use it can
+                            // switch back to false, but in that case the pubkey will also change.
+                            slashed: y.slashed,
+                            // activation_eligibility_epoch can never be zero in any case. It's
+                            // set to either FAR_FUTURE_EPOCH or get_current_epoch(state) + 1
+                            activation_eligibility_epoch: if y.activation_eligibility_epoch
+                                != x.activation_eligibility_epoch
+                            {
+                                y.activation_eligibility_epoch
+                            } else {
+                                Epoch::new(0)
+                            },
+                            // activation_epoch can never be zero in any case. It's
+                            // set to either FAR_FUTURE_EPOCH or epoch + 1 + MAX_SEED_LOOKAHEAD
+                            activation_epoch: if y.activation_epoch != x.activation_epoch {
+                                y.activation_epoch
+                            } else {
+                                Epoch::new(0)
+                            },
+                            // exit_epoch can never be zero in any case. It's set to either
+                            // FAR_FUTURE_EPOCH or > epoch + 1 + MAX_SEED_LOOKAHEAD
+                            exit_epoch: if y.exit_epoch != x.exit_epoch {
+                                y.exit_epoch
+                            } else {
+                                Epoch::new(0)
+                            },
+                            // withdrawable_epoch can never be zero in any case. It's set to
+                            // either FAR_FUTURE_EPOCH or > epoch + 1 + MAX_SEED_LOOKAHEAD
+                            withdrawable_epoch: if y.withdrawable_epoch != x.withdrawable_epoch {
+                                y.withdrawable_epoch
+                            } else {
+                                Epoch::new(0)
+                            },
+                        }
+                    }
+                } else {
+                    y.clone()
+                };
+
+                Some(ValidatorDiffEntry {
+                    index: i as u64,
+                    validator_diff,
+                })
+            })
+            .flat_map(|v_diff| v_diff.as_ssz_bytes())
+            .collect::<Vec<u8>>();
+
+        Ok(Self {
+            bytes: compress_bytes(&uncompressed_bytes, config)?,
+        })
+    }
+
+    pub fn apply(&self, xs: &mut Vec<Validator>, config: &StoreConfig) -> Result<(), Error> {
+        let validator_diff_bytes = uncompress_bytes(&self.bytes, config)?;
+
+        for diff_bytes in
+            validator_diff_bytes.chunks(<ValidatorDiffEntry as Decode>::ssz_fixed_len())
+        {
+            let ValidatorDiffEntry {
+                index,
+                validator_diff: diff,
+            } = ValidatorDiffEntry::from_ssz_bytes(diff_bytes)
+                .map_err(|_| Error::BalancesIncompleteChunk)?;
+
+            if let Some(x) = xs.get_mut(index as usize) {
+                // TODO: Precompute empty
+                // Note: a pubkey change implies index re-use. In that case overwrite
+                // withdrawal_credentials and slashed unconditionally, as their default values
+                // are valid values.
+                let pubkey_changed = diff.pubkey != PublicKeyBytes::empty();
+                if pubkey_changed {
+                    x.pubkey = diff.pubkey;
+                }
+                if pubkey_changed || diff.withdrawal_credentials != Hash256::ZERO {
+                    x.withdrawal_credentials = diff.withdrawal_credentials;
+                }
+                if diff.effective_balance != 0 {
+                    x.effective_balance = x.effective_balance.wrapping_add(diff.effective_balance);
+                }
+                if pubkey_changed || diff.slashed {
+                    x.slashed = diff.slashed;
+                }
+                if diff.activation_eligibility_epoch != Epoch::new(0) {
+                    x.activation_eligibility_epoch = diff.activation_eligibility_epoch;
+                }
+                if diff.activation_epoch != Epoch::new(0) {
+                    x.activation_epoch = diff.activation_epoch;
+                }
+                if diff.exit_epoch != Epoch::new(0) {
+                    x.exit_epoch = diff.exit_epoch;
+                }
+                if diff.withdrawable_epoch != Epoch::new(0) {
+                    x.withdrawable_epoch = diff.withdrawable_epoch;
+                }
+            } else {
+                xs.push(diff)
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Byte size of this instance
+    pub fn size(&self) -> usize {
+        self.bytes.len()
+    }
+}
+
+#[derive(Debug, Encode, Decode)]
+struct ValidatorDiffEntry {
+    index: u64,
+    validator_diff: Validator,
 }
 
 impl Default for HierarchyConfig {
     fn default() -> Self {
         HierarchyConfig {
@@ -390,6 +602,7 @@ impl StorageStrategy {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use rand::{thread_rng, Rng};
 
     #[test]
     fn default_storage_strategy() {
@@ -486,4 +699,40 @@ mod tests {
         // U64 diff wins by more than a factor of 3
         assert!(u64_diff.bytes.len() < 3 * bytes_diff.bytes.len());
     }
+
+    #[test]
+    fn compressed_validators_diff() {
+        assert_eq!(<ValidatorDiffEntry as Decode>::ssz_fixed_len(), 129);
+
+        let mut rng = thread_rng();
+        let config = &StoreConfig::default();
+        let xs = (0..10)
+            .map(|_| rand_validator(&mut rng))
+            .collect::<Vec<_>>();
+        let mut ys = xs.clone();
+        ys[5] = rand_validator(&mut rng);
+        ys.push(rand_validator(&mut rng));
+        let diff = ValidatorsDiff::compute(&xs, &ys, config).unwrap();
+
+        let mut xs_out = xs.clone();
+        diff.apply(&mut xs_out, config).unwrap();
+        assert_eq!(xs_out, ys);
+    }
+
+    fn rand_validator(mut rng: impl Rng) -> Validator {
+        let mut pubkey = [0u8; 48];
+        rng.fill_bytes(&mut pubkey);
+        let withdrawal_credentials: [u8; 32] = rng.gen();
+
+        Validator {
+            pubkey: PublicKeyBytes::from_ssz_bytes(&pubkey).unwrap(),
+            withdrawal_credentials: 
withdrawal_credentials.into(), + slashed: false, + effective_balance: 32_000_000_000, + activation_eligibility_epoch: Epoch::max_value(), + activation_epoch: Epoch::max_value(), + exit_epoch: Epoch::max_value(), + withdrawable_epoch: Epoch::max_value(), + } + } } diff --git a/consensus/types/src/validator.rs b/consensus/types/src/validator.rs index 8cf118eea5..275101ddbe 100644 --- a/consensus/types/src/validator.rs +++ b/consensus/types/src/validator.rs @@ -15,6 +15,7 @@ use tree_hash_derive::TreeHash; Debug, Clone, PartialEq, + Eq, Serialize, Deserialize, Encode, From 7a61b1688b87fcb29b3e16a90739a9f6fbf24e61 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Fri, 1 Nov 2024 04:23:08 +0200 Subject: [PATCH 2/4] Dedicated logic for historical roots and summaries --- beacon_node/store/benches/hdiff.rs | 79 +++++++++++++++---- beacon_node/store/src/hdiff.rs | 96 ++++++++++++++++++++++- consensus/types/src/historical_summary.rs | 1 + 3 files changed, 155 insertions(+), 21 deletions(-) diff --git a/beacon_node/store/benches/hdiff.rs b/beacon_node/store/benches/hdiff.rs index 81bbbd11c3..db99293c9a 100644 --- a/beacon_node/store/benches/hdiff.rs +++ b/beacon_node/store/benches/hdiff.rs @@ -2,22 +2,32 @@ use bls::PublicKeyBytes; use criterion::{criterion_group, criterion_main, Criterion}; use rand::Rng; use ssz::Decode; +use std::fs; use store::{ hdiff::{HDiff, HDiffBuffer}, StoreConfig, }; -use types::{BeaconState, Epoch, Eth1Data, EthSpec, ForkName, MainnetEthSpec as E, Validator}; +use types::{BeaconState, ChainSpec, Epoch, Eth1Data, EthSpec, MainnetEthSpec as E, Validator}; pub fn all_benches(c: &mut Criterion) { - let fork = ForkName::Base; - let spec = fork.make_genesis_spec(E::default_spec()); + let spec = E::default_spec(); let genesis_time = 0; let eth1_data = Eth1Data::default(); - let config = StoreConfig::default(); let mut rng = rand::thread_rng(); let validator_mutations = 1000; let validator_additions = 100; + // TODO: Temp + if std::path::Path::new("/Users/lion/code/sigp/lighthouse/state_10300000.ssz").exists() { + bench_against_disk_states( + c, + "/Users/lion/code/sigp/lighthouse/state_10300000.ssz", + "/Users/lion/code/sigp/lighthouse/state_10300064.ssz", + &spec, + ); + return; + } + for n in [1_000_000, 1_500_000, 2_000_000] { let mut source_state = BeaconState::::new(genesis_time, eth1_data.clone(), &spec); @@ -41,23 +51,58 @@ pub fn all_benches(c: &mut Criterion) { append_validator(&mut target_state, &mut rng); } - let source = HDiffBuffer::from_state(source_state); - let target = HDiffBuffer::from_state(target_state); - let diff = HDiff::compute(&source, &target, &config).unwrap(); - - c.bench_function( - &format!("apply hdiff n={n} v_mut={validator_mutations} v_add={validator_additions}"), - |b| { - let mut source = source.clone(); - b.iter(|| { - diff.apply(&mut source, &config).unwrap(); - println!("diff size {}", diff.size()); - }) - }, + bench_against_states( + c, + source_state, + target_state, + &format!("n={n} v_mut={validator_mutations} v_add={validator_additions}"), ); } } +fn bench_against_disk_states( + c: &mut Criterion, + source_state_path: &str, + target_state_path: &str, + spec: &ChainSpec, +) { + let source_state = + BeaconState::::from_ssz_bytes(&fs::read(source_state_path).unwrap(), spec).unwrap(); + let target_state = + BeaconState::::from_ssz_bytes(&fs::read(target_state_path).unwrap(), spec).unwrap(); + bench_against_states( + c, + source_state, + target_state, + &format!("{source_state_path} - 
{target_state_path}"), + ); +} + +fn bench_against_states( + c: &mut Criterion, + source_state: BeaconState, + target_state: BeaconState, + id: &str, +) { + let config = StoreConfig::default(); + let source = HDiffBuffer::from_state(source_state); + let target = HDiffBuffer::from_state(target_state); + let diff = HDiff::compute(&source, &target, &config).unwrap(); + println!("diff size {id} {}", diff.size()); + + c.bench_function(&format!("compute hdiff {id}"), |b| { + b.iter(|| { + HDiff::compute(&source, &target, &config).unwrap(); + }) + }); + c.bench_function(&format!("apply hdiff {id}"), |b| { + let mut source = source.clone(); + b.iter(|| { + diff.apply(&mut source, &config).unwrap(); + }) + }); +} + fn rand_validator(mut rng: impl Rng) -> Validator { let mut pubkey = [0u8; 48]; rng.fill_bytes(&mut pubkey); diff --git a/beacon_node/store/src/hdiff.rs b/beacon_node/store/src/hdiff.rs index 59f4747763..7fde30861b 100644 --- a/beacon_node/store/src/hdiff.rs +++ b/beacon_node/store/src/hdiff.rs @@ -5,9 +5,11 @@ use itertools::Itertools; use serde::{Deserialize, Serialize}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; +use std::cmp::Ordering; use std::io::{Read, Write}; use std::ops::RangeInclusive; use std::str::FromStr; +use types::historical_summary::HistoricalSummary; use types::{BeaconState, ChainSpec, Epoch, EthSpec, Hash256, List, Slot, Validator}; use zstd::{Decoder, Encoder}; @@ -84,6 +86,8 @@ pub struct HDiffBuffer { balances: Vec, inactivity_scores: Vec, validators: Vec, + historical_roots: Vec, + historical_summaries: Vec, } /// Hierarchical state diff. @@ -104,8 +108,26 @@ pub struct HDiffBuffer { pub struct HDiff { state_diff: BytesDiff, balances_diff: CompressedU64Diff, + /// inactivity_scores are small integers that change slowly epoch to epoch. And are 0 for all + /// participants unless there's non-finality. Computing the diff and compressing the result is + /// much faster than running them through a binary patch algorithm. In the default case where + /// all values are 0 it should also result in a tinny output. inactivity_scores_diff: CompressedU64Diff, + /// The validators array represents the vast majority of data in a BeaconState. Due to its big + /// size we have seen the performance of xdelta3 degreade. Comparing each entry of the + /// validators array manually significantly speed up the computation of the diff (+10x faster) + /// and result in the same minimal diff. As the `Validator` record is unlikely to change, + /// mantaining this extra complexity should be okay. validators_diff: ValidatorsDiff, + /// `historical_roots` is an unbounded forever growing (after Capella it's + /// historical_summaries) list of unique roots. This data is pure entropy so there's no point + /// in compressing it. As it's an append only list, the optimal diff + compression is just the + /// list of new entries. The size of `historical_roots` and `historical_summaries` in + /// non-trivial ~10 MB so throwing it to xdelta3 adds CPU cycles. With a bit of extra complexity + /// we can save those completely. 
+ historical_roots: AppendOnlyDiff, + /// See historical_roots + historical_summaries: AppendOnlyDiff, } #[derive(Debug, Encode, Decode)] @@ -123,6 +145,11 @@ pub struct ValidatorsDiff { bytes: Vec, } +#[derive(Debug, Encode, Decode)] +pub struct AppendOnlyDiff { + values: Vec, +} + impl HDiffBuffer { pub fn from_state(mut beacon_state: BeaconState) -> Self { let _t = metrics::start_timer(&metrics::STORE_BEACON_HDIFF_BUFFER_FROM_STATE_TIME); @@ -136,6 +163,13 @@ impl HDiffBuffer { vec![] }; let validators = std::mem::take(beacon_state.validators_mut()).to_vec(); + let historical_roots = std::mem::take(beacon_state.historical_roots_mut()).to_vec(); + let historical_summaries = + if let Ok(historical_summaries) = beacon_state.historical_summaries_mut() { + std::mem::take(historical_summaries).to_vec() + } else { + vec![] + }; let state = beacon_state.as_ssz_bytes(); let balances = balances_list.to_vec(); @@ -145,6 +179,8 @@ impl HDiffBuffer { balances, inactivity_scores, validators, + historical_roots, + historical_summaries, } } @@ -155,11 +191,24 @@ impl HDiffBuffer { *state.balances_mut() = List::try_from_iter(self.balances.iter().copied()) .map_err(|_| Error::InvalidBalancesLength)?; + if let Ok(inactivity_scores) = state.inactivity_scores_mut() { *inactivity_scores = List::try_from_iter(self.inactivity_scores.iter().copied()) .map_err(|_| Error::InvalidBalancesLength)?; } + // TODO: Can we remove all this clone / copying? + *state.validators_mut() = List::try_from_iter(self.validators.iter().cloned()) + .map_err(|_| Error::InvalidBalancesLength)?; + + *state.historical_roots_mut() = List::try_from_iter(self.historical_roots.iter().copied()) + .map_err(|_| Error::InvalidBalancesLength)?; + + if let Ok(historical_summaries) = state.historical_summaries_mut() { + *historical_summaries = List::try_from_iter(self.historical_summaries.iter().copied()) + .map_err(|_| Error::InvalidBalancesLength)?; + } + Ok(state) } @@ -187,12 +236,18 @@ impl HDiff { )?; let validators_diff = ValidatorsDiff::compute(&source.validators, &target.validators, config)?; + let historical_roots = + AppendOnlyDiff::compute(&source.historical_roots, &target.historical_roots)?; + let historical_summaries = + AppendOnlyDiff::compute(&source.historical_summaries, &target.historical_summaries)?; Ok(Self { state_diff, balances_diff, inactivity_scores_diff, validators_diff, + historical_roots, + historical_summaries, }) } @@ -203,16 +258,27 @@ impl HDiff { self.inactivity_scores_diff .apply(&mut source.inactivity_scores, config)?; self.validators_diff.apply(&mut source.validators, config)?; + self.historical_roots.apply(&mut source.historical_roots); + self.historical_summaries + .apply(&mut source.historical_summaries); Ok(()) } /// Byte size of this instance pub fn size(&self) -> usize { - self.state_diff.size() - + self.balances_diff.size() - + self.inactivity_scores_diff.size() - + self.validators_diff.size() + self.sizes().iter().sum() + } + + pub fn sizes(&self) -> Vec { + vec![ + self.state_diff.size(), + self.balances_diff.size(), + self.inactivity_scores_diff.size(), + self.validators_diff.size(), + self.historical_roots.size(), + self.historical_summaries.size(), + ] } } @@ -479,6 +545,28 @@ struct ValidatorDiffEntry { validator_diff: Validator, } +impl AppendOnlyDiff { + pub fn compute(xs: &[T], ys: &[T]) -> Result { + match xs.len().cmp(&ys.len()) { + Ordering::Less => Ok(Self { + values: ys.iter().skip(xs.len()).copied().collect(), + }), + // Don't even create an iterator for this common case + Ordering::Equal 
=> Ok(Self { values: vec![] }), + Ordering::Greater => Err(Error::U64DiffDeletionsNotSupported), + } + } + + pub fn apply(&self, xs: &mut Vec) { + xs.extend(self.values.iter().copied()); + } + + /// Byte size of this instance + pub fn size(&self) -> usize { + self.values.len() * size_of::() + } +} + impl Default for HierarchyConfig { fn default() -> Self { HierarchyConfig { diff --git a/consensus/types/src/historical_summary.rs b/consensus/types/src/historical_summary.rs index 76bb111ea2..8c82d52b81 100644 --- a/consensus/types/src/historical_summary.rs +++ b/consensus/types/src/historical_summary.rs @@ -15,6 +15,7 @@ use tree_hash_derive::TreeHash; #[derive( Debug, PartialEq, + Eq, Serialize, Deserialize, Encode, From 04321573d10589cbd08995114fc6ea062cd56877 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Fri, 1 Nov 2024 05:49:43 +0200 Subject: [PATCH 3/4] Benchmark against real states --- beacon_node/store/benches/hdiff.rs | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/beacon_node/store/benches/hdiff.rs b/beacon_node/store/benches/hdiff.rs index db99293c9a..5d2e54b403 100644 --- a/beacon_node/store/benches/hdiff.rs +++ b/beacon_node/store/benches/hdiff.rs @@ -25,6 +25,24 @@ pub fn all_benches(c: &mut Criterion) { "/Users/lion/code/sigp/lighthouse/state_10300064.ssz", &spec, ); + bench_against_disk_states( + c, + "/Users/lion/code/sigp/lighthouse/state_10268064.ssz", + "/Users/lion/code/sigp/lighthouse/state_10300064.ssz", + &spec, + ); + bench_against_disk_states( + c, + "/Users/lion/code/sigp/lighthouse/state_9980064.ssz", + "/Users/lion/code/sigp/lighthouse/state_10300064.ssz", + &spec, + ); + bench_against_disk_states( + c, + "/Users/lion/code/sigp/lighthouse/state_7100064.ssz", + "/Users/lion/code/sigp/lighthouse/state_10300064.ssz", + &spec, + ); return; } @@ -70,6 +88,7 @@ fn bench_against_disk_states( BeaconState::::from_ssz_bytes(&fs::read(source_state_path).unwrap(), spec).unwrap(); let target_state = BeaconState::::from_ssz_bytes(&fs::read(target_state_path).unwrap(), spec).unwrap(); + bench_against_states( c, source_state, @@ -84,11 +103,15 @@ fn bench_against_states( target_state: BeaconState, id: &str, ) { + let slot_diff = target_state.slot() - source_state.slot(); let config = StoreConfig::default(); let source = HDiffBuffer::from_state(source_state); let target = HDiffBuffer::from_state(target_state); let diff = HDiff::compute(&source, &target, &config).unwrap(); - println!("diff size {id} {}", diff.size()); + println!( + "state slot diff {slot_diff} - diff size {id} {}", + diff.size() + ); c.bench_function(&format!("compute hdiff {id}"), |b| { b.iter(|| { From b218b9ab9a4a0dd34b7d45681811f729792fa128 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Fri, 1 Nov 2024 06:18:59 +0200 Subject: [PATCH 4/4] Mutated source? --- beacon_node/store/benches/hdiff.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/store/benches/hdiff.rs b/beacon_node/store/benches/hdiff.rs index 5d2e54b403..5f6bbadd4c 100644 --- a/beacon_node/store/benches/hdiff.rs +++ b/beacon_node/store/benches/hdiff.rs @@ -119,8 +119,8 @@ fn bench_against_states( }) }); c.bench_function(&format!("apply hdiff {id}"), |b| { - let mut source = source.clone(); b.iter(|| { + let mut source = source.clone(); diff.apply(&mut source, &config).unwrap(); }) });
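
Usage sketch (editorial addition, not part of the patches above): the end-to-end round trip
that the benchmarks and the `compressed_validators_diff` test exercise, written against the
`HDiffBuffer` / `HDiff` API from `beacon_node/store/src/hdiff.rs`. The function name
`hdiff_roundtrip` is illustrative only.

    use store::{
        hdiff::{HDiff, HDiffBuffer},
        StoreConfig,
    };
    use types::{BeaconState, ChainSpec, MainnetEthSpec as E};

    /// Diff two states and re-apply the diff, recovering the target state.
    fn hdiff_roundtrip(
        source_state: BeaconState<E>,
        target_state: BeaconState<E>,
        spec: &ChainSpec,
    ) -> BeaconState<E> {
        let config = StoreConfig::default();
        // Flatten each state into its diffable parts: the ssz bytes of the remaining state plus
        // balances, inactivity_scores, validators, historical_roots and historical_summaries.
        let source = HDiffBuffer::from_state(source_state);
        let target = HDiffBuffer::from_state(target_state);
        // Each part is diffed with its dedicated strategy (xdelta3, compressed u64 diff,
        // per-field validator diff, append-only diff).
        let diff = HDiff::compute(&source, &target, &config).unwrap();
        // Applying the diff mutates the source buffer in place into the target buffer.
        let mut buffer = source;
        diff.apply(&mut buffer, &config).unwrap();
        buffer.as_state(spec).unwrap()
    }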