diff --git a/.cargo/config.toml b/.cargo/config.toml index dac01630032..a408305c4d1 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,4 +1,3 @@ [env] # Set the number of arenas to 16 when using jemalloc. JEMALLOC_SYS_WITH_MALLOC_CONF = "abort_conf:true,narenas:16" - diff --git a/Cargo.lock b/Cargo.lock index d76b1987806..8054c812f17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4355,37 +4355,6 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" -[[package]] -name = "jemalloc-ctl" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cffc705424a344c054e135d12ee591402f4539245e8bbd64e6c9eaa9458b63c" -dependencies = [ - "jemalloc-sys", - "libc", - "paste", -] - -[[package]] -name = "jemalloc-sys" -version = "0.5.4+5.3.0-patched" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac6c1946e1cea1788cbfde01c993b52a10e2da07f4bac608228d1bed20bfebf2" -dependencies = [ - "cc", - "libc", -] - -[[package]] -name = "jemallocator" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0de374a9f8e63150e6f5e8a60cc14c668226d7a347d8aee1a45766e3c4dd3bc" -dependencies = [ - "jemalloc-sys", - "libc", -] - [[package]] name = "jobserver" version = "0.1.32" @@ -5413,11 +5382,11 @@ dependencies = [ name = "malloc_utils" version = "0.1.0" dependencies = [ - "jemalloc-ctl", - "jemallocator", "libc", "lighthouse_metrics", "parking_lot 0.12.3", + "tikv-jemalloc-ctl", + "tikv-jemallocator", ] [[package]] @@ -8622,6 +8591,37 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "tikv-jemalloc-ctl" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f21f216790c8df74ce3ab25b534e0718da5a1916719771d3fec23315c99e468b" +dependencies = [ + "libc", + "paste", + "tikv-jemalloc-sys", +] + +[[package]] +name = "tikv-jemalloc-sys" +version = "0.6.0+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd3c60906412afa9c2b5b5a48ca6a5abe5736aec9eb48ad05037a677e52e4e2d" +dependencies = [ + "cc", + "libc", +] + +[[package]] +name = "tikv-jemallocator" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cec5ff18518d81584f477e9bfdf957f5bb0979b0bac3af4ca30b5b3ae2d2865" +dependencies = [ + "libc", + "tikv-jemalloc-sys", +] + [[package]] name = "time" version = "0.3.36" diff --git a/Makefile b/Makefile index d94c2df2613..e6420a4c984 100644 --- a/Makefile +++ b/Makefile @@ -207,6 +207,7 @@ lint: cargo clippy --workspace --tests $(EXTRA_CLIPPY_OPTS) --features "$(TEST_FEATURES)" -- \ -D clippy::fn_to_numeric_cast_any \ -D clippy::manual_let_else \ + -D clippy::large_stack_frames \ -D warnings \ -A clippy::derive_partial_eq_without_eq \ -A clippy::upper-case-acronyms \ diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index 5a730719bfb..491271d6a9e 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -456,11 +456,10 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> { chain: &BeaconChain, ) -> Result { Self::verify_slashable(signed_aggregate, chain) - .map(|verified_aggregate| { + .inspect(|verified_aggregate| { if let Some(slasher) = chain.slasher.as_ref() { slasher.accept_attestation(verified_aggregate.indexed_attestation.clone()); } - verified_aggregate }) .map_err(|slash_info| process_slash_info(slash_info, chain)) } @@ -892,11 +891,10 @@ impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> { chain: &BeaconChain, ) -> Result { Self::verify_slashable(attestation.to_ref(), subnet_id, chain) - .map(|verified_unaggregated| { + .inspect(|verified_unaggregated| { if let Some(slasher) = chain.slasher.as_ref() { slasher.accept_attestation(verified_unaggregated.indexed_attestation.clone()); } - verified_unaggregated }) .map_err(|slash_info| process_slash_info(slash_info, chain)) } diff --git a/beacon_node/beacon_chain/src/beacon_block_streamer.rs b/beacon_node/beacon_chain/src/beacon_block_streamer.rs index 0ce33f16891..ace5f0be74a 100644 --- a/beacon_node/beacon_chain/src/beacon_block_streamer.rs +++ b/beacon_node/beacon_chain/src/beacon_block_streamer.rs @@ -710,10 +710,8 @@ impl From for BeaconChainError { mod tests { use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckCaches}; use crate::test_utils::{test_spec, BeaconChainHarness, EphemeralHarnessType}; - use execution_layer::test_utils::{Block, DEFAULT_ENGINE_CAPABILITIES}; - use execution_layer::EngineCapabilities; + use execution_layer::test_utils::Block; use std::sync::LazyLock; - use std::time::Duration; use tokio::sync::mpsc; use types::{ ChainSpec, Epoch, EthSpec, FixedBytesExtended, Hash256, Keypair, MinimalEthSpec, Slot, @@ -864,147 +862,4 @@ mod tests { } } } - - #[tokio::test] - async fn check_fallback_altair_to_electra() { - let slots_per_epoch = MinimalEthSpec::slots_per_epoch() as usize; - let num_epochs = 10; - let bellatrix_fork_epoch = 2usize; - let capella_fork_epoch = 4usize; - let deneb_fork_epoch = 6usize; - let electra_fork_epoch = 8usize; - let num_blocks_produced = num_epochs * slots_per_epoch; - - let mut spec = test_spec::(); - spec.altair_fork_epoch = Some(Epoch::new(0)); - spec.bellatrix_fork_epoch = Some(Epoch::new(bellatrix_fork_epoch as u64)); - spec.capella_fork_epoch = Some(Epoch::new(capella_fork_epoch as u64)); - spec.deneb_fork_epoch = Some(Epoch::new(deneb_fork_epoch as u64)); - spec.electra_fork_epoch = Some(Epoch::new(electra_fork_epoch as u64)); - - let harness = get_harness(VALIDATOR_COUNT, spec); - - // modify execution engine so it doesn't support engine_payloadBodiesBy* methods - let mock_execution_layer = harness.mock_execution_layer.as_ref().unwrap(); - mock_execution_layer - .server - .set_engine_capabilities(EngineCapabilities { - get_payload_bodies_by_hash_v1: false, - get_payload_bodies_by_range_v1: false, - ..DEFAULT_ENGINE_CAPABILITIES - }); - // refresh capabilities cache - harness - .chain - .execution_layer - .as_ref() - .unwrap() - .get_engine_capabilities(Some(Duration::ZERO)) - .await - .unwrap(); - - // go to bellatrix fork - harness - .extend_slots(bellatrix_fork_epoch * slots_per_epoch) - .await; - // extend half an epoch - harness.extend_slots(slots_per_epoch / 2).await; - // trigger merge - harness - .execution_block_generator() - .move_to_terminal_block() - .expect("should move to terminal block"); - let timestamp = harness.get_timestamp_at_slot() + harness.spec.seconds_per_slot; - harness - .execution_block_generator() - .modify_last_block(|block| { - if let Block::PoW(terminal_block) = block { - terminal_block.timestamp = timestamp; - } - }); - // finish out merge epoch - harness.extend_slots(slots_per_epoch / 2).await; - // finish rest of epochs - harness - .extend_slots((num_epochs - 1 - bellatrix_fork_epoch) * slots_per_epoch) - .await; - - let head = harness.chain.head_snapshot(); - let state = &head.beacon_state; - - assert_eq!( - state.slot(), - Slot::new(num_blocks_produced as u64), - "head should be at the current slot" - ); - assert_eq!( - state.current_epoch(), - num_blocks_produced as u64 / MinimalEthSpec::slots_per_epoch(), - "head should be at the expected epoch" - ); - assert_eq!( - state.current_justified_checkpoint().epoch, - state.current_epoch() - 1, - "the head should be justified one behind the current epoch" - ); - assert_eq!( - state.finalized_checkpoint().epoch, - state.current_epoch() - 2, - "the head should be finalized two behind the current epoch" - ); - - let block_roots: Vec = harness - .chain - .forwards_iter_block_roots(Slot::new(0)) - .expect("should get iter") - .map(Result::unwrap) - .map(|(root, _)| root) - .collect(); - - let mut expected_blocks = vec![]; - // get all blocks the old fashioned way - for root in &block_roots { - let block = harness - .chain - .get_block(root) - .await - .expect("should get block") - .expect("block should exist"); - expected_blocks.push(block); - } - - for epoch in 0..num_epochs { - let start = epoch * slots_per_epoch; - let mut epoch_roots = vec![Hash256::zero(); slots_per_epoch]; - epoch_roots[..].clone_from_slice(&block_roots[start..(start + slots_per_epoch)]); - let streamer = BeaconBlockStreamer::new(&harness.chain, CheckCaches::No) - .expect("should create streamer"); - let (block_tx, mut block_rx) = mpsc::unbounded_channel(); - streamer.stream(epoch_roots.clone(), block_tx).await; - - for (i, expected_root) in epoch_roots.into_iter().enumerate() { - let (found_root, found_block_result) = - block_rx.recv().await.expect("should get block"); - - assert_eq!( - found_root, expected_root, - "expected block root should match" - ); - match found_block_result.as_ref() { - Ok(maybe_block) => { - let found_block = maybe_block.clone().expect("should have a block"); - let expected_block = expected_blocks - .get(start + i) - .expect("should get expected block"); - assert_eq!( - found_block.as_ref(), - expected_block, - "expected block should match found block" - ); - } - Err(e) => panic!("Error retrieving block {}: {:?}", expected_root, e), - } - } - } - } } diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index fa9a0c2e697..322a2caa673 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2035,7 +2035,7 @@ impl BeaconChain { let _timer = metrics::start_timer(&metrics::UNAGGREGATED_ATTESTATION_GOSSIP_VERIFICATION_TIMES); - VerifiedUnaggregatedAttestation::verify(unaggregated_attestation, subnet_id, self).map( + VerifiedUnaggregatedAttestation::verify(unaggregated_attestation, subnet_id, self).inspect( |v| { // This method is called for API and gossip attestations, so this covers all unaggregated attestation events if let Some(event_handler) = self.event_handler.as_ref() { @@ -2046,7 +2046,6 @@ impl BeaconChain { } } metrics::inc_counter(&metrics::UNAGGREGATED_ATTESTATION_PROCESSING_SUCCESSES); - v }, ) } @@ -2074,7 +2073,7 @@ impl BeaconChain { let _timer = metrics::start_timer(&metrics::AGGREGATED_ATTESTATION_GOSSIP_VERIFICATION_TIMES); - VerifiedAggregatedAttestation::verify(signed_aggregate, self).map(|v| { + VerifiedAggregatedAttestation::verify(signed_aggregate, self).inspect(|v| { // This method is called for API and gossip attestations, so this covers all aggregated attestation events if let Some(event_handler) = self.event_handler.as_ref() { if event_handler.has_attestation_subscribers() { @@ -2084,7 +2083,6 @@ impl BeaconChain { } } metrics::inc_counter(&metrics::AGGREGATED_ATTESTATION_PROCESSING_SUCCESSES); - v }) } @@ -2098,9 +2096,8 @@ impl BeaconChain { metrics::inc_counter(&metrics::SYNC_MESSAGE_PROCESSING_REQUESTS); let _timer = metrics::start_timer(&metrics::SYNC_MESSAGE_GOSSIP_VERIFICATION_TIMES); - VerifiedSyncCommitteeMessage::verify(sync_message, subnet_id, self).map(|v| { + VerifiedSyncCommitteeMessage::verify(sync_message, subnet_id, self).inspect(|_| { metrics::inc_counter(&metrics::SYNC_MESSAGE_PROCESSING_SUCCESSES); - v }) } @@ -2112,7 +2109,7 @@ impl BeaconChain { ) -> Result, SyncCommitteeError> { metrics::inc_counter(&metrics::SYNC_CONTRIBUTION_PROCESSING_REQUESTS); let _timer = metrics::start_timer(&metrics::SYNC_CONTRIBUTION_GOSSIP_VERIFICATION_TIMES); - VerifiedSyncContribution::verify(sync_contribution, self).map(|v| { + VerifiedSyncContribution::verify(sync_contribution, self).inspect(|v| { if let Some(event_handler) = self.event_handler.as_ref() { if event_handler.has_contribution_subscribers() { event_handler.register(EventKind::ContributionAndProof(Box::new( @@ -2121,7 +2118,6 @@ impl BeaconChain { } } metrics::inc_counter(&metrics::SYNC_CONTRIBUTION_PROCESSING_SUCCESSES); - v }) } @@ -2136,9 +2132,8 @@ impl BeaconChain { self, seen_timestamp, ) - .map(|v| { + .inspect(|_| { metrics::inc_counter(&metrics::FINALITY_UPDATE_PROCESSING_SUCCESSES); - v }) } @@ -2149,9 +2144,8 @@ impl BeaconChain { ) -> Result, GossipDataColumnError> { metrics::inc_counter(&metrics::DATA_COLUMN_SIDECAR_PROCESSING_REQUESTS); let _timer = metrics::start_timer(&metrics::DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES); - GossipVerifiedDataColumn::new(data_column_sidecar, subnet_id, self).map(|v| { + GossipVerifiedDataColumn::new(data_column_sidecar, subnet_id, self).inspect(|_| { metrics::inc_counter(&metrics::DATA_COLUMN_SIDECAR_PROCESSING_SUCCESSES); - v }) } @@ -2162,9 +2156,8 @@ impl BeaconChain { ) -> Result, GossipBlobError> { metrics::inc_counter(&metrics::BLOBS_SIDECAR_PROCESSING_REQUESTS); let _timer = metrics::start_timer(&metrics::BLOBS_SIDECAR_GOSSIP_VERIFICATION_TIMES); - GossipVerifiedBlob::new(blob_sidecar, subnet_id, self).map(|v| { + GossipVerifiedBlob::new(blob_sidecar, subnet_id, self).inspect(|_| { metrics::inc_counter(&metrics::BLOBS_SIDECAR_PROCESSING_SUCCESSES); - v }) } @@ -2179,9 +2172,8 @@ impl BeaconChain { self, seen_timestamp, ) - .map(|v| { + .inspect(|_| { metrics::inc_counter(&metrics::OPTIMISTIC_UPDATE_PROCESSING_SUCCESSES); - v }) } @@ -2485,7 +2477,7 @@ impl BeaconChain { .observed_voluntary_exits .lock() .verify_and_observe_at(exit, wall_clock_epoch, head_state, &self.spec) - .map(|exit| { + .inspect(|exit| { // this method is called for both API and gossip exits, so this covers all exit events if let Some(event_handler) = self.event_handler.as_ref() { if event_handler.has_exit_subscribers() { @@ -2494,7 +2486,6 @@ impl BeaconChain { } } } - exit })?) } @@ -3073,6 +3064,23 @@ impl BeaconChain { return Err(BlockError::BlockIsAlreadyKnown(block_root)); } + // Reject RPC blobs referencing unknown parents. Otherwise we allow potentially invalid data + // into the da_checker, where invalid = descendant of invalid blocks. + // Note: blobs should have at least one item and all items have the same parent root. + if let Some(parent_root) = blobs + .iter() + .filter_map(|b| b.as_ref().map(|b| b.block_parent_root())) + .next() + { + if !self + .canonical_head + .fork_choice_read_lock() + .contains_block(&parent_root) + { + return Err(BlockError::ParentUnknown { parent_root }); + } + } + if let Some(event_handler) = self.event_handler.as_ref() { if event_handler.has_blob_sidecar_subscribers() { for blob in blobs.iter().filter_map(|maybe_blob| maybe_blob.as_ref()) { @@ -3122,6 +3130,19 @@ impl BeaconChain { return Err(BlockError::BlockIsAlreadyKnown(block_root)); } + // Reject RPC columns referencing unknown parents. Otherwise we allow potentially invalid data + // into the da_checker, where invalid = descendant of invalid blocks. + // Note: custody_columns should have at least one item and all items have the same parent root. + if let Some(parent_root) = custody_columns.iter().map(|c| c.block_parent_root()).next() { + if !self + .canonical_head + .fork_choice_read_lock() + .contains_block(&parent_root) + { + return Err(BlockError::ParentUnknown { parent_root }); + } + } + let r = self .check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns) .await; diff --git a/beacon_node/beacon_chain/src/bellatrix_readiness.rs b/beacon_node/beacon_chain/src/bellatrix_readiness.rs index c2e387c422f..500588953f3 100644 --- a/beacon_node/beacon_chain/src/bellatrix_readiness.rs +++ b/beacon_node/beacon_chain/src/bellatrix_readiness.rs @@ -4,7 +4,6 @@ use crate::{BeaconChain, BeaconChainError as Error, BeaconChainTypes}; use execution_layer::BlockByNumberQuery; use serde::{Deserialize, Serialize, Serializer}; -use slog::debug; use std::fmt; use std::fmt::Write; use types::*; @@ -199,7 +198,6 @@ impl BeaconChain { else { return Ok(GenesisExecutionPayloadStatus::Irrelevant); }; - let fork = self.spec.fork_name_at_epoch(Epoch::new(0)); let execution_layer = self .execution_layer @@ -222,49 +220,6 @@ impl BeaconChain { }); } - // Double-check the block by reconstructing it. - let execution_payload = execution_layer - .get_payload_by_hash_legacy(exec_block_hash, fork) - .await - .map_err(|e| Error::ExecutionLayerGetBlockByHashFailed(Box::new(e)))? - .ok_or(Error::BlockHashMissingFromExecutionLayer(exec_block_hash))?; - - // Verify payload integrity. - let header_from_payload = ExecutionPayloadHeader::from(execution_payload.to_ref()); - - let got_transactions_root = header_from_payload.transactions_root(); - let expected_transactions_root = latest_execution_payload_header.transactions_root(); - let got_withdrawals_root = header_from_payload.withdrawals_root().ok(); - let expected_withdrawals_root = latest_execution_payload_header.withdrawals_root().ok(); - - if got_transactions_root != expected_transactions_root { - return Ok(GenesisExecutionPayloadStatus::TransactionsRootMismatch { - got: got_transactions_root, - expected: expected_transactions_root, - }); - } - - if let Some(expected) = expected_withdrawals_root { - if let Some(got) = got_withdrawals_root { - if got != expected { - return Ok(GenesisExecutionPayloadStatus::WithdrawalsRootMismatch { - got, - expected, - }); - } - } - } - - if header_from_payload.to_ref() != latest_execution_payload_header { - debug!( - self.log, - "Genesis execution payload reconstruction failure"; - "consensus_node_header" => ?latest_execution_payload_header, - "execution_node_header" => ?header_from_payload - ); - return Ok(GenesisExecutionPayloadStatus::OtherMismatch); - } - Ok(GenesisExecutionPayloadStatus::Correct(exec_block_hash)) } } diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 8bd93a3753c..55547aaa18c 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -74,6 +74,7 @@ use derivative::Derivative; use eth2::types::{BlockGossip, EventKind, PublishBlockRequest}; use execution_layer::PayloadStatus; pub use fork_choice::{AttestationFromBlock, PayloadVerificationStatus}; +use lighthouse_metrics::TryExt; use parking_lot::RwLockReadGuard; use proto_array::Block as ProtoBlock; use safe_arith::ArithError; @@ -796,8 +797,12 @@ fn build_gossip_verified_data_columns( GossipDataColumnError::KzgNotInitialized, ))?; - let timer = metrics::start_timer(&metrics::DATA_COLUMN_SIDECAR_COMPUTATION); - let sidecars = blobs_to_data_column_sidecars(&blobs, block, kzg, &chain.spec)?; + let mut timer = metrics::start_timer_vec( + &metrics::DATA_COLUMN_SIDECAR_COMPUTATION, + &[&blobs.len().to_string()], + ); + let sidecars = blobs_to_data_column_sidecars(&blobs, block, kzg, &chain.spec) + .discard_timer_on_break(&mut timer)?; drop(timer); let mut gossip_verified_data_columns = vec![]; for sidecar in sidecars { @@ -829,12 +834,11 @@ pub trait IntoExecutionPendingBlock: Sized { notify_execution_layer: NotifyExecutionLayer, ) -> Result, BlockError> { self.into_execution_pending_block_slashable(block_root, chain, notify_execution_layer) - .map(|execution_pending| { + .inspect(|execution_pending| { // Supply valid block to slasher. if let Some(slasher) = chain.slasher.as_ref() { slasher.accept_block_header(execution_pending.block.signed_block_header()); } - execution_pending }) .map_err(|slash_info| process_block_slash_info::<_, BlockError>(chain, slash_info)) } diff --git a/beacon_node/beacon_chain/src/kzg_utils.rs b/beacon_node/beacon_chain/src/kzg_utils.rs index 55c1ee9e980..c2355e6f4f2 100644 --- a/beacon_node/beacon_chain/src/kzg_utils.rs +++ b/beacon_node/beacon_chain/src/kzg_utils.rs @@ -17,6 +17,10 @@ fn ssz_blob_to_crypto_blob(blob: &Blob) -> Result(blob: &Blob) -> Result, KzgError> { + ssz_blob_to_crypto_blob::(blob).map(Box::new) +} + /// Converts a cell ssz List object to an array to be used with the kzg /// crypto library. fn ssz_cell_to_crypto_cell(cell: &Cell) -> Result { @@ -34,7 +38,7 @@ pub fn validate_blob( kzg_proof: KzgProof, ) -> Result<(), KzgError> { let _timer = crate::metrics::start_timer(&crate::metrics::KZG_VERIFICATION_SINGLE_TIMES); - let kzg_blob = ssz_blob_to_crypto_blob::(blob)?; + let kzg_blob = ssz_blob_to_crypto_blob_boxed::(blob)?; kzg.verify_blob_kzg_proof(&kzg_blob, kzg_commitment, kzg_proof) } @@ -104,7 +108,7 @@ pub fn compute_blob_kzg_proof( blob: &Blob, kzg_commitment: KzgCommitment, ) -> Result { - let kzg_blob = ssz_blob_to_crypto_blob::(blob)?; + let kzg_blob = ssz_blob_to_crypto_blob_boxed::(blob)?; kzg.compute_blob_kzg_proof(&kzg_blob, kzg_commitment) } @@ -113,7 +117,7 @@ pub fn blob_to_kzg_commitment( kzg: &Kzg, blob: &Blob, ) -> Result { - let kzg_blob = ssz_blob_to_crypto_blob::(blob)?; + let kzg_blob = ssz_blob_to_crypto_blob_boxed::(blob)?; kzg.blob_to_kzg_commitment(&kzg_blob) } @@ -124,7 +128,7 @@ pub fn compute_kzg_proof( z: Hash256, ) -> Result<(KzgProof, Hash256), KzgError> { let z = z.0.into(); - let kzg_blob = ssz_blob_to_crypto_blob::(blob)?; + let kzg_blob = ssz_blob_to_crypto_blob_boxed::(blob)?; kzg.compute_kzg_proof(&kzg_blob, &z) .map(|(proof, z)| (proof, Hash256::from_slice(&z.to_vec()))) } diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 3da2bea36c8..79b2fc592b2 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -1647,11 +1647,12 @@ pub static BLOB_SIDECAR_INCLUSION_PROOF_COMPUTATION: LazyLock> "Time taken to compute blob sidecar inclusion proof", ) }); -pub static DATA_COLUMN_SIDECAR_COMPUTATION: LazyLock> = LazyLock::new(|| { - try_create_histogram_with_buckets( +pub static DATA_COLUMN_SIDECAR_COMPUTATION: LazyLock> = LazyLock::new(|| { + try_create_histogram_vec_with_buckets( "data_column_sidecar_computation_seconds", "Time taken to compute data column sidecar, including cells, proofs and inclusion proof", - Ok(vec![0.04, 0.05, 0.1, 0.2, 0.3, 0.5, 0.7, 1.0]), + Ok(vec![0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]), + &["blob_count"], ) }); pub static DATA_COLUMN_SIDECAR_INCLUSION_PROOF_VERIFICATION: LazyLock> = diff --git a/beacon_node/beacon_chain/src/validator_monitor.rs b/beacon_node/beacon_chain/src/validator_monitor.rs index d4524900818..f8a483c6214 100644 --- a/beacon_node/beacon_chain/src/validator_monitor.rs +++ b/beacon_node/beacon_chain/src/validator_monitor.rs @@ -1185,7 +1185,7 @@ impl ValidatorMonitor { info!( self.log, - "Block from API"; + "Block from monitored validator"; "root" => ?block_root, "delay" => %delay.as_millis(), "slot" => %block.slot(), diff --git a/beacon_node/beacon_chain/tests/events.rs b/beacon_node/beacon_chain/tests/events.rs index d54543e4f6f..1261e2d53ea 100644 --- a/beacon_node/beacon_chain/tests/events.rs +++ b/beacon_node/beacon_chain/tests/events.rs @@ -62,13 +62,17 @@ async fn blob_sidecar_event_on_process_rpc_blobs() { let kzg = harness.chain.kzg.as_ref().unwrap(); let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64); - let blob_1 = BlobSidecar::random_valid(&mut rng, kzg) - .map(Arc::new) - .unwrap(); - let blob_2 = Arc::new(BlobSidecar { + let mut blob_1 = BlobSidecar::random_valid(&mut rng, kzg).unwrap(); + let mut blob_2 = BlobSidecar { index: 1, ..BlobSidecar::random_valid(&mut rng, kzg).unwrap() - }); + }; + let parent_root = harness.chain.head().head_block_root(); + blob_1.signed_block_header.message.parent_root = parent_root; + blob_2.signed_block_header.message.parent_root = parent_root; + let blob_1 = Arc::new(blob_1); + let blob_2 = Arc::new(blob_2); + let blobs = FixedBlobSidecarList::from(vec![Some(blob_1.clone()), Some(blob_2.clone())]); let expected_sse_blobs = vec![ SseBlobSidecar::from_blob_sidecar(blob_1.as_ref()), diff --git a/beacon_node/eth1/src/inner.rs b/beacon_node/eth1/src/inner.rs index 452922b173c..7387642bf4c 100644 --- a/beacon_node/eth1/src/inner.rs +++ b/beacon_node/eth1/src/inner.rs @@ -75,10 +75,7 @@ impl Inner { SszEth1Cache::from_ssz_bytes(bytes) .map_err(|e| format!("Ssz decoding error: {:?}", e))? .to_inner(config, spec) - .map(|inner| { - inner.block_cache.write().rebuild_by_hash_map(); - inner - }) + .inspect(|inner| inner.block_cache.write().rebuild_by_hash_map()) } /// Returns a reference to the specification. diff --git a/beacon_node/execution_layer/src/engine_api.rs b/beacon_node/execution_layer/src/engine_api.rs index 8cfe6e9efde..8ba8ecfffbc 100644 --- a/beacon_node/execution_layer/src/engine_api.rs +++ b/beacon_node/execution_layer/src/engine_api.rs @@ -11,9 +11,6 @@ use eth2::types::{ BlobsBundle, SsePayloadAttributes, SsePayloadAttributesV1, SsePayloadAttributesV2, SsePayloadAttributesV3, }; -use ethers_core::types::Transaction; -use ethers_core::utils::rlp; -use ethers_core::utils::rlp::{Decodable, Rlp}; use http::deposit_methods::RpcError; pub use json_structures::{JsonWithdrawal, TransitionConfigurationV1}; use pretty_reqwest_error::PrettyReqwestError; @@ -43,8 +40,6 @@ pub use new_payload_request::{ NewPayloadRequestDeneb, NewPayloadRequestElectra, }; -use self::json_structures::{JsonConsolidationRequest, JsonDepositRequest, JsonWithdrawalRequest}; - pub const LATEST_TAG: &str = "latest"; pub type PayloadId = [u8; 8]; @@ -74,7 +69,6 @@ pub enum Error { RequiredMethodUnsupported(&'static str), UnsupportedForkVariant(String), InvalidClientVersion(String), - RlpDecoderError(rlp::DecoderError), TooManyConsolidationRequests(usize), } @@ -109,12 +103,6 @@ impl From for Error { } } -impl From for Error { - fn from(e: rlp::DecoderError) -> Self { - Error::RlpDecoderError(e) - } -} - impl From for Error { fn from(e: ssz_types::Error) -> Self { Error::SszError(e) @@ -161,186 +149,6 @@ pub struct ExecutionBlock { pub timestamp: u64, } -/// Representation of an execution block with enough detail to reconstruct a payload. -#[superstruct( - variants(Bellatrix, Capella, Deneb, Electra), - variant_attributes( - derive(Clone, Debug, PartialEq, Serialize, Deserialize,), - serde(bound = "E: EthSpec", rename_all = "camelCase"), - ), - cast_error(ty = "Error", expr = "Error::IncorrectStateVariant"), - partial_getter_error(ty = "Error", expr = "Error::IncorrectStateVariant") -)] -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -#[serde(bound = "E: EthSpec", rename_all = "camelCase", untagged)] -pub struct ExecutionBlockWithTransactions { - pub parent_hash: ExecutionBlockHash, - #[serde(alias = "miner")] - #[serde(with = "serde_utils::address_hex")] - pub fee_recipient: Address, - pub state_root: Hash256, - pub receipts_root: Hash256, - #[serde(with = "ssz_types::serde_utils::hex_fixed_vec")] - pub logs_bloom: FixedVector, - #[serde(alias = "mixHash")] - pub prev_randao: Hash256, - #[serde(rename = "number", with = "serde_utils::u64_hex_be")] - pub block_number: u64, - #[serde(with = "serde_utils::u64_hex_be")] - pub gas_limit: u64, - #[serde(with = "serde_utils::u64_hex_be")] - pub gas_used: u64, - #[serde(with = "serde_utils::u64_hex_be")] - pub timestamp: u64, - #[serde(with = "ssz_types::serde_utils::hex_var_list")] - pub extra_data: VariableList, - pub base_fee_per_gas: Uint256, - #[serde(rename = "hash")] - pub block_hash: ExecutionBlockHash, - pub transactions: Vec, - #[superstruct(only(Capella, Deneb, Electra))] - pub withdrawals: Vec, - #[superstruct(only(Deneb, Electra))] - #[serde(with = "serde_utils::u64_hex_be")] - pub blob_gas_used: u64, - #[superstruct(only(Deneb, Electra))] - #[serde(with = "serde_utils::u64_hex_be")] - pub excess_blob_gas: u64, - #[superstruct(only(Electra))] - pub deposit_requests: Vec, - #[superstruct(only(Electra))] - pub withdrawal_requests: Vec, - #[superstruct(only(Electra))] - pub consolidation_requests: Vec, -} - -impl TryFrom> for ExecutionBlockWithTransactions { - type Error = Error; - - fn try_from(payload: ExecutionPayload) -> Result { - let json_payload = match payload { - ExecutionPayload::Bellatrix(block) => { - Self::Bellatrix(ExecutionBlockWithTransactionsBellatrix { - parent_hash: block.parent_hash, - fee_recipient: block.fee_recipient, - state_root: block.state_root, - receipts_root: block.receipts_root, - logs_bloom: block.logs_bloom, - prev_randao: block.prev_randao, - block_number: block.block_number, - gas_limit: block.gas_limit, - gas_used: block.gas_used, - timestamp: block.timestamp, - extra_data: block.extra_data, - base_fee_per_gas: block.base_fee_per_gas, - block_hash: block.block_hash, - transactions: block - .transactions - .iter() - .map(|tx| Transaction::decode(&Rlp::new(tx))) - .collect::, _>>()?, - }) - } - ExecutionPayload::Capella(block) => { - Self::Capella(ExecutionBlockWithTransactionsCapella { - parent_hash: block.parent_hash, - fee_recipient: block.fee_recipient, - state_root: block.state_root, - receipts_root: block.receipts_root, - logs_bloom: block.logs_bloom, - prev_randao: block.prev_randao, - block_number: block.block_number, - gas_limit: block.gas_limit, - gas_used: block.gas_used, - timestamp: block.timestamp, - extra_data: block.extra_data, - base_fee_per_gas: block.base_fee_per_gas, - block_hash: block.block_hash, - transactions: block - .transactions - .iter() - .map(|tx| Transaction::decode(&Rlp::new(tx))) - .collect::, _>>()?, - withdrawals: Vec::from(block.withdrawals) - .into_iter() - .map(|withdrawal| withdrawal.into()) - .collect(), - }) - } - ExecutionPayload::Deneb(block) => Self::Deneb(ExecutionBlockWithTransactionsDeneb { - parent_hash: block.parent_hash, - fee_recipient: block.fee_recipient, - state_root: block.state_root, - receipts_root: block.receipts_root, - logs_bloom: block.logs_bloom, - prev_randao: block.prev_randao, - block_number: block.block_number, - gas_limit: block.gas_limit, - gas_used: block.gas_used, - timestamp: block.timestamp, - extra_data: block.extra_data, - base_fee_per_gas: block.base_fee_per_gas, - block_hash: block.block_hash, - transactions: block - .transactions - .iter() - .map(|tx| Transaction::decode(&Rlp::new(tx))) - .collect::, _>>()?, - withdrawals: Vec::from(block.withdrawals) - .into_iter() - .map(|withdrawal| withdrawal.into()) - .collect(), - blob_gas_used: block.blob_gas_used, - excess_blob_gas: block.excess_blob_gas, - }), - ExecutionPayload::Electra(block) => { - Self::Electra(ExecutionBlockWithTransactionsElectra { - parent_hash: block.parent_hash, - fee_recipient: block.fee_recipient, - state_root: block.state_root, - receipts_root: block.receipts_root, - logs_bloom: block.logs_bloom, - prev_randao: block.prev_randao, - block_number: block.block_number, - gas_limit: block.gas_limit, - gas_used: block.gas_used, - timestamp: block.timestamp, - extra_data: block.extra_data, - base_fee_per_gas: block.base_fee_per_gas, - block_hash: block.block_hash, - transactions: block - .transactions - .iter() - .map(|tx| Transaction::decode(&Rlp::new(tx))) - .collect::, _>>()?, - withdrawals: Vec::from(block.withdrawals) - .into_iter() - .map(|withdrawal| withdrawal.into()) - .collect(), - blob_gas_used: block.blob_gas_used, - excess_blob_gas: block.excess_blob_gas, - deposit_requests: block - .deposit_requests - .into_iter() - .map(|deposit| deposit.into()) - .collect(), - withdrawal_requests: block - .withdrawal_requests - .into_iter() - .map(|withdrawal| withdrawal.into()) - .collect(), - consolidation_requests: block - .consolidation_requests - .into_iter() - .map(Into::into) - .collect(), - }) - } - }; - Ok(json_payload) - } -} - #[superstruct( variants(V1, V2, V3), variant_attributes(derive(Clone, Debug, Eq, Hash, PartialEq),), diff --git a/beacon_node/execution_layer/src/engine_api/http.rs b/beacon_node/execution_layer/src/engine_api/http.rs index 5bc1343a0eb..c497a4a7254 100644 --- a/beacon_node/execution_layer/src/engine_api/http.rs +++ b/beacon_node/execution_layer/src/engine_api/http.rs @@ -734,54 +734,6 @@ impl HttpJsonRpc { .await } - pub async fn get_block_by_hash_with_txns( - &self, - block_hash: ExecutionBlockHash, - fork: ForkName, - ) -> Result>, Error> { - let params = json!([block_hash, true]); - Ok(Some(match fork { - ForkName::Bellatrix => ExecutionBlockWithTransactions::Bellatrix( - self.rpc_request( - ETH_GET_BLOCK_BY_HASH, - params, - ETH_GET_BLOCK_BY_HASH_TIMEOUT * self.execution_timeout_multiplier, - ) - .await?, - ), - ForkName::Capella => ExecutionBlockWithTransactions::Capella( - self.rpc_request( - ETH_GET_BLOCK_BY_HASH, - params, - ETH_GET_BLOCK_BY_HASH_TIMEOUT * self.execution_timeout_multiplier, - ) - .await?, - ), - ForkName::Deneb => ExecutionBlockWithTransactions::Deneb( - self.rpc_request( - ETH_GET_BLOCK_BY_HASH, - params, - ETH_GET_BLOCK_BY_HASH_TIMEOUT * self.execution_timeout_multiplier, - ) - .await?, - ), - ForkName::Electra => ExecutionBlockWithTransactions::Electra( - self.rpc_request( - ETH_GET_BLOCK_BY_HASH, - params, - ETH_GET_BLOCK_BY_HASH_TIMEOUT * self.execution_timeout_multiplier, - ) - .await?, - ), - ForkName::Base | ForkName::Altair => { - return Err(Error::UnsupportedForkVariant(format!( - "called get_block_by_hash_with_txns with fork {:?}", - fork - ))) - } - })) - } - pub async fn new_payload_v1( &self, execution_payload: ExecutionPayload, diff --git a/beacon_node/execution_layer/src/engine_api/json_structures.rs b/beacon_node/execution_layer/src/engine_api/json_structures.rs index dbf889bbc8c..a05d584cfca 100644 --- a/beacon_node/execution_layer/src/engine_api/json_structures.rs +++ b/beacon_node/execution_layer/src/engine_api/json_structures.rs @@ -778,6 +778,57 @@ pub struct JsonExecutionPayloadBody { Option>, } +impl From> for JsonExecutionPayloadBodyV1 { + fn from(value: ExecutionPayloadBodyV1) -> Self { + Self { + transactions: value.transactions, + withdrawals: value.withdrawals.map(|json_withdrawals| { + VariableList::from( + json_withdrawals + .into_iter() + .map(Into::into) + .collect::>(), + ) + }), + } + } +} + +impl From> for JsonExecutionPayloadBodyV2 { + fn from(value: ExecutionPayloadBodyV2) -> Self { + Self { + transactions: value.transactions, + withdrawals: value.withdrawals.map(|json_withdrawals| { + VariableList::from( + json_withdrawals + .into_iter() + .map(Into::into) + .collect::>(), + ) + }), + deposit_requests: value.deposit_requests.map(|receipts| { + VariableList::from(receipts.into_iter().map(Into::into).collect::>()) + }), + withdrawal_requests: value.withdrawal_requests.map(|withdrawal_requests| { + VariableList::from( + withdrawal_requests + .into_iter() + .map(Into::into) + .collect::>(), + ) + }), + consolidation_requests: value.consolidation_requests.map(|consolidation_requests| { + VariableList::from( + consolidation_requests + .into_iter() + .map(Into::into) + .collect::>(), + ) + }), + } + } +} + impl From> for ExecutionPayloadBody { fn from(value: JsonExecutionPayloadBody) -> Self { match value { diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 6e3aca39594..648963a320e 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -144,6 +144,7 @@ pub enum Error { payload: ExecutionBlockHash, transactions_root: Hash256, }, + PayloadBodiesByRangeNotSupported, InvalidJWTSecret(String), InvalidForkForPayload, InvalidPayloadBody(String), @@ -1804,7 +1805,6 @@ impl ExecutionLayer { header: &ExecutionPayloadHeader, fork: ForkName, ) -> Result>, Error> { - let hash = header.block_hash(); let block_number = header.block_number(); // Handle default payload body. @@ -1823,7 +1823,9 @@ impl ExecutionLayer { // Use efficient payload bodies by range method if supported. let capabilities = self.get_engine_capabilities(None).await?; - if capabilities.get_payload_bodies_by_range_v1 { + if capabilities.get_payload_bodies_by_range_v1 + || capabilities.get_payload_bodies_by_range_v2 + { let mut payload_bodies = self.get_payload_bodies_by_range(block_number, 1).await?; if payload_bodies.len() != 1 { @@ -1838,8 +1840,7 @@ impl ExecutionLayer { }) .transpose() } else { - // Fall back to eth_blockByHash. - self.get_payload_by_hash_legacy(hash, fork).await + Err(Error::PayloadBodiesByRangeNotSupported) } } @@ -1854,196 +1855,6 @@ impl ExecutionLayer { .map_err(Error::EngineError) } - pub async fn get_payload_by_hash_legacy( - &self, - hash: ExecutionBlockHash, - fork: ForkName, - ) -> Result>, Error> { - self.engine() - .request(|engine| async move { - self.get_payload_by_hash_from_engine(engine, hash, fork) - .await - }) - .await - .map_err(Box::new) - .map_err(Error::EngineError) - } - - async fn get_payload_by_hash_from_engine( - &self, - engine: &Engine, - hash: ExecutionBlockHash, - fork: ForkName, - ) -> Result>, ApiError> { - let _timer = metrics::start_timer(&metrics::EXECUTION_LAYER_GET_PAYLOAD_BY_BLOCK_HASH); - - if hash == ExecutionBlockHash::zero() { - return match fork { - ForkName::Bellatrix => Ok(Some(ExecutionPayloadBellatrix::default().into())), - ForkName::Capella => Ok(Some(ExecutionPayloadCapella::default().into())), - ForkName::Deneb => Ok(Some(ExecutionPayloadDeneb::default().into())), - ForkName::Electra => Ok(Some(ExecutionPayloadElectra::default().into())), - ForkName::Base | ForkName::Altair => Err(ApiError::UnsupportedForkVariant( - format!("called get_payload_by_hash_from_engine with {}", fork), - )), - }; - } - - let Some(block) = engine - .api - .get_block_by_hash_with_txns::(hash, fork) - .await? - else { - return Ok(None); - }; - - let convert_transactions = |transactions: Vec| { - VariableList::new( - transactions - .into_iter() - .map(|tx| VariableList::new(tx.rlp().to_vec())) - .collect::, ssz_types::Error>>()?, - ) - .map_err(ApiError::SszError) - }; - - let payload = match block { - ExecutionBlockWithTransactions::Bellatrix(bellatrix_block) => { - ExecutionPayload::Bellatrix(ExecutionPayloadBellatrix { - parent_hash: bellatrix_block.parent_hash, - fee_recipient: bellatrix_block.fee_recipient, - state_root: bellatrix_block.state_root, - receipts_root: bellatrix_block.receipts_root, - logs_bloom: bellatrix_block.logs_bloom, - prev_randao: bellatrix_block.prev_randao, - block_number: bellatrix_block.block_number, - gas_limit: bellatrix_block.gas_limit, - gas_used: bellatrix_block.gas_used, - timestamp: bellatrix_block.timestamp, - extra_data: bellatrix_block.extra_data, - base_fee_per_gas: bellatrix_block.base_fee_per_gas, - block_hash: bellatrix_block.block_hash, - transactions: convert_transactions(bellatrix_block.transactions)?, - }) - } - ExecutionBlockWithTransactions::Capella(capella_block) => { - let withdrawals = VariableList::new( - capella_block - .withdrawals - .into_iter() - .map(Into::into) - .collect(), - ) - .map_err(ApiError::DeserializeWithdrawals)?; - ExecutionPayload::Capella(ExecutionPayloadCapella { - parent_hash: capella_block.parent_hash, - fee_recipient: capella_block.fee_recipient, - state_root: capella_block.state_root, - receipts_root: capella_block.receipts_root, - logs_bloom: capella_block.logs_bloom, - prev_randao: capella_block.prev_randao, - block_number: capella_block.block_number, - gas_limit: capella_block.gas_limit, - gas_used: capella_block.gas_used, - timestamp: capella_block.timestamp, - extra_data: capella_block.extra_data, - base_fee_per_gas: capella_block.base_fee_per_gas, - block_hash: capella_block.block_hash, - transactions: convert_transactions(capella_block.transactions)?, - withdrawals, - }) - } - ExecutionBlockWithTransactions::Deneb(deneb_block) => { - let withdrawals = VariableList::new( - deneb_block - .withdrawals - .into_iter() - .map(Into::into) - .collect(), - ) - .map_err(ApiError::DeserializeWithdrawals)?; - ExecutionPayload::Deneb(ExecutionPayloadDeneb { - parent_hash: deneb_block.parent_hash, - fee_recipient: deneb_block.fee_recipient, - state_root: deneb_block.state_root, - receipts_root: deneb_block.receipts_root, - logs_bloom: deneb_block.logs_bloom, - prev_randao: deneb_block.prev_randao, - block_number: deneb_block.block_number, - gas_limit: deneb_block.gas_limit, - gas_used: deneb_block.gas_used, - timestamp: deneb_block.timestamp, - extra_data: deneb_block.extra_data, - base_fee_per_gas: deneb_block.base_fee_per_gas, - block_hash: deneb_block.block_hash, - transactions: convert_transactions(deneb_block.transactions)?, - withdrawals, - blob_gas_used: deneb_block.blob_gas_used, - excess_blob_gas: deneb_block.excess_blob_gas, - }) - } - ExecutionBlockWithTransactions::Electra(electra_block) => { - let withdrawals = VariableList::new( - electra_block - .withdrawals - .into_iter() - .map(Into::into) - .collect(), - ) - .map_err(ApiError::DeserializeWithdrawals)?; - let deposit_requests = VariableList::new( - electra_block - .deposit_requests - .into_iter() - .map(Into::into) - .collect(), - ) - .map_err(ApiError::DeserializeDepositRequests)?; - let withdrawal_requests = VariableList::new( - electra_block - .withdrawal_requests - .into_iter() - .map(Into::into) - .collect(), - ) - .map_err(ApiError::DeserializeWithdrawalRequests)?; - let n_consolidations = electra_block.consolidation_requests.len(); - let consolidation_requests = VariableList::new( - electra_block - .consolidation_requests - .into_iter() - .map(Into::into) - .collect::>(), - ) - .map_err(|_| ApiError::TooManyConsolidationRequests(n_consolidations))?; - ExecutionPayload::Electra(ExecutionPayloadElectra { - parent_hash: electra_block.parent_hash, - fee_recipient: electra_block.fee_recipient, - state_root: electra_block.state_root, - receipts_root: electra_block.receipts_root, - logs_bloom: electra_block.logs_bloom, - prev_randao: electra_block.prev_randao, - block_number: electra_block.block_number, - gas_limit: electra_block.gas_limit, - gas_used: electra_block.gas_used, - timestamp: electra_block.timestamp, - extra_data: electra_block.extra_data, - base_fee_per_gas: electra_block.base_fee_per_gas, - block_hash: electra_block.block_hash, - transactions: convert_transactions(electra_block.transactions)?, - withdrawals, - blob_gas_used: electra_block.blob_gas_used, - excess_blob_gas: electra_block.excess_blob_gas, - deposit_requests, - withdrawal_requests, - consolidation_requests, - }) - } - }; - - Ok(Some(payload)) - } - pub async fn propose_blinded_beacon_block( &self, block_root: Hash256, diff --git a/beacon_node/execution_layer/src/metrics.rs b/beacon_node/execution_layer/src/metrics.rs index c3da449535c..184031af4d0 100644 --- a/beacon_node/execution_layer/src/metrics.rs +++ b/beacon_node/execution_layer/src/metrics.rs @@ -54,13 +54,6 @@ pub static EXECUTION_LAYER_PRE_PREPARED_PAYLOAD_ID: LazyLock> = - LazyLock::new(|| { - try_create_histogram( - "execution_layer_get_payload_by_block_hash_time", - "Time to reconstruct a payload from the EE using eth_getBlockByHash", - ) - }); pub static EXECUTION_LAYER_GET_PAYLOAD_BODIES_BY_RANGE: LazyLock> = LazyLock::new(|| { try_create_histogram( diff --git a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs index 041b31e2b08..6094e0d6960 100644 --- a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs +++ b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs @@ -1,14 +1,11 @@ -use crate::engines::ForkchoiceState; -use crate::EthersTransaction; -use crate::{ - engine_api::{ - json_structures::{ - JsonForkchoiceUpdatedV1Response, JsonPayloadStatusV1, JsonPayloadStatusV1Status, - }, - ExecutionBlock, PayloadAttributes, PayloadId, PayloadStatusV1, PayloadStatusV1Status, +use crate::engine_api::{ + json_structures::{ + JsonForkchoiceUpdatedV1Response, JsonPayloadStatusV1, JsonPayloadStatusV1Status, }, - ExecutionBlockWithTransactions, + ExecutionBlock, PayloadAttributes, PayloadId, PayloadStatusV1, PayloadStatusV1Status, }; +use crate::engines::ForkchoiceState; +use crate::EthersTransaction; use eth2::types::BlobsBundle; use kzg::{Kzg, KzgCommitment, KzgProof}; use parking_lot::Mutex; @@ -89,17 +86,13 @@ impl Block { } } - pub fn as_execution_block_with_tx(&self) -> Option> { + pub fn as_execution_payload(&self) -> Option> { match self { - Block::PoS(payload) => Some(payload.clone().try_into().unwrap()), - Block::PoW(block) => Some( - ExecutionPayload::Bellatrix(ExecutionPayloadBellatrix { - block_hash: block.block_hash, - ..Default::default() - }) - .try_into() - .unwrap(), - ), + Block::PoS(payload) => Some(payload.clone()), + Block::PoW(block) => Some(ExecutionPayload::Bellatrix(ExecutionPayloadBellatrix { + block_hash: block.block_hash, + ..Default::default() + })), } } } @@ -255,20 +248,17 @@ impl ExecutionBlockGenerator { .map(|block| block.as_execution_block(self.terminal_total_difficulty)) } - pub fn execution_block_with_txs_by_hash( + pub fn execution_payload_by_hash( &self, hash: ExecutionBlockHash, - ) -> Option> { + ) -> Option> { self.block_by_hash(hash) - .and_then(|block| block.as_execution_block_with_tx()) + .and_then(|block| block.as_execution_payload()) } - pub fn execution_block_with_txs_by_number( - &self, - number: u64, - ) -> Option> { + pub fn execution_payload_by_number(&self, number: u64) -> Option> { self.block_by_number(number) - .and_then(|block| block.as_execution_block_with_tx()) + .and_then(|block| block.as_execution_payload()) } pub fn move_to_block_prior_to_terminal_block(&mut self) -> Result<(), String> { @@ -954,6 +944,7 @@ mod test { let kzg = load_kzg()?; let (kzg_commitment, kzg_proof, blob) = load_test_blobs_bundle::()?; let kzg_blob = kzg::Blob::from_bytes(blob.as_ref()) + .map(Box::new) .map_err(|e| format!("Error converting blob to kzg blob: {e:?}"))?; kzg.verify_blob_kzg_proof(&kzg_blob, kzg_commitment, kzg_proof) .map_err(|e| format!("Invalid blobs bundle: {e:?}")) diff --git a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs index 81c69caf829..f36cb9797d3 100644 --- a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs +++ b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs @@ -83,12 +83,10 @@ pub async fn handle_rpc( .ok_or_else(|| "missing/invalid params[1] value".to_string()) .map_err(|s| (s, BAD_PARAMS_ERROR_CODE))?; if full_tx { - Ok(serde_json::to_value( - ctx.execution_block_generator - .read() - .execution_block_with_txs_by_hash(hash), - ) - .unwrap()) + Err(( + "full_tx support has been removed".to_string(), + BAD_PARAMS_ERROR_CODE, + )) } else { Ok(serde_json::to_value( ctx.execution_block_generator @@ -556,40 +554,25 @@ pub async fn handle_rpc( let mut response = vec![]; for block_num in start..(start + count) { - let maybe_block = ctx + let maybe_payload = ctx .execution_block_generator .read() - .execution_block_with_txs_by_number(block_num); - - match maybe_block { - Some(block) => { - let transactions = Transactions::::new( - block - .transactions() - .iter() - .map(|transaction| VariableList::new(transaction.rlp().to_vec())) - .collect::>() - .map_err(|e| { - ( - format!("failed to deserialize transaction: {:?}", e), - GENERIC_ERROR_CODE, - ) - })?, - ) - .map_err(|e| { - ( - format!("failed to deserialize transactions: {:?}", e), - GENERIC_ERROR_CODE, - ) - })?; - - response.push(Some(JsonExecutionPayloadBodyV1:: { - transactions, - withdrawals: block - .withdrawals() - .ok() - .map(|withdrawals| VariableList::from(withdrawals.clone())), - })); + .execution_payload_by_number(block_num); + + match maybe_payload { + Some(payload) => { + assert!( + !payload.fork_name().electra_enabled(), + "payload bodies V1 is not supported for Electra blocks" + ); + let payload_body = ExecutionPayloadBodyV1 { + transactions: payload.transactions().clone(), + withdrawals: payload.withdrawals().ok().cloned(), + }; + let json_payload_body = JsonExecutionPayloadBody::V1( + JsonExecutionPayloadBodyV1::::from(payload_body), + ); + response.push(Some(json_payload_body)); } None => response.push(None), } @@ -611,63 +594,28 @@ pub async fn handle_rpc( let mut response = vec![]; for block_num in start..(start + count) { - let maybe_block = ctx + let maybe_payload = ctx .execution_block_generator .read() - .execution_block_with_txs_by_number(block_num); - - match maybe_block { - Some(block) => { - let transactions = Transactions::::new( - block - .transactions() - .iter() - .map(|transaction| VariableList::new(transaction.rlp().to_vec())) - .collect::>() - .map_err(|e| { - ( - format!("failed to deserialize transaction: {:?}", e), - GENERIC_ERROR_CODE, - ) - })?, - ) - .map_err(|e| { - ( - format!("failed to deserialize transactions: {:?}", e), - GENERIC_ERROR_CODE, - ) - })?; + .execution_payload_by_number(block_num); + match maybe_payload { + Some(payload) => { // TODO(electra): add testing for: // deposit_requests // withdrawal_requests // consolidation_requests - response.push(Some(JsonExecutionPayloadBodyV2:: { - transactions, - withdrawals: block - .withdrawals() - .ok() - .map(|withdrawals| VariableList::from(withdrawals.clone())), - deposit_requests: block.deposit_requests().ok().map( - |deposit_requests| VariableList::from(deposit_requests.clone()), - ), - withdrawal_requests: block.withdrawal_requests().ok().map( - |withdrawal_requests| { - VariableList::from(withdrawal_requests.clone()) - }, - ), - consolidation_requests: block.consolidation_requests().ok().map( - |consolidation_requests| { - VariableList::from( - consolidation_requests - .clone() - .into_iter() - .map(Into::into) - .collect::>(), - ) - }, - ), - })); + let payload_body = ExecutionPayloadBodyV2 { + transactions: payload.transactions().clone(), + withdrawals: payload.withdrawals().ok().cloned(), + deposit_requests: payload.deposit_requests().ok().cloned(), + withdrawal_requests: payload.withdrawal_requests().ok().cloned(), + consolidation_requests: payload.consolidation_requests().ok().cloned(), + }; + let json_payload_body = JsonExecutionPayloadBody::V2( + JsonExecutionPayloadBodyV2::::from(payload_body), + ); + response.push(Some(json_payload_body)); } None => response.push(None), } diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index e0fc518d46c..ad7cb3081ea 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -273,7 +273,7 @@ pub async fn publish_block match protocol { Either::Left(protocol) => handler.on_fully_negotiated_inbound(protocol), + #[allow(unreachable_patterns)] Either::Right(v) => void::unreachable(v), }, ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { @@ -531,6 +532,9 @@ impl ConnectionHandler for Handler { }) => { tracing::debug!("Dial upgrade error: Protocol negotiation timeout"); } + // This pattern is unreachable as of Rust 1.82, we can remove it once the + // MSRV is increased past that version. + #[allow(unreachable_patterns)] ConnectionEvent::DialUpgradeError(DialUpgradeError { error: StreamUpgradeError::Apply(e), .. diff --git a/beacon_node/lighthouse_network/gossipsub/src/protocol.rs b/beacon_node/lighthouse_network/gossipsub/src/protocol.rs index 5611ae32c91..b72f4ccc9b5 100644 --- a/beacon_node/lighthouse_network/gossipsub/src/protocol.rs +++ b/beacon_node/lighthouse_network/gossipsub/src/protocol.rs @@ -40,9 +40,9 @@ use void::Void; pub(crate) const SIGNING_PREFIX: &[u8] = b"libp2p-pubsub:"; -pub(crate) const GOSSIPSUB_1_2_0_BETA_PROTOCOL: ProtocolId = ProtocolId { +pub(crate) const GOSSIPSUB_1_2_0_PROTOCOL: ProtocolId = ProtocolId { protocol: StreamProtocol::new("/meshsub/1.2.0"), - kind: PeerKind::Gossipsubv1_2_beta, + kind: PeerKind::Gossipsubv1_2, }; pub(crate) const GOSSIPSUB_1_1_0_PROTOCOL: ProtocolId = ProtocolId { protocol: StreamProtocol::new("/meshsub/1.1.0"), @@ -74,7 +74,7 @@ impl Default for ProtocolConfig { max_transmit_size: 65536, validation_mode: ValidationMode::Strict, protocol_ids: vec![ - GOSSIPSUB_1_2_0_BETA_PROTOCOL, + GOSSIPSUB_1_2_0_PROTOCOL, GOSSIPSUB_1_1_0_PROTOCOL, GOSSIPSUB_1_0_0_PROTOCOL, ], diff --git a/beacon_node/lighthouse_network/gossipsub/src/types.rs b/beacon_node/lighthouse_network/gossipsub/src/types.rs index 8df307d470b..d14a9293749 100644 --- a/beacon_node/lighthouse_network/gossipsub/src/types.rs +++ b/beacon_node/lighthouse_network/gossipsub/src/types.rs @@ -132,7 +132,7 @@ pub(crate) struct PeerConnections { #[allow(non_camel_case_types)] pub enum PeerKind { /// A gossipsub 1.2 peer. - Gossipsubv1_2_beta, + Gossipsubv1_2, /// A gossipsub 1.1 peer. Gossipsubv1_1, /// A gossipsub 1.0 peer. @@ -148,7 +148,7 @@ impl PeerKind { pub(crate) fn is_gossipsub(&self) -> bool { matches!( self, - Self::Gossipsubv1_2_beta | Self::Gossipsubv1_1 | Self::Gossipsub + Self::Gossipsubv1_2 | Self::Gossipsubv1_1 | Self::Gossipsub ) } } @@ -623,7 +623,7 @@ impl PeerKind { Self::Floodsub => "Floodsub", Self::Gossipsub => "Gossipsub v1.0", Self::Gossipsubv1_1 => "Gossipsub v1.1", - Self::Gossipsubv1_2_beta => "Gossipsub v1.2-beta", + Self::Gossipsubv1_2 => "Gossipsub v1.2", } } } diff --git a/beacon_node/lighthouse_network/src/discovery/enr.rs b/beacon_node/lighthouse_network/src/discovery/enr.rs index 7415fdaf590..73552e0197f 100644 --- a/beacon_node/lighthouse_network/src/discovery/enr.rs +++ b/beacon_node/lighthouse_network/src/discovery/enr.rs @@ -38,7 +38,7 @@ pub trait Eth2Enr { ) -> Result, &'static str>; /// The peerdas custody subnet count associated with the ENR. - fn custody_subnet_count(&self, spec: &ChainSpec) -> u64; + fn custody_subnet_count(&self, spec: &ChainSpec) -> Result; fn eth2(&self) -> Result; } @@ -64,14 +64,17 @@ impl Eth2Enr for Enr { .map_err(|_| "Could not decode the ENR syncnets bitfield") } - /// if the custody value is non-existent in the ENR, then we assume the minimum custody value - /// defined in the spec. - fn custody_subnet_count(&self, spec: &ChainSpec) -> u64 { - self.get_decodable::(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY) - .and_then(|r| r.ok()) - // If value supplied in ENR is invalid, fallback to `custody_requirement` - .filter(|csc| csc <= &spec.data_column_sidecar_subnet_count) - .unwrap_or(spec.custody_requirement) + fn custody_subnet_count(&self, spec: &ChainSpec) -> Result { + let csc = self + .get_decodable::(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY) + .ok_or("ENR custody subnet count non-existent")? + .map_err(|_| "Could not decode the ENR custody subnet count")?; + + if csc >= spec.custody_requirement && csc <= spec.data_column_sidecar_subnet_count { + Ok(csc) + } else { + Err("Invalid custody subnet count in ENR") + } } fn eth2(&self) -> Result { @@ -335,7 +338,7 @@ mod test { let enr = build_enr_with_config(config, &spec).0; assert_eq!( - enr.custody_subnet_count::(&spec), + enr.custody_subnet_count::(&spec).unwrap(), spec.custody_requirement, ); } @@ -350,31 +353,11 @@ mod test { let enr = build_enr_with_config(config, &spec).0; assert_eq!( - enr.custody_subnet_count::(&spec), + enr.custody_subnet_count::(&spec).unwrap(), spec.data_column_sidecar_subnet_count, ); } - #[test] - fn custody_subnet_count_fallback_default() { - let config = NetworkConfig::default(); - let spec = make_eip7594_spec(); - let (mut enr, enr_key) = build_enr_with_config(config, &spec); - let invalid_subnet_count = 999u64; - - enr.insert( - PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY, - &invalid_subnet_count, - &enr_key, - ) - .unwrap(); - - assert_eq!( - enr.custody_subnet_count::(&spec), - spec.custody_requirement, - ); - } - fn build_enr_with_config(config: NetworkConfig, spec: &ChainSpec) -> (Enr, CombinedKey) { let keypair = libp2p::identity::secp256k1::Keypair::generate(); let enr_key = CombinedKey::from_secp256k1(&keypair); diff --git a/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs b/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs index e198b3ee17f..02ff0cc3ca4 100644 --- a/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs +++ b/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs @@ -16,7 +16,6 @@ where E: EthSpec, { let log_clone = log.clone(); - let spec_clone = spec.clone(); move |enr: &Enr| { let attestation_bitfield: EnrAttestationBitfield = match enr.attestation_bitfield::() @@ -30,8 +29,6 @@ where let sync_committee_bitfield: Result, _> = enr.sync_committee_bitfield::(); - let custody_subnet_count = enr.custody_subnet_count::(&spec_clone); - let predicate = subnets.iter().any(|subnet| match subnet { Subnet::Attestation(s) => attestation_bitfield .get(*s.deref() as usize) @@ -40,12 +37,16 @@ where .as_ref() .map_or(false, |b| b.get(*s.deref() as usize).unwrap_or(false)), Subnet::DataColumn(s) => { - let mut subnets = DataColumnSubnetId::compute_custody_subnets::( - enr.node_id().raw(), - custody_subnet_count, - &spec, - ); - subnets.contains(s) + if let Ok(custody_subnet_count) = enr.custody_subnet_count::(&spec) { + DataColumnSubnetId::compute_custody_subnets::( + enr.node_id().raw(), + custody_subnet_count, + &spec, + ) + .map_or(false, |mut subnets| subnets.contains(s)) + } else { + false + } } }); diff --git a/beacon_node/lighthouse_network/src/metrics.rs b/beacon_node/lighthouse_network/src/metrics.rs index 85da8dc2112..c3f64a5a1f4 100644 --- a/beacon_node/lighthouse_network/src/metrics.rs +++ b/beacon_node/lighthouse_network/src/metrics.rs @@ -91,6 +91,15 @@ pub static PEERS_PER_CLIENT: LazyLock> = LazyLock::new(|| { &["Client"], ) }); + +pub static PEERS_PER_CUSTODY_SUBNET_COUNT: LazyLock> = LazyLock::new(|| { + try_create_int_gauge_vec( + "peers_per_custody_subnet_count", + "The current count of peers by custody subnet count", + &["custody_subnet_count"], + ) +}); + pub static FAILED_ATTESTATION_PUBLISHES_PER_SUBNET: LazyLock> = LazyLock::new(|| { try_create_int_gauge_vec( diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 31ff8bdfc23..4d913312354 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -19,7 +19,7 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; -use types::{EthSpec, SyncSubnetId}; +use types::{DataColumnSubnetId, EthSpec, SyncSubnetId}; pub use libp2p::core::Multiaddr; pub use libp2p::identity::Keypair; @@ -33,7 +33,7 @@ pub use peerdb::peer_info::{ }; use peerdb::score::{PeerAction, ReportSource}; pub use peerdb::sync_status::{SyncInfo, SyncStatus}; -use std::collections::{hash_map::Entry, HashMap}; +use std::collections::{hash_map::Entry, HashMap, HashSet}; use std::net::IpAddr; use strum::IntoEnumIterator; @@ -701,6 +701,8 @@ impl PeerManager { /// Received a metadata response from a peer. pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData) { + let mut invalid_meta_data = false; + if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { if let Some(known_meta_data) = &peer_info.meta_data() { if *known_meta_data.seq_number() < *meta_data.seq_number() { @@ -717,12 +719,39 @@ impl PeerManager { debug!(self.log, "Obtained peer's metadata"; "peer_id" => %peer_id, "new_seq_no" => meta_data.seq_number()); } - let node_id_opt = peer_id_to_node_id(peer_id).ok(); - peer_info.set_meta_data(meta_data, node_id_opt, &self.network_globals.spec); + + let custody_subnet_count_opt = meta_data.custody_subnet_count().copied().ok(); + peer_info.set_meta_data(meta_data); + + if self.network_globals.spec.is_peer_das_scheduled() { + // Gracefully ignore metadata/v2 peers. Potentially downscore after PeerDAS to + // prioritize PeerDAS peers. + if let Some(custody_subnet_count) = custody_subnet_count_opt { + match self.compute_peer_custody_subnets(peer_id, custody_subnet_count) { + Ok(custody_subnets) => { + peer_info.set_custody_subnets(custody_subnets); + } + Err(err) => { + debug!(self.log, "Unable to compute peer custody subnets from metadata"; + "info" => "Sending goodbye to peer", + "peer_id" => %peer_id, + "custody_subnet_count" => custody_subnet_count, + "error" => ?err, + ); + invalid_meta_data = true; + } + }; + } + } } else { error!(self.log, "Received METADATA from an unknown peer"; "peer_id" => %peer_id); } + + // Disconnect peers with invalid metadata and find other peers instead. + if invalid_meta_data { + self.goodbye_peer(peer_id, GoodbyeReason::Fault, ReportSource::PeerManager) + } } /// Updates the gossipsub scores for all known peers in gossipsub. @@ -1290,6 +1319,7 @@ impl PeerManager { let mut peers_connected = 0; let mut clients_per_peer = HashMap::new(); let mut peers_connected_mutli: HashMap<(&str, &str), i32> = HashMap::new(); + let mut peers_per_custody_subnet_count: HashMap = HashMap::new(); for (_, peer_info) in self.network_globals.peers.read().connected_peers() { peers_connected += 1; @@ -1320,11 +1350,26 @@ impl PeerManager { *peers_connected_mutli .entry((direction, transport)) .or_default() += 1; + + if let Some(MetaData::V3(meta_data)) = peer_info.meta_data() { + *peers_per_custody_subnet_count + .entry(meta_data.custody_subnet_count) + .or_default() += 1; + } } // PEERS_CONNECTED metrics::set_gauge(&metrics::PEERS_CONNECTED, peers_connected); + // CUSTODY_SUBNET_COUNT + for (custody_subnet_count, peer_count) in peers_per_custody_subnet_count.into_iter() { + metrics::set_gauge_vec( + &metrics::PEERS_PER_CUSTODY_SUBNET_COUNT, + &[&custody_subnet_count.to_string()], + peer_count, + ) + } + // PEERS_PER_CLIENT for client_kind in ClientKind::iter() { let value = clients_per_peer.get(&client_kind.to_string()).unwrap_or(&0); @@ -1348,6 +1393,45 @@ impl PeerManager { } } } + + fn compute_peer_custody_subnets( + &self, + peer_id: &PeerId, + custody_subnet_count: u64, + ) -> Result, String> { + // If we don't have a node id, we cannot compute the custody duties anyway + let node_id = peer_id_to_node_id(peer_id)?; + let spec = &self.network_globals.spec; + + if !(spec.custody_requirement..=spec.data_column_sidecar_subnet_count) + .contains(&custody_subnet_count) + { + return Err("Invalid custody subnet count in metadata: out of range".to_string()); + } + + let custody_subnets = DataColumnSubnetId::compute_custody_subnets::( + node_id.raw(), + custody_subnet_count, + spec, + ) + .map(|subnets| subnets.collect()) + .unwrap_or_else(|e| { + // This is an unreachable scenario unless there's a bug, as we've validated the csc + // just above. + error!( + self.log, + "Computing peer custody subnets failed unexpectedly"; + "info" => "Falling back to default custody requirement subnets", + "peer_id" => %peer_id, + "custody_subnet_count" => custody_subnet_count, + "error" => ?e + ); + DataColumnSubnetId::compute_custody_requirement_subnets::(node_id.raw(), spec) + .collect() + }); + + Ok(custody_subnets) + } } enum ConnectingType { @@ -1680,11 +1764,7 @@ mod tests { .write() .peer_info_mut(&peer0) .unwrap() - .set_meta_data( - MetaData::V2(metadata), - None, - &peer_manager.network_globals.spec, - ); + .set_meta_data(MetaData::V2(metadata)); peer_manager .network_globals .peers @@ -1704,11 +1784,7 @@ mod tests { .write() .peer_info_mut(&peer2) .unwrap() - .set_meta_data( - MetaData::V2(metadata), - None, - &peer_manager.network_globals.spec, - ); + .set_meta_data(MetaData::V2(metadata)); peer_manager .network_globals .peers @@ -1728,11 +1804,7 @@ mod tests { .write() .peer_info_mut(&peer4) .unwrap() - .set_meta_data( - MetaData::V2(metadata), - None, - &peer_manager.network_globals.spec, - ); + .set_meta_data(MetaData::V2(metadata)); peer_manager .network_globals .peers @@ -1806,11 +1878,7 @@ mod tests { .write() .peer_info_mut(&peer) .unwrap() - .set_meta_data( - MetaData::V2(metadata), - None, - &peer_manager.network_globals.spec, - ); + .set_meta_data(MetaData::V2(metadata)); peer_manager .network_globals .peers @@ -1934,11 +2002,7 @@ mod tests { .write() .peer_info_mut(&peer) .unwrap() - .set_meta_data( - MetaData::V2(metadata), - None, - &peer_manager.network_globals.spec, - ); + .set_meta_data(MetaData::V2(metadata)); let long_lived_subnets = peer_manager .network_globals .peers @@ -2047,11 +2111,7 @@ mod tests { .write() .peer_info_mut(&peer) .unwrap() - .set_meta_data( - MetaData::V2(metadata), - None, - &peer_manager.network_globals.spec, - ); + .set_meta_data(MetaData::V2(metadata)); let long_lived_subnets = peer_manager .network_globals .peers @@ -2217,11 +2277,7 @@ mod tests { .write() .peer_info_mut(&peer) .unwrap() - .set_meta_data( - MetaData::V2(metadata), - None, - &peer_manager.network_globals.spec, - ); + .set_meta_data(MetaData::V2(metadata)); let long_lived_subnets = peer_manager .network_globals .peers @@ -2378,11 +2434,7 @@ mod tests { let mut peer_db = peer_manager.network_globals.peers.write(); let peer_info = peer_db.peer_info_mut(&condition.peer_id).unwrap(); - peer_info.set_meta_data( - MetaData::V2(metadata), - None, - &peer_manager.network_globals.spec, - ); + peer_info.set_meta_data(MetaData::V2(metadata)); peer_info.set_gossipsub_score(condition.gossipsub_score); peer_info.add_to_score(condition.score); diff --git a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs index d9df8e7c4bb..b7fd5b5e5d7 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs @@ -240,10 +240,6 @@ impl PeerManager { "connection" => ?endpoint.to_endpoint() ); - if other_established == 0 { - self.events.push(PeerManagerEvent::MetaData(peer_id)); - } - // Update the prometheus metrics if self.metrics_enabled { metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT); @@ -267,6 +263,10 @@ impl PeerManager { return; } + if other_established == 0 { + self.events.push(PeerManagerEvent::MetaData(peer_id)); + } + // NOTE: We don't register peers that we are disconnecting immediately. The network service // does not need to know about these peers. match endpoint { diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index 6e76fd4bb00..f6b63e6de22 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -1,8 +1,6 @@ use crate::discovery::enr::PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY; use crate::discovery::CombinedKey; -use crate::{ - metrics, multiaddr::Multiaddr, types::Subnet, Enr, EnrExt, Eth2Enr, Gossipsub, PeerId, -}; +use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, EnrExt, Gossipsub, PeerId}; use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo}; use rand::seq::SliceRandom; use score::{PeerAction, ReportSource, Score, ScoreState}; @@ -47,16 +45,10 @@ pub struct PeerDB { disable_peer_scoring: bool, /// PeerDB's logger log: slog::Logger, - spec: ChainSpec, } impl PeerDB { - pub fn new( - trusted_peers: Vec, - disable_peer_scoring: bool, - log: &slog::Logger, - spec: ChainSpec, - ) -> Self { + pub fn new(trusted_peers: Vec, disable_peer_scoring: bool, log: &slog::Logger) -> Self { // Initialize the peers hashmap with trusted peers let peers = trusted_peers .into_iter() @@ -68,7 +60,6 @@ impl PeerDB { banned_peers_count: BannedPeersCount::default(), disable_peer_scoring, peers, - spec, } } @@ -726,6 +717,14 @@ impl PeerDB { }, ); + if supernode { + let peer_info = self.peers.get_mut(&peer_id).expect("peer exists"); + let all_subnets = (0..spec.data_column_sidecar_subnet_count) + .map(|csc| csc.into()) + .collect(); + peer_info.set_custody_subnets(all_subnets); + } + peer_id } @@ -791,14 +790,6 @@ impl PeerDB { ) => { // Update the ENR if one exists, and compute the custody subnets if let Some(enr) = enr { - let custody_subnet_count = enr.custody_subnet_count::(&self.spec); - let custody_subnets = DataColumnSubnetId::compute_custody_subnets::( - enr.node_id().raw(), - custody_subnet_count, - &self.spec, - ) - .collect::>(); - info.set_custody_subnets(custody_subnets); info.set_enr(enr); } @@ -1349,8 +1340,7 @@ mod tests { fn get_db() -> PeerDB { let log = build_log(slog::Level::Debug, false); - let spec = M::default_spec(); - PeerDB::new(vec![], false, &log, spec) + PeerDB::new(vec![], false, &log) } #[test] @@ -2049,8 +2039,7 @@ mod tests { fn test_trusted_peers_score() { let trusted_peer = PeerId::random(); let log = build_log(slog::Level::Debug, false); - let spec = M::default_spec(); - let mut pdb: PeerDB = PeerDB::new(vec![trusted_peer], false, &log, spec); + let mut pdb: PeerDB = PeerDB::new(vec![trusted_peer], false, &log); pdb.connect_ingoing(&trusted_peer, "/ip4/0.0.0.0".parse().unwrap(), None); @@ -2074,8 +2063,7 @@ mod tests { fn test_disable_peer_scoring() { let peer = PeerId::random(); let log = build_log(slog::Level::Debug, false); - let spec = M::default_spec(); - let mut pdb: PeerDB = PeerDB::new(vec![], true, &log, spec); + let mut pdb: PeerDB = PeerDB::new(vec![], true, &log); pdb.connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap(), None); diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs index 1ea3f8ed5fc..ee8c27f474c 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs @@ -3,7 +3,6 @@ use super::score::{PeerAction, Score, ScoreState}; use super::sync_status::SyncStatus; use crate::discovery::Eth2Enr; use crate::{rpc::MetaData, types::Subnet}; -use discv5::enr::NodeId; use discv5::Enr; use libp2p::core::multiaddr::{Multiaddr, Protocol}; use serde::{ @@ -14,7 +13,7 @@ use std::collections::HashSet; use std::net::IpAddr; use std::time::Instant; use strum::AsRefStr; -use types::{ChainSpec, DataColumnSubnetId, EthSpec}; +use types::{DataColumnSubnetId, EthSpec}; use PeerConnectionStatus::*; /// Information about a given connected peer. @@ -358,31 +357,7 @@ impl PeerInfo { /// Sets an explicit value for the meta data. // VISIBILITY: The peer manager is able to adjust the meta_data - pub(in crate::peer_manager) fn set_meta_data( - &mut self, - meta_data: MetaData, - node_id_opt: Option, - spec: &ChainSpec, - ) { - // If we don't have a node id, we cannot compute the custody duties anyway - let Some(node_id) = node_id_opt else { - self.meta_data = Some(meta_data); - return; - }; - - // Already set by enr if custody_subnets is non empty - if self.custody_subnets.is_empty() { - if let Ok(custody_subnet_count) = meta_data.custody_subnet_count() { - let custody_subnets = DataColumnSubnetId::compute_custody_subnets::( - node_id.raw(), - std::cmp::min(*custody_subnet_count, spec.data_column_sidecar_subnet_count), - spec, - ) - .collect::>(); - self.set_custody_subnets(custody_subnets); - } - } - + pub(in crate::peer_manager) fn set_meta_data(&mut self, meta_data: MetaData) { self.meta_data = Some(meta_data); } @@ -391,7 +366,10 @@ impl PeerInfo { self.connection_status = connection_status } - pub(super) fn set_custody_subnets(&mut self, custody_subnets: HashSet) { + pub(in crate::peer_manager) fn set_custody_subnets( + &mut self, + custody_subnets: HashSet, + ) { self.custody_subnets = custody_subnets } diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec.rs similarity index 91% rename from beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs rename to beacon_node/lighthouse_network/src/rpc/codec.rs index 8f5143d7ed9..224fb8a5f71 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec.rs @@ -1,9 +1,9 @@ use crate::rpc::methods::*; -use crate::rpc::{ - codec::base::OutboundCodec, - protocol::{Encoding, ProtocolId, RPCError, SupportedProtocol, ERROR_TYPE_MAX, ERROR_TYPE_MIN}, +use crate::rpc::protocol::{ + Encoding, ProtocolId, RPCError, SupportedProtocol, ERROR_TYPE_MAX, ERROR_TYPE_MIN, }; use crate::rpc::{InboundRequest, OutboundRequest}; +use libp2p::bytes::BufMut; use libp2p::bytes::BytesMut; use snap::read::FrameDecoder; use snap::write::FrameEncoder; @@ -57,13 +57,13 @@ impl SSZSnappyInboundCodec { max_packet_size, } } -} -// Encoder for inbound streams: Encodes RPC Responses sent to peers. -impl Encoder> for SSZSnappyInboundCodec { - type Error = RPCError; - - fn encode(&mut self, item: RPCCodedResponse, dst: &mut BytesMut) -> Result<(), Self::Error> { + /// Encodes RPC Responses sent to peers. + fn encode_response( + &mut self, + item: RPCCodedResponse, + dst: &mut BytesMut, + ) -> Result<(), RPCError> { let bytes = match &item { RPCCodedResponse::Success(resp) => match &resp { RPCResponse::Status(res) => res.as_ssz_bytes(), @@ -125,6 +125,21 @@ impl Encoder> for SSZSnappyInboundCodec { } } +// Encoder for inbound streams: Encodes RPC Responses sent to peers. +impl Encoder> for SSZSnappyInboundCodec { + type Error = RPCError; + + fn encode(&mut self, item: RPCCodedResponse, dst: &mut BytesMut) -> Result<(), Self::Error> { + dst.clear(); + dst.reserve(1); + dst.put_u8( + item.as_u8() + .expect("Should never encode a stream termination"), + ); + self.encode_response(item, dst) + } +} + // Decoder for inbound streams: Decodes RPC requests from peers impl Decoder for SSZSnappyInboundCodec { type Item = InboundRequest; @@ -188,6 +203,8 @@ pub struct SSZSnappyOutboundCodec { /// The fork name corresponding to the received context bytes. fork_name: Option, fork_context: Arc, + /// Keeps track of the current response code for a chunk. + current_response_code: Option, phantom: PhantomData, } @@ -209,66 +226,12 @@ impl SSZSnappyOutboundCodec { fork_name: None, fork_context, phantom: PhantomData, + current_response_code: None, } } -} - -// Encoder for outbound streams: Encodes RPC Requests to peers -impl Encoder> for SSZSnappyOutboundCodec { - type Error = RPCError; - - fn encode(&mut self, item: OutboundRequest, dst: &mut BytesMut) -> Result<(), Self::Error> { - let bytes = match item { - OutboundRequest::Status(req) => req.as_ssz_bytes(), - OutboundRequest::Goodbye(req) => req.as_ssz_bytes(), - OutboundRequest::BlocksByRange(r) => match r { - OldBlocksByRangeRequest::V1(req) => req.as_ssz_bytes(), - OldBlocksByRangeRequest::V2(req) => req.as_ssz_bytes(), - }, - OutboundRequest::BlocksByRoot(r) => match r { - BlocksByRootRequest::V1(req) => req.block_roots.as_ssz_bytes(), - BlocksByRootRequest::V2(req) => req.block_roots.as_ssz_bytes(), - }, - OutboundRequest::BlobsByRange(req) => req.as_ssz_bytes(), - OutboundRequest::BlobsByRoot(req) => req.blob_ids.as_ssz_bytes(), - OutboundRequest::DataColumnsByRange(req) => req.as_ssz_bytes(), - OutboundRequest::DataColumnsByRoot(req) => req.data_column_ids.as_ssz_bytes(), - OutboundRequest::Ping(req) => req.as_ssz_bytes(), - OutboundRequest::MetaData(_) => return Ok(()), // no metadata to encode - }; - // SSZ encoded bytes should be within `max_packet_size` - if bytes.len() > self.max_packet_size { - return Err(RPCError::InternalError( - "attempting to encode data > max_packet_size", - )); - } - // Inserts the length prefix of the uncompressed bytes into dst - // encoded as a unsigned varint - self.inner - .encode(bytes.len(), dst) - .map_err(RPCError::from)?; - - let mut writer = FrameEncoder::new(Vec::new()); - writer.write_all(&bytes).map_err(RPCError::from)?; - writer.flush().map_err(RPCError::from)?; - - // Write compressed bytes to `dst` - dst.extend_from_slice(writer.get_ref()); - Ok(()) - } -} - -// Decoder for outbound streams: Decodes RPC responses from peers. -// -// The majority of the decoding has now been pushed upstream due to the changing specification. -// We prefer to decode blocks and attestations with extra knowledge about the chain to perform -// faster verification checks before decoding entire blocks/attestations. -impl Decoder for SSZSnappyOutboundCodec { - type Item = RPCResponse; - type Error = RPCError; - - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + // Decode an Rpc response. + fn decode_response(&mut self, src: &mut BytesMut) -> Result>, RPCError> { // Read the context bytes if required if self.protocol.has_context_bytes() && self.fork_name.is_none() { if src.len() >= CONTEXT_BYTES_LEN { @@ -318,15 +281,8 @@ impl Decoder for SSZSnappyOutboundCodec { Err(e) => handle_error(e, reader.get_ref().get_ref().position(), max_compressed_len), } } -} - -impl OutboundCodec> for SSZSnappyOutboundCodec { - type CodecErrorType = ErrorType; - fn decode_error( - &mut self, - src: &mut BytesMut, - ) -> Result, RPCError> { + fn decode_error(&mut self, src: &mut BytesMut) -> Result, RPCError> { let Some(length) = handle_length(&mut self.inner, &mut self.len, src)? else { return Ok(None); }; @@ -361,6 +317,95 @@ impl OutboundCodec> for SSZSnappyOutboundCodec } } +// Encoder for outbound streams: Encodes RPC Requests to peers +impl Encoder> for SSZSnappyOutboundCodec { + type Error = RPCError; + + fn encode(&mut self, item: OutboundRequest, dst: &mut BytesMut) -> Result<(), Self::Error> { + let bytes = match item { + OutboundRequest::Status(req) => req.as_ssz_bytes(), + OutboundRequest::Goodbye(req) => req.as_ssz_bytes(), + OutboundRequest::BlocksByRange(r) => match r { + OldBlocksByRangeRequest::V1(req) => req.as_ssz_bytes(), + OldBlocksByRangeRequest::V2(req) => req.as_ssz_bytes(), + }, + OutboundRequest::BlocksByRoot(r) => match r { + BlocksByRootRequest::V1(req) => req.block_roots.as_ssz_bytes(), + BlocksByRootRequest::V2(req) => req.block_roots.as_ssz_bytes(), + }, + OutboundRequest::BlobsByRange(req) => req.as_ssz_bytes(), + OutboundRequest::BlobsByRoot(req) => req.blob_ids.as_ssz_bytes(), + OutboundRequest::DataColumnsByRange(req) => req.as_ssz_bytes(), + OutboundRequest::DataColumnsByRoot(req) => req.data_column_ids.as_ssz_bytes(), + OutboundRequest::Ping(req) => req.as_ssz_bytes(), + OutboundRequest::MetaData(_) => return Ok(()), // no metadata to encode + }; + // SSZ encoded bytes should be within `max_packet_size` + if bytes.len() > self.max_packet_size { + return Err(RPCError::InternalError( + "attempting to encode data > max_packet_size", + )); + } + + // Inserts the length prefix of the uncompressed bytes into dst + // encoded as a unsigned varint + self.inner + .encode(bytes.len(), dst) + .map_err(RPCError::from)?; + + let mut writer = FrameEncoder::new(Vec::new()); + writer.write_all(&bytes).map_err(RPCError::from)?; + writer.flush().map_err(RPCError::from)?; + + // Write compressed bytes to `dst` + dst.extend_from_slice(writer.get_ref()); + Ok(()) + } +} + +// Decoder for outbound streams: Decodes RPC responses from peers. +// +// The majority of the decoding has now been pushed upstream due to the changing specification. +// We prefer to decode blocks and attestations with extra knowledge about the chain to perform +// faster verification checks before decoding entire blocks/attestations. +impl Decoder for SSZSnappyOutboundCodec { + type Item = RPCCodedResponse; + type Error = RPCError; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + // if we have only received the response code, wait for more bytes + if src.len() <= 1 { + return Ok(None); + } + // using the response code determine which kind of payload needs to be decoded. + let response_code = self.current_response_code.unwrap_or_else(|| { + let resp_code = src.split_to(1)[0]; + self.current_response_code = Some(resp_code); + resp_code + }); + + let inner_result = { + if RPCCodedResponse::::is_response(response_code) { + // decode an actual response and mutates the buffer if enough bytes have been read + // returning the result. + self.decode_response(src) + .map(|r| r.map(RPCCodedResponse::Success)) + } else { + // decode an error + self.decode_error(src) + .map(|r| r.map(|resp| RPCCodedResponse::from_error(response_code, resp))) + } + }; + // if the inner decoder was capable of decoding a chunk, we need to reset the current + // response code for the next chunk + if let Ok(Some(_)) = inner_result { + self.current_response_code = None; + } + // return the result + inner_result + } +} + /// Handle errors that we get from decoding an RPC message from the stream. /// `num_bytes_read` is the number of bytes the snappy decoder has read from the underlying stream. /// `max_compressed_len` is the maximum compressed size for a given uncompressed size. @@ -1030,7 +1075,7 @@ mod tests { let mut snappy_inbound_codec = SSZSnappyInboundCodec::::new(snappy_protocol_id, max_packet_size, fork_context); - snappy_inbound_codec.encode(message, &mut buf)?; + snappy_inbound_codec.encode_response(message, &mut buf)?; Ok(buf) } @@ -1075,7 +1120,7 @@ mod tests { let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new(snappy_protocol_id, max_packet_size, fork_context); // decode message just as snappy message - snappy_outbound_codec.decode(message) + snappy_outbound_codec.decode_response(message) } /// Encodes the provided protocol message as bytes and tries to decode the encoding bytes. @@ -1847,4 +1892,129 @@ mod tests { RPCError::InvalidData(_) )); } + + #[test] + fn test_decode_status_message() { + let message = hex::decode("0054ff060000734e615070590032000006e71e7b54989925efd6c9cbcb8ceb9b5f71216f5137282bf6a1e3b50f64e42d6c7fb347abe07eb0db8200000005029e2800").unwrap(); + let mut buf = BytesMut::new(); + buf.extend_from_slice(&message); + + let snappy_protocol_id = ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy); + + let fork_context = Arc::new(fork_context(ForkName::Base)); + + let chain_spec = Spec::default_spec(); + + let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new( + snappy_protocol_id, + max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize), + fork_context, + ); + + // remove response code + let mut snappy_buf = buf.clone(); + let _ = snappy_buf.split_to(1); + + // decode message just as snappy message + let _snappy_decoded_message = snappy_outbound_codec + .decode_response(&mut snappy_buf) + .unwrap(); + + // decode message as ssz snappy chunk + let _snappy_decoded_chunk = snappy_outbound_codec.decode(&mut buf).unwrap(); + } + + #[test] + fn test_invalid_length_prefix() { + let mut uvi_codec: Uvi = Uvi::default(); + let mut dst = BytesMut::with_capacity(1024); + + // Smallest > 10 byte varint + let len: u128 = 2u128.pow(70); + + // Insert length-prefix + uvi_codec.encode(len, &mut dst).unwrap(); + + let snappy_protocol_id = ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy); + + let fork_context = Arc::new(fork_context(ForkName::Base)); + + let chain_spec = Spec::default_spec(); + + let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new( + snappy_protocol_id, + max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize), + fork_context, + ); + + let snappy_decoded_message = snappy_outbound_codec.decode_response(&mut dst).unwrap_err(); + + assert_eq!( + snappy_decoded_message, + RPCError::IoError("input bytes exceed maximum".to_string()), + "length-prefix of > 10 bytes is invalid" + ); + } + + #[test] + fn test_length_limits() { + fn encode_len(len: usize) -> BytesMut { + let mut uvi_codec: Uvi = Uvi::default(); + let mut dst = BytesMut::with_capacity(1024); + uvi_codec.encode(len, &mut dst).unwrap(); + dst + } + + let protocol_id = ProtocolId::new(SupportedProtocol::BlocksByRangeV1, Encoding::SSZSnappy); + + // Response limits + let fork_context = Arc::new(fork_context(ForkName::Base)); + + let chain_spec = Spec::default_spec(); + + let max_rpc_size = max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize); + let limit = protocol_id.rpc_response_limits::(&fork_context); + let mut max = encode_len(limit.max + 1); + let mut codec = SSZSnappyOutboundCodec::::new( + protocol_id.clone(), + max_rpc_size, + fork_context.clone(), + ); + assert!(matches!( + codec.decode_response(&mut max).unwrap_err(), + RPCError::InvalidData(_) + )); + + let mut min = encode_len(limit.min - 1); + let mut codec = SSZSnappyOutboundCodec::::new( + protocol_id.clone(), + max_rpc_size, + fork_context.clone(), + ); + assert!(matches!( + codec.decode_response(&mut min).unwrap_err(), + RPCError::InvalidData(_) + )); + + // Request limits + let limit = protocol_id.rpc_request_limits(&fork_context.spec); + let mut max = encode_len(limit.max + 1); + let mut codec = SSZSnappyOutboundCodec::::new( + protocol_id.clone(), + max_rpc_size, + fork_context.clone(), + ); + assert!(matches!( + codec.decode_response(&mut max).unwrap_err(), + RPCError::InvalidData(_) + )); + + let mut min = encode_len(limit.min - 1); + let mut codec = + SSZSnappyOutboundCodec::::new(protocol_id, max_rpc_size, fork_context); + assert!(matches!( + codec.decode_response(&mut min).unwrap_err(), + RPCError::InvalidData(_) + )); + } } diff --git a/beacon_node/lighthouse_network/src/rpc/codec/base.rs b/beacon_node/lighthouse_network/src/rpc/codec/base.rs deleted file mode 100644 index 4b9e8d50975..00000000000 --- a/beacon_node/lighthouse_network/src/rpc/codec/base.rs +++ /dev/null @@ -1,334 +0,0 @@ -//! This handles the various supported encoding mechanism for the Eth 2.0 RPC. - -use crate::rpc::methods::ErrorType; -use crate::rpc::{InboundRequest, OutboundRequest, RPCCodedResponse, RPCResponse}; -use libp2p::bytes::BufMut; -use libp2p::bytes::BytesMut; -use std::marker::PhantomData; -use tokio_util::codec::{Decoder, Encoder}; -use types::EthSpec; - -pub trait OutboundCodec: Encoder + Decoder { - type CodecErrorType; - - fn decode_error( - &mut self, - src: &mut BytesMut, - ) -> Result, ::Error>; -} - -/* Global Inbound Codec */ -// This deals with Decoding RPC Requests from other peers and encoding our responses - -pub struct BaseInboundCodec -where - TCodec: Encoder> + Decoder, - E: EthSpec, -{ - /// Inner codec for handling various encodings - inner: TCodec, - phantom: PhantomData, -} - -impl BaseInboundCodec -where - TCodec: Encoder> + Decoder, - E: EthSpec, -{ - pub fn new(codec: TCodec) -> Self { - BaseInboundCodec { - inner: codec, - phantom: PhantomData, - } - } -} - -/* Global Outbound Codec */ -// This deals with Decoding RPC Responses from other peers and encoding our requests -pub struct BaseOutboundCodec -where - TOutboundCodec: OutboundCodec>, - E: EthSpec, -{ - /// Inner codec for handling various encodings. - inner: TOutboundCodec, - /// Keeps track of the current response code for a chunk. - current_response_code: Option, - phantom: PhantomData, -} - -impl BaseOutboundCodec -where - E: EthSpec, - TOutboundCodec: OutboundCodec>, -{ - pub fn new(codec: TOutboundCodec) -> Self { - BaseOutboundCodec { - inner: codec, - current_response_code: None, - phantom: PhantomData, - } - } -} - -/* Implementation of the Encoding/Decoding for the global codecs */ - -/* Base Inbound Codec */ - -// This Encodes RPC Responses sent to external peers -impl Encoder> for BaseInboundCodec -where - E: EthSpec, - TCodec: Decoder + Encoder>, -{ - type Error = >>::Error; - - fn encode(&mut self, item: RPCCodedResponse, dst: &mut BytesMut) -> Result<(), Self::Error> { - dst.clear(); - dst.reserve(1); - dst.put_u8( - item.as_u8() - .expect("Should never encode a stream termination"), - ); - self.inner.encode(item, dst) - } -} - -// This Decodes RPC Requests from external peers -impl Decoder for BaseInboundCodec -where - E: EthSpec, - TCodec: Encoder> + Decoder>, -{ - type Item = InboundRequest; - type Error = ::Error; - - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - self.inner.decode(src) - } -} - -/* Base Outbound Codec */ - -// This Encodes RPC Requests sent to external peers -impl Encoder> for BaseOutboundCodec -where - E: EthSpec, - TCodec: OutboundCodec> + Encoder>, -{ - type Error = >>::Error; - - fn encode(&mut self, item: OutboundRequest, dst: &mut BytesMut) -> Result<(), Self::Error> { - self.inner.encode(item, dst) - } -} - -// This decodes RPC Responses received from external peers -impl Decoder for BaseOutboundCodec -where - E: EthSpec, - TCodec: OutboundCodec, CodecErrorType = ErrorType> - + Decoder>, -{ - type Item = RPCCodedResponse; - type Error = ::Error; - - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - // if we have only received the response code, wait for more bytes - if src.len() <= 1 { - return Ok(None); - } - // using the response code determine which kind of payload needs to be decoded. - let response_code = self.current_response_code.unwrap_or_else(|| { - let resp_code = src.split_to(1)[0]; - self.current_response_code = Some(resp_code); - resp_code - }); - - let inner_result = { - if RPCCodedResponse::::is_response(response_code) { - // decode an actual response and mutates the buffer if enough bytes have been read - // returning the result. - self.inner - .decode(src) - .map(|r| r.map(RPCCodedResponse::Success)) - } else { - // decode an error - self.inner - .decode_error(src) - .map(|r| r.map(|resp| RPCCodedResponse::from_error(response_code, resp))) - } - }; - // if the inner decoder was capable of decoding a chunk, we need to reset the current - // response code for the next chunk - if let Ok(Some(_)) = inner_result { - self.current_response_code = None; - } - // return the result - inner_result - } -} - -#[cfg(test)] -mod tests { - use super::super::ssz_snappy::*; - use super::*; - use crate::rpc::protocol::*; - - use std::sync::Arc; - use types::{Epoch, FixedBytesExtended, ForkContext, ForkName, Hash256, Slot}; - use unsigned_varint::codec::Uvi; - - type Spec = types::MainnetEthSpec; - - fn fork_context(fork_name: ForkName) -> ForkContext { - let mut chain_spec = Spec::default_spec(); - let altair_fork_epoch = Epoch::new(1); - let bellatrix_fork_epoch = Epoch::new(2); - let capella_fork_epoch = Epoch::new(3); - let deneb_fork_epoch = Epoch::new(4); - let electra_fork_epoch = Epoch::new(5); - - chain_spec.altair_fork_epoch = Some(altair_fork_epoch); - chain_spec.bellatrix_fork_epoch = Some(bellatrix_fork_epoch); - chain_spec.capella_fork_epoch = Some(capella_fork_epoch); - chain_spec.deneb_fork_epoch = Some(deneb_fork_epoch); - chain_spec.electra_fork_epoch = Some(electra_fork_epoch); - - let current_slot = match fork_name { - ForkName::Base => Slot::new(0), - ForkName::Altair => altair_fork_epoch.start_slot(Spec::slots_per_epoch()), - ForkName::Bellatrix => bellatrix_fork_epoch.start_slot(Spec::slots_per_epoch()), - ForkName::Capella => capella_fork_epoch.start_slot(Spec::slots_per_epoch()), - ForkName::Deneb => deneb_fork_epoch.start_slot(Spec::slots_per_epoch()), - ForkName::Electra => electra_fork_epoch.start_slot(Spec::slots_per_epoch()), - }; - ForkContext::new::(current_slot, Hash256::zero(), &chain_spec) - } - - #[test] - fn test_decode_status_message() { - let message = hex::decode("0054ff060000734e615070590032000006e71e7b54989925efd6c9cbcb8ceb9b5f71216f5137282bf6a1e3b50f64e42d6c7fb347abe07eb0db8200000005029e2800").unwrap(); - let mut buf = BytesMut::new(); - buf.extend_from_slice(&message); - - let snappy_protocol_id = ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy); - - let fork_context = Arc::new(fork_context(ForkName::Base)); - - let chain_spec = Spec::default_spec(); - - let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new( - snappy_protocol_id, - max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize), - fork_context, - ); - - // remove response code - let mut snappy_buf = buf.clone(); - let _ = snappy_buf.split_to(1); - - // decode message just as snappy message - let _snappy_decoded_message = snappy_outbound_codec.decode(&mut snappy_buf).unwrap(); - - // build codecs for entire chunk - let mut snappy_base_outbound_codec = BaseOutboundCodec::new(snappy_outbound_codec); - - // decode message as ssz snappy chunk - let _snappy_decoded_chunk = snappy_base_outbound_codec.decode(&mut buf).unwrap(); - } - - #[test] - fn test_invalid_length_prefix() { - let mut uvi_codec: Uvi = Uvi::default(); - let mut dst = BytesMut::with_capacity(1024); - - // Smallest > 10 byte varint - let len: u128 = 2u128.pow(70); - - // Insert length-prefix - uvi_codec.encode(len, &mut dst).unwrap(); - - let snappy_protocol_id = ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy); - - let fork_context = Arc::new(fork_context(ForkName::Base)); - - let chain_spec = Spec::default_spec(); - - let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new( - snappy_protocol_id, - max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize), - fork_context, - ); - - let snappy_decoded_message = snappy_outbound_codec.decode(&mut dst).unwrap_err(); - - assert_eq!( - snappy_decoded_message, - RPCError::IoError("input bytes exceed maximum".to_string()), - "length-prefix of > 10 bytes is invalid" - ); - } - - #[test] - fn test_length_limits() { - fn encode_len(len: usize) -> BytesMut { - let mut uvi_codec: Uvi = Uvi::default(); - let mut dst = BytesMut::with_capacity(1024); - uvi_codec.encode(len, &mut dst).unwrap(); - dst - } - - let protocol_id = ProtocolId::new(SupportedProtocol::BlocksByRangeV1, Encoding::SSZSnappy); - - // Response limits - let fork_context = Arc::new(fork_context(ForkName::Base)); - - let chain_spec = Spec::default_spec(); - - let max_rpc_size = max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize); - let limit = protocol_id.rpc_response_limits::(&fork_context); - let mut max = encode_len(limit.max + 1); - let mut codec = SSZSnappyOutboundCodec::::new( - protocol_id.clone(), - max_rpc_size, - fork_context.clone(), - ); - assert!(matches!( - codec.decode(&mut max).unwrap_err(), - RPCError::InvalidData(_) - )); - - let mut min = encode_len(limit.min - 1); - let mut codec = SSZSnappyOutboundCodec::::new( - protocol_id.clone(), - max_rpc_size, - fork_context.clone(), - ); - assert!(matches!( - codec.decode(&mut min).unwrap_err(), - RPCError::InvalidData(_) - )); - - // Request limits - let limit = protocol_id.rpc_request_limits(&fork_context.spec); - let mut max = encode_len(limit.max + 1); - let mut codec = SSZSnappyOutboundCodec::::new( - protocol_id.clone(), - max_rpc_size, - fork_context.clone(), - ); - assert!(matches!( - codec.decode(&mut max).unwrap_err(), - RPCError::InvalidData(_) - )); - - let mut min = encode_len(limit.min - 1); - let mut codec = - SSZSnappyOutboundCodec::::new(protocol_id, max_rpc_size, fork_context); - assert!(matches!( - codec.decode(&mut min).unwrap_err(), - RPCError::InvalidData(_) - )); - } -} diff --git a/beacon_node/lighthouse_network/src/rpc/codec/mod.rs b/beacon_node/lighthouse_network/src/rpc/codec/mod.rs deleted file mode 100644 index dbe99af5bfb..00000000000 --- a/beacon_node/lighthouse_network/src/rpc/codec/mod.rs +++ /dev/null @@ -1,61 +0,0 @@ -pub(crate) mod base; -pub(crate) mod ssz_snappy; - -use self::base::{BaseInboundCodec, BaseOutboundCodec}; -use self::ssz_snappy::{SSZSnappyInboundCodec, SSZSnappyOutboundCodec}; -use crate::rpc::protocol::RPCError; -use crate::rpc::{InboundRequest, OutboundRequest, RPCCodedResponse}; -use libp2p::bytes::BytesMut; -use tokio_util::codec::{Decoder, Encoder}; -use types::EthSpec; - -// Known types of codecs -pub enum InboundCodec { - SSZSnappy(BaseInboundCodec, E>), -} - -pub enum OutboundCodec { - SSZSnappy(BaseOutboundCodec, E>), -} - -impl Encoder> for InboundCodec { - type Error = RPCError; - - fn encode(&mut self, item: RPCCodedResponse, dst: &mut BytesMut) -> Result<(), Self::Error> { - match self { - InboundCodec::SSZSnappy(codec) => codec.encode(item, dst), - } - } -} - -impl Decoder for InboundCodec { - type Item = InboundRequest; - type Error = RPCError; - - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - match self { - InboundCodec::SSZSnappy(codec) => codec.decode(src), - } - } -} - -impl Encoder> for OutboundCodec { - type Error = RPCError; - - fn encode(&mut self, item: OutboundRequest, dst: &mut BytesMut) -> Result<(), Self::Error> { - match self { - OutboundCodec::SSZSnappy(codec) => codec.encode(item, dst), - } - } -} - -impl Decoder for OutboundCodec { - type Item = RPCCodedResponse; - type Error = RPCError; - - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - match self { - OutboundCodec::SSZSnappy(codec) => codec.decode(src), - } - } -} diff --git a/beacon_node/lighthouse_network/src/rpc/outbound.rs b/beacon_node/lighthouse_network/src/rpc/outbound.rs index c67c7865ea3..2bfa42ccac9 100644 --- a/beacon_node/lighthouse_network/src/rpc/outbound.rs +++ b/beacon_node/lighthouse_network/src/rpc/outbound.rs @@ -2,9 +2,7 @@ use super::methods::*; use super::protocol::ProtocolId; use super::protocol::SupportedProtocol; use super::RPCError; -use crate::rpc::codec::{ - base::BaseOutboundCodec, ssz_snappy::SSZSnappyOutboundCodec, OutboundCodec, -}; +use crate::rpc::codec::SSZSnappyOutboundCodec; use crate::rpc::protocol::Encoding; use futures::future::BoxFuture; use futures::prelude::{AsyncRead, AsyncWrite}; @@ -183,7 +181,7 @@ impl OutboundRequest { /* Outbound upgrades */ -pub type OutboundFramed = Framed, OutboundCodec>; +pub type OutboundFramed = Framed, SSZSnappyOutboundCodec>; impl OutboundUpgrade for OutboundRequestContainer where @@ -199,12 +197,7 @@ where let socket = socket.compat(); let codec = match protocol.encoding { Encoding::SSZSnappy => { - let ssz_snappy_codec = BaseOutboundCodec::new(SSZSnappyOutboundCodec::new( - protocol, - self.max_rpc_size, - self.fork_context.clone(), - )); - OutboundCodec::SSZSnappy(ssz_snappy_codec) + SSZSnappyOutboundCodec::new(protocol, self.max_rpc_size, self.fork_context.clone()) } }; diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index f4bdf6450b8..09a18e5de6b 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -1,5 +1,5 @@ use super::methods::*; -use crate::rpc::codec::{base::BaseInboundCodec, ssz_snappy::SSZSnappyInboundCodec, InboundCodec}; +use crate::rpc::codec::SSZSnappyInboundCodec; use futures::future::BoxFuture; use futures::prelude::{AsyncRead, AsyncWrite}; use futures::{FutureExt, StreamExt}; @@ -647,7 +647,7 @@ pub fn rpc_data_column_limits() -> RpcLimits { pub type InboundOutput = (InboundRequest, InboundFramed); pub type InboundFramed = - Framed>>>, InboundCodec>; + Framed>>>, SSZSnappyInboundCodec>; impl InboundUpgrade for RPCProtocol where @@ -664,15 +664,13 @@ where // convert the socket to tokio compatible socket let socket = socket.compat(); let codec = match protocol.encoding { - Encoding::SSZSnappy => { - let ssz_snappy_codec = BaseInboundCodec::new(SSZSnappyInboundCodec::new( - protocol, - self.max_rpc_size, - self.fork_context.clone(), - )); - InboundCodec::SSZSnappy(ssz_snappy_codec) - } + Encoding::SSZSnappy => SSZSnappyInboundCodec::new( + protocol, + self.max_rpc_size, + self.fork_context.clone(), + ), }; + let mut timed_socket = TimeoutStream::new(socket); timed_socket.set_read_timeout(Some(self.ttfb_timeout)); diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index a95912ff060..d97b52f79f1 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -1809,6 +1809,7 @@ impl Network { self.inject_upnp_event(e); None } + #[allow(unreachable_patterns)] BehaviourEvent::ConnectionLimits(le) => void::unreachable(le), }, SwarmEvent::ConnectionEstablished { .. } => None, diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index c76e0a18577..ac78e2cb01e 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -2,9 +2,10 @@ use crate::peer_manager::peerdb::PeerDB; use crate::rpc::{MetaData, MetaDataV3}; use crate::types::{BackFillState, SyncState}; +use crate::Client; use crate::EnrExt; -use crate::{Client, Eth2Enr}; use crate::{Enr, GossipTopic, Multiaddr, PeerId}; +use itertools::Itertools; use parking_lot::RwLock; use std::collections::HashSet; use types::{ChainSpec, ColumnIndex, DataColumnSubnetId, EthSpec}; @@ -26,6 +27,9 @@ pub struct NetworkGlobals { pub sync_state: RwLock, /// The current state of the backfill sync. pub backfill_state: RwLock, + /// The computed custody subnets and columns is stored to avoid re-computing. + pub custody_subnets: Vec, + pub custody_columns: Vec, pub spec: ChainSpec, } @@ -38,20 +42,39 @@ impl NetworkGlobals { log: &slog::Logger, spec: ChainSpec, ) -> Self { + let (custody_subnets, custody_columns) = if spec.is_peer_das_scheduled() { + let custody_subnet_count = local_metadata + .custody_subnet_count() + .copied() + .expect("custody subnet count must be set if PeerDAS is scheduled"); + let custody_subnets = DataColumnSubnetId::compute_custody_subnets::( + enr.node_id().raw(), + custody_subnet_count, + &spec, + ) + .expect("custody subnet count must be valid") + .collect::>(); + let custody_columns = custody_subnets + .iter() + .flat_map(|subnet| subnet.columns::(&spec)) + .sorted() + .collect(); + (custody_subnets, custody_columns) + } else { + (vec![], vec![]) + }; + NetworkGlobals { local_enr: RwLock::new(enr.clone()), peer_id: RwLock::new(enr.peer_id()), listen_multiaddrs: RwLock::new(Vec::new()), local_metadata: RwLock::new(local_metadata), - peers: RwLock::new(PeerDB::new( - trusted_peers, - disable_peer_scoring, - log, - spec.clone(), - )), + peers: RwLock::new(PeerDB::new(trusted_peers, disable_peer_scoring, log)), gossipsub_subscriptions: RwLock::new(HashSet::new()), sync_state: RwLock::new(SyncState::Stalled), backfill_state: RwLock::new(BackFillState::NotRequired), + custody_subnets, + custody_columns, spec, } } @@ -118,29 +141,6 @@ impl NetworkGlobals { std::mem::replace(&mut *self.sync_state.write(), new_state) } - /// Compute custody data columns the node is assigned to custody. - pub fn custody_columns(&self) -> Vec { - let enr = self.local_enr(); - let custody_subnet_count = enr.custody_subnet_count::(&self.spec); - DataColumnSubnetId::compute_custody_columns::( - enr.node_id().raw(), - custody_subnet_count, - &self.spec, - ) - .collect() - } - - /// Compute custody data column subnets the node is assigned to custody. - pub fn custody_subnets(&self) -> impl Iterator { - let enr = self.local_enr(); - let custody_subnet_count = enr.custody_subnet_count::(&self.spec); - DataColumnSubnetId::compute_custody_subnets::( - enr.node_id().raw(), - custody_subnet_count, - &self.spec, - ) - } - /// Returns a connected peer that: /// 1. is connected /// 2. assigned to custody the column based on it's `custody_subnet_count` from ENR or metadata @@ -161,44 +161,70 @@ impl NetworkGlobals { trusted_peers: Vec, log: &slog::Logger, spec: ChainSpec, + ) -> NetworkGlobals { + let metadata = MetaData::V3(MetaDataV3 { + seq_number: 0, + attnets: Default::default(), + syncnets: Default::default(), + custody_subnet_count: spec.custody_requirement, + }); + Self::new_test_globals_with_metadata(trusted_peers, metadata, log, spec) + } + + pub(crate) fn new_test_globals_with_metadata( + trusted_peers: Vec, + metadata: MetaData, + log: &slog::Logger, + spec: ChainSpec, ) -> NetworkGlobals { use crate::CombinedKeyExt; let keypair = libp2p::identity::secp256k1::Keypair::generate(); let enr_key: discv5::enr::CombinedKey = discv5::enr::CombinedKey::from_secp256k1(&keypair); let enr = discv5::enr::Enr::builder().build(&enr_key).unwrap(); - NetworkGlobals::new( - enr, - MetaData::V3(MetaDataV3 { - seq_number: 0, - attnets: Default::default(), - syncnets: Default::default(), - custody_subnet_count: spec.data_column_sidecar_subnet_count, - }), - trusted_peers, - false, - log, - spec, - ) + NetworkGlobals::new(enr, metadata, trusted_peers, false, log, spec) } } #[cfg(test)] mod test { use super::*; - use types::{EthSpec, MainnetEthSpec as E}; + use types::{Epoch, EthSpec, MainnetEthSpec as E}; + + #[test] + fn test_custody_subnets() { + let log = logging::test_logger(); + let mut spec = E::default_spec(); + spec.eip7594_fork_epoch = Some(Epoch::new(0)); + + let custody_subnet_count = spec.data_column_sidecar_subnet_count / 2; + let metadata = get_metadata(custody_subnet_count); + + let globals = + NetworkGlobals::::new_test_globals_with_metadata(vec![], metadata, &log, spec); + assert_eq!(globals.custody_subnets.len(), custody_subnet_count as usize); + } #[test] - fn test_custody_count_default() { - let spec = E::default_spec(); + fn test_custody_columns() { let log = logging::test_logger(); - let default_custody_requirement_column_count = spec.number_of_columns as u64 - / spec.data_column_sidecar_subnet_count - * spec.custody_requirement; - let globals = NetworkGlobals::::new_test_globals(vec![], &log, spec.clone()); - let columns = globals.custody_columns(); - assert_eq!( - columns.len(), - default_custody_requirement_column_count as usize - ); + let mut spec = E::default_spec(); + spec.eip7594_fork_epoch = Some(Epoch::new(0)); + + let custody_subnet_count = spec.data_column_sidecar_subnet_count / 2; + let custody_columns_count = spec.number_of_columns / 2; + let metadata = get_metadata(custody_subnet_count); + + let globals = + NetworkGlobals::::new_test_globals_with_metadata(vec![], metadata, &log, spec); + assert_eq!(globals.custody_columns.len(), custody_columns_count); + } + + fn get_metadata(custody_subnet_count: u64) -> MetaData { + MetaData::V3(MetaDataV3 { + seq_number: 0, + attnets: Default::default(), + syncnets: Default::default(), + custody_subnet_count, + }) } } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 5b9a3125ea5..5782fb00b6c 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -16,7 +16,6 @@ use futures::prelude::*; use futures::StreamExt; use lighthouse_network::service::Network; use lighthouse_network::types::GossipKind; -use lighthouse_network::Eth2Enr; use lighthouse_network::{prometheus_client::registry::Registry, MessageAcceptance}; use lighthouse_network::{ rpc::{GoodbyeReason, RPCResponseErrorCode}, @@ -808,17 +807,9 @@ impl NetworkService { } } } else { - for column_subnet in DataColumnSubnetId::compute_custody_subnets::( - self.network_globals.local_enr().node_id().raw(), - self.network_globals - .local_enr() - .custody_subnet_count::<::EthSpec>( - &self.fork_context.spec, - ), - &self.fork_context.spec, - ) { + for column_subnet in &self.network_globals.custody_subnets { for fork_digest in self.required_gossip_fork_digests() { - let gossip_kind = Subnet::DataColumn(column_subnet).into(); + let gossip_kind = Subnet::DataColumn(*column_subnet).into(); let topic = GossipTopic::new(gossip_kind, GossipEncoding::default(), fork_digest); if self.libp2p.subscribe(topic.clone()) { diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 4ae55d5aafe..73ffcd43845 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -1,4 +1,3 @@ -use super::common::ResponseType; use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS}; use crate::sync::block_lookups::common::RequestState; use crate::sync::network_context::{ @@ -188,7 +187,6 @@ impl SingleBlockLookup { .state .peek_downloaded_data() .cloned(); - let block_is_processed = self.block_request_state.state.is_processed(); let request = R::request_state_mut(self); // Attempt to progress awaiting downloads @@ -241,12 +239,7 @@ impl SingleBlockLookup { // Otherwise, attempt to progress awaiting processing // If this request is awaiting a parent lookup to be processed, do not send for processing. // The request will be rejected with unknown parent error. - // - // TODO: The condition `block_is_processed || Block` can be dropped after checking for - // unknown parent root when import RPC blobs - } else if !awaiting_parent - && (block_is_processed || matches!(R::response_type(), ResponseType::Block)) - { + } else if !awaiting_parent { // maybe_start_processing returns Some if state == AwaitingProcess. This pattern is // useful to conditionally access the result data. if let Some(result) = request.get_state_mut().maybe_start_processing() { diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 1cf028dbcd8..b9f6d180c13 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -389,7 +389,7 @@ impl SyncNetworkContext { let (expects_custody_columns, num_of_custody_column_req) = if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) { - let custody_indexes = self.network_globals().custody_columns(); + let custody_indexes = self.network_globals().custody_columns.clone(); let mut num_of_custody_column_req = 0; for (peer_id, columns_by_range_request) in @@ -758,10 +758,11 @@ impl SyncNetworkContext { .imported_custody_column_indexes(&block_root) .unwrap_or_default(); - let custody_indexes_duty = self.network_globals().custody_columns(); - // Include only the blob indexes not yet imported (received through gossip) - let custody_indexes_to_fetch = custody_indexes_duty + let custody_indexes_to_fetch = self + .network_globals() + .custody_columns + .clone() .into_iter() .filter(|index| !custody_indexes_imported.contains(index)) .collect::>(); diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 1756fb513da..ed5946ada72 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1112,25 +1112,25 @@ impl SyncingChain { fn good_peers_on_custody_subnets(&self, epoch: Epoch, network: &SyncNetworkContext) -> bool { if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) { // Require peers on all custody column subnets before sending batches - let peers_on_all_custody_subnets = - network - .network_globals() - .custody_subnets() - .all(|subnet_id| { - let peer_count = network - .network_globals() - .peers - .read() - .good_custody_subnet_peer(subnet_id) - .count(); - - set_int_gauge( - &PEERS_PER_COLUMN_SUBNET, - &[&subnet_id.to_string()], - peer_count as i64, - ); - peer_count > 0 - }); + let peers_on_all_custody_subnets = network + .network_globals() + .custody_subnets + .iter() + .all(|subnet_id| { + let peer_count = network + .network_globals() + .peers + .read() + .good_custody_subnet_peer(*subnet_id) + .count(); + + set_int_gauge( + &PEERS_PER_COLUMN_SUBNET, + &[&subnet_id.to_string()], + peer_count as i64, + ); + peer_count > 0 + }); peers_on_all_custody_subnets } else { true diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 67bc9d7d407..1e9611fd1eb 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -4,6 +4,7 @@ use clap::{builder::ArgPredicate, crate_version, Arg, ArgAction, ArgGroup, Comma use clap_utils::{get_color_style, FLAG_HEADER}; use strum::VariantNames; +#[allow(clippy::large_stack_frames)] pub fn cli_app() -> Command { Command::new("beacon_node") .display_order(0) diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs index 28e04f56205..720afd0f3f6 100644 --- a/beacon_node/store/src/leveldb_store.rs +++ b/beacon_node/store/src/leveldb_store.rs @@ -98,14 +98,13 @@ impl KeyValueStore for LevelDB { .get(self.read_options(), BytesKey::from_vec(column_key)) .map_err(Into::into) .map(|opt| { - opt.map(|bytes| { + opt.inspect(|bytes| { metrics::inc_counter_vec_by( &metrics::DISK_DB_READ_BYTES, &[col], bytes.len() as u64, ); metrics::stop_timer(timer); - bytes }) }) } diff --git a/common/account_utils/src/lib.rs b/common/account_utils/src/lib.rs index 665953fa522..2c8bbbf4b4e 100644 --- a/common/account_utils/src/lib.rs +++ b/common/account_utils/src/lib.rs @@ -228,7 +228,7 @@ impl ZeroizeString { /// Remove any number of newline or carriage returns from the end of a vector of bytes. pub fn without_newlines(&self) -> ZeroizeString { - let stripped_string = self.0.trim_end_matches(|c| c == '\r' || c == '\n').into(); + let stripped_string = self.0.trim_end_matches(['\r', '\n']).into(); Self(stripped_string) } } diff --git a/common/lighthouse_metrics/src/lib.rs b/common/lighthouse_metrics/src/lib.rs index fa8f47e364a..f52913dd001 100644 --- a/common/lighthouse_metrics/src/lib.rs +++ b/common/lighthouse_metrics/src/lib.rs @@ -400,3 +400,31 @@ pub fn decimal_buckets(min_power: i32, max_power: i32) -> Result> { } Ok(buckets) } + +/// Would be nice to use the `Try` trait bound and have a single implementation, but try_trait_v2 +/// is not a stable feature yet. +pub trait TryExt { + fn discard_timer_on_break(self, timer: &mut Option) -> Self; +} + +impl TryExt for std::result::Result { + fn discard_timer_on_break(self, timer_opt: &mut Option) -> Self { + if self.is_err() { + if let Some(timer) = timer_opt.take() { + timer.stop_and_discard(); + } + } + self + } +} + +impl TryExt for Option { + fn discard_timer_on_break(self, timer_opt: &mut Option) -> Self { + if self.is_none() { + if let Some(timer) = timer_opt.take() { + timer.stop_and_discard(); + } + } + self + } +} diff --git a/common/logging/src/lib.rs b/common/logging/src/lib.rs index d3d91497ccb..a4a1acabd48 100644 --- a/common/logging/src/lib.rs +++ b/common/logging/src/lib.rs @@ -100,10 +100,7 @@ impl<'a> AlignedRecordDecorator<'a> { self.ignore_comma = false; Ok(buf.len()) } else if self.message_active { - self.wrapped.write(buf).map(|n| { - self.message_count += n; - n - }) + self.wrapped.write(buf).inspect(|n| self.message_count += n) } else { self.wrapped.write(buf) } diff --git a/common/malloc_utils/Cargo.toml b/common/malloc_utils/Cargo.toml index ac309cec9da..b91e68c518e 100644 --- a/common/malloc_utils/Cargo.toml +++ b/common/malloc_utils/Cargo.toml @@ -8,16 +8,19 @@ edition = { workspace = true } lighthouse_metrics = { workspace = true } libc = "0.2.79" parking_lot = { workspace = true } -jemalloc-ctl = { version = "0.5.0", optional = true } +tikv-jemalloc-ctl = { version = "0.6.0", optional = true, features = ["stats"] } # Jemalloc's background_threads feature requires Linux (pthreads). [target.'cfg(target_os = "linux")'.dependencies] -jemallocator = { version = "0.5.0", optional = true, features = ["stats", "background_threads"] } +tikv-jemallocator = { version = "0.6.0", optional = true, features = [ + "stats", + "background_threads", +] } [target.'cfg(not(target_os = "linux"))'.dependencies] -jemallocator = { version = "0.5.0", optional = true, features = ["stats"] } +tikv-jemallocator = { version = "0.6.0", optional = true, features = ["stats"] } [features] mallinfo2 = [] -jemalloc = ["jemallocator", "jemalloc-ctl"] -jemalloc-profiling = ["jemallocator/profiling"] +jemalloc = ["tikv-jemallocator", "tikv-jemalloc-ctl"] +jemalloc-profiling = ["tikv-jemallocator/profiling"] diff --git a/common/malloc_utils/src/jemalloc.rs b/common/malloc_utils/src/jemalloc.rs index 70685d59607..a392a74e8f1 100644 --- a/common/malloc_utils/src/jemalloc.rs +++ b/common/malloc_utils/src/jemalloc.rs @@ -7,12 +7,12 @@ //! //! A) `JEMALLOC_SYS_WITH_MALLOC_CONF` at compile-time. //! B) `_RJEM_MALLOC_CONF` at runtime. -use jemalloc_ctl::{arenas, epoch, stats, Error}; use lighthouse_metrics::{set_gauge, try_create_int_gauge, IntGauge}; use std::sync::LazyLock; +use tikv_jemalloc_ctl::{arenas, epoch, stats, Error}; #[global_allocator] -static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; +static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; // Metrics for jemalloc. pub static NUM_ARENAS: LazyLock> = diff --git a/common/warp_utils/src/reject.rs b/common/warp_utils/src/reject.rs index 9b28c65212c..bbd5274a7eb 100644 --- a/common/warp_utils/src/reject.rs +++ b/common/warp_utils/src/reject.rs @@ -265,6 +265,8 @@ pub async fn convert_rejection(res: Result) -> Res Ok(response) => response.into_response(), Err(e) => match handle_rejection(e).await { Ok(reply) => reply.into_response(), + // We can simplify this once Rust 1.82 is MSRV + #[allow(unreachable_patterns)] Err(_) => warp::reply::with_status( warp::reply::json(&"unhandled error"), eth2::StatusCode::INTERNAL_SERVER_ERROR, diff --git a/consensus/types/src/data_column_subnet_id.rs b/consensus/types/src/data_column_subnet_id.rs index df964cf8de7..df61d711c19 100644 --- a/consensus/types/src/data_column_subnet_id.rs +++ b/consensus/types/src/data_column_subnet_id.rs @@ -41,9 +41,10 @@ impl DataColumnSubnetId { raw_node_id: [u8; 32], custody_subnet_count: u64, spec: &ChainSpec, - ) -> impl Iterator { - // TODO(das): we could perform check on `custody_subnet_count` here to ensure that it is a valid - // value, but here we assume it is valid. + ) -> Result, Error> { + if custody_subnet_count > spec.data_column_sidecar_subnet_count { + return Err(Error::InvalidCustodySubnetCount(custody_subnet_count)); + } let mut subnets: HashSet = HashSet::new(); let mut current_id = U256::from_be_slice(&raw_node_id); @@ -66,17 +67,26 @@ impl DataColumnSubnetId { } current_id += U256::from(1u64) } - subnets.into_iter().map(DataColumnSubnetId::new) + Ok(subnets.into_iter().map(DataColumnSubnetId::new)) + } + + /// Compute the custody subnets for a given node id with the default `custody_requirement`. + /// This operation should be infallable, and empty iterator is returned if it fails unexpectedly. + pub fn compute_custody_requirement_subnets( + node_id: [u8; 32], + spec: &ChainSpec, + ) -> impl Iterator { + Self::compute_custody_subnets::(node_id, spec.custody_requirement, spec) + .expect("should compute default custody subnets") } pub fn compute_custody_columns( raw_node_id: [u8; 32], custody_subnet_count: u64, spec: &ChainSpec, - ) -> impl Iterator { + ) -> Result, Error> { Self::compute_custody_subnets::(raw_node_id, custody_subnet_count, spec) - .flat_map(|subnet| subnet.columns::(spec)) - .sorted() + .map(|subnet| subnet.flat_map(|subnet| subnet.columns::(spec)).sorted()) } } @@ -121,6 +131,7 @@ impl From<&DataColumnSubnetId> for u64 { #[derive(Debug)] pub enum Error { ArithError(ArithError), + InvalidCustodySubnetCount(u64), } impl From for Error { @@ -132,9 +143,9 @@ impl From for Error { #[cfg(test)] mod test { use crate::data_column_subnet_id::DataColumnSubnetId; - use crate::EthSpec; use crate::MainnetEthSpec; use crate::Uint256; + use crate::{EthSpec, GnosisEthSpec, MinimalEthSpec}; type E = MainnetEthSpec; @@ -163,7 +174,8 @@ mod test { node_id, custody_requirement, &spec, - ); + ) + .unwrap(); let computed_subnets: Vec<_> = computed_subnets.collect(); // the number of subnets is equal to the custody requirement @@ -183,6 +195,21 @@ mod test { } } + #[test] + fn test_compute_custody_requirement_subnets_never_panics() { + let node_id = [1u8; 32]; + test_compute_custody_requirement_subnets_with_spec::(node_id); + test_compute_custody_requirement_subnets_with_spec::(node_id); + test_compute_custody_requirement_subnets_with_spec::(node_id); + } + + fn test_compute_custody_requirement_subnets_with_spec(node_id: [u8; 32]) { + let _ = DataColumnSubnetId::compute_custody_requirement_subnets::( + node_id, + &E::default_spec(), + ); + } + #[test] fn test_columns_subnet_conversion() { let spec = E::default_spec(); diff --git a/testing/ef_tests/src/cases/get_custody_columns.rs b/testing/ef_tests/src/cases/get_custody_columns.rs index d31e72a473d..9665f877300 100644 --- a/testing/ef_tests/src/cases/get_custody_columns.rs +++ b/testing/ef_tests/src/cases/get_custody_columns.rs @@ -31,6 +31,7 @@ impl Case for GetCustodyColumns { self.custody_subnet_count, &spec, ) + .expect("should compute custody columns") .collect::>(); let expected = &self.result; if computed == *expected { diff --git a/testing/execution_engine_integration/src/test_rig.rs b/testing/execution_engine_integration/src/test_rig.rs index f3f5a72cb60..0289fd4206b 100644 --- a/testing/execution_engine_integration/src/test_rig.rs +++ b/testing/execution_engine_integration/src/test_rig.rs @@ -649,15 +649,7 @@ async fn check_payload_reconstruction( ee: &ExecutionPair, payload: &ExecutionPayload, ) { - // check via legacy eth_getBlockByHash - let reconstructed = ee - .execution_layer - .get_payload_by_hash_legacy(payload.block_hash(), payload.fork_name()) - .await - .unwrap() - .unwrap(); - assert_eq!(reconstructed, *payload); - // also check via payload bodies method + // check via payload bodies method let capabilities = ee .execution_layer .get_engine_capabilities(None) diff --git a/testing/simulator/src/basic_sim.rs b/testing/simulator/src/basic_sim.rs index 46196ba2b10..16badaffc2d 100644 --- a/testing/simulator/src/basic_sim.rs +++ b/testing/simulator/src/basic_sim.rs @@ -26,6 +26,7 @@ const DENEB_FORK_EPOCH: u64 = 2; const SUGGESTED_FEE_RECIPIENT: [u8; 20] = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]; +#[allow(clippy::large_stack_frames)] pub fn run_basic_sim(matches: &ArgMatches) -> Result<(), String> { let node_count = matches .get_one::("nodes") diff --git a/testing/state_transition_vectors/src/macros.rs b/testing/state_transition_vectors/src/macros.rs index 5dafbf549a0..a7f87b1c26e 100644 --- a/testing/state_transition_vectors/src/macros.rs +++ b/testing/state_transition_vectors/src/macros.rs @@ -4,6 +4,7 @@ /// - `mod tests`: runs all the test vectors locally. macro_rules! vectors_and_tests { ($($name: ident, $test: expr),*) => { + #[allow(clippy::large_stack_frames)] pub async fn vectors() -> Vec { let mut vec = vec![]; diff --git a/watch/tests/tests.rs b/watch/tests/tests.rs index 5461508edd8..e21cf151b11 100644 --- a/watch/tests/tests.rs +++ b/watch/tests/tests.rs @@ -852,6 +852,7 @@ async fn chain_grows() { #[cfg(unix)] #[tokio::test] +#[allow(clippy::large_stack_frames)] async fn chain_grows_with_metadata() { let builder = TesterBuilder::new().await; @@ -959,6 +960,7 @@ async fn chain_grows_with_metadata() { #[cfg(unix)] #[tokio::test] +#[allow(clippy::large_stack_frames)] async fn chain_grows_with_metadata_and_multiple_skip_slots() { let builder = TesterBuilder::new().await;