From 4bfca8251dff0ceb622fea99bead74eeb1a2b3d8 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Mon, 15 Jul 2024 20:51:59 +0200 Subject: [PATCH] Add range sync metrics to track efficiency (#6095) * Add more range sync metrics to track efficiency * Add ignored blocks metrics --- beacon_node/network/src/metrics.rs | 30 +++++++++ .../network_beacon_processor/sync_methods.rs | 14 +++-- .../network/src/sync/backfill_sync/mod.rs | 10 +-- beacon_node/network/src/sync/manager.rs | 5 +- .../network/src/sync/range_sync/batch.rs | 28 ++++++--- .../network/src/sync/range_sync/chain.rs | 62 +++++++++++++++++-- .../src/sync/range_sync/chain_collection.rs | 23 +++++-- .../network/src/sync/range_sync/range.rs | 7 +++ 8 files changed, 151 insertions(+), 28 deletions(-) diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index dbcc8fb9b4c..f0dba8d9655 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -237,6 +237,36 @@ lazy_static! { "Number of Syncing chains in range, per range type", &["range_type"] ); + pub static ref SYNCING_CHAINS_REMOVED: Result = try_create_int_counter_vec( + "sync_range_removed_chains_total", + "Total count of range syncing chains removed per range type", + &["range_type"] + ); + pub static ref SYNCING_CHAINS_ADDED: Result = try_create_int_counter_vec( + "sync_range_added_chains_total", + "Total count of range syncing chains added per range type", + &["range_type"] + ); + pub static ref SYNCING_CHAINS_DROPPED_BLOCKS: Result = try_create_int_counter_vec( + "sync_range_chains_dropped_blocks_total", + "Total count of dropped blocks when removing a syncing chain per range type", + &["range_type"] + ); + pub static ref SYNCING_CHAINS_IGNORED_BLOCKS: Result = try_create_int_counter_vec( + "sync_range_chains_ignored_blocks_total", + "Total count of ignored blocks when processing a syncing chain batch per chain type", + &["chain_type"] + ); + pub static ref SYNCING_CHAINS_PROCESSED_BATCHES: Result = try_create_int_counter_vec( + "sync_range_chains_processed_batches_total", + "Total count of processed batches in a syncing chain batch per chain type", + &["chain_type"] + ); + pub static ref SYNCING_CHAIN_BATCH_AWAITING_PROCESSING: Result = try_create_histogram_with_buckets( + "sync_range_chain_batch_awaiting_processing_seconds", + "Time range sync batches spend in AwaitingProcessing state", + Ok(vec![0.01,0.02,0.05,0.1,0.2,0.5,1.0,2.0,5.0,10.0,20.0]) + ); pub static ref SYNC_SINGLE_BLOCK_LOOKUPS: Result = try_create_int_gauge( "sync_single_block_lookups", "Number of single block lookups underway" diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index acd02ab6ad8..68bd6745144 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -326,7 +326,7 @@ impl NetworkBeaconProcessor { .process_blocks(downloaded_blocks.iter(), notify_execution_layer) .await { - (_, Ok(_)) => { + (imported_blocks, Ok(_)) => { debug!(self.log, "Batch processed"; "batch_epoch" => epoch, "first_block_slot" => start_slot, @@ -335,7 +335,8 @@ impl NetworkBeaconProcessor { "processed_blocks" => sent_blocks, "service"=> "sync"); BatchProcessResult::Success { - was_non_empty: sent_blocks > 0, + sent_blocks, + imported_blocks, } } (imported_blocks, Err(e)) => { @@ -349,7 +350,7 @@ impl NetworkBeaconProcessor { "service" => "sync"); match e.peer_action { Some(penalty) => BatchProcessResult::FaultyFailure { - imported_blocks: imported_blocks > 0, + imported_blocks, penalty, }, None => BatchProcessResult::NonFaultyFailure, @@ -368,7 +369,7 @@ impl NetworkBeaconProcessor { .sum::(); match self.process_backfill_blocks(downloaded_blocks) { - (_, Ok(_)) => { + (imported_blocks, Ok(_)) => { debug!(self.log, "Backfill batch processed"; "batch_epoch" => epoch, "first_block_slot" => start_slot, @@ -377,7 +378,8 @@ impl NetworkBeaconProcessor { "processed_blobs" => n_blobs, "service"=> "sync"); BatchProcessResult::Success { - was_non_empty: sent_blocks > 0, + sent_blocks, + imported_blocks, } } (_, Err(e)) => { @@ -390,7 +392,7 @@ impl NetworkBeaconProcessor { "service" => "sync"); match e.peer_action { Some(penalty) => BatchProcessResult::FaultyFailure { - imported_blocks: false, + imported_blocks: 0, penalty, }, None => BatchProcessResult::NonFaultyFailure, diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 8bcc95fd3ce..dfb05da19bd 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -528,7 +528,7 @@ impl BackFillSync { // result callback. This is done, because an empty batch could end a chain and the logic // for removing chains and checking completion is in the callback. - let blocks = match batch.start_processing() { + let (blocks, _) = match batch.start_processing() { Err(e) => { return self .fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)) @@ -615,13 +615,15 @@ impl BackFillSync { "batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer)); match result { - BatchProcessResult::Success { was_non_empty } => { + BatchProcessResult::Success { + imported_blocks, .. + } => { if let Err(e) = batch.processing_completed(BatchProcessingResult::Success) { self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?; } // If the processed batch was not empty, we can validate previous unvalidated // blocks. - if *was_non_empty { + if *imported_blocks > 0 { self.advance_chain(network, batch_id); } @@ -677,7 +679,7 @@ impl BackFillSync { Ok(BatchOperationOutcome::Continue) => { // chain can continue. Check if it can be progressed - if *imported_blocks { + if *imported_blocks > 0 { // At least one block was successfully verified and imported, then we can be sure all // previous batches are valid and we only need to download the current failed // batch. diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 23c05a6e161..dd4fa56d537 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -156,11 +156,12 @@ pub enum BlockProcessingResult { pub enum BatchProcessResult { /// The batch was completed successfully. It carries whether the sent batch contained blocks. Success { - was_non_empty: bool, + sent_blocks: usize, + imported_blocks: usize, }, /// The batch processing failed. It carries whether the processing imported any block. FaultyFailure { - imported_blocks: bool, + imported_blocks: usize, penalty: PeerAction, }, NonFaultyFailure, diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 6e377cc6cb4..49e3ac3a817 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -5,6 +5,7 @@ use lighthouse_network::PeerId; use std::collections::HashSet; use std::hash::{Hash, Hasher}; use std::ops::Sub; +use std::time::{Duration, Instant}; use strum::Display; use types::{Epoch, EthSpec, Slot}; @@ -118,7 +119,7 @@ pub enum BatchState { /// The batch is being downloaded. Downloading(PeerId, Id), /// The batch has been completely downloaded and is ready for processing. - AwaitingProcessing(PeerId, Vec>), + AwaitingProcessing(PeerId, Vec>, Instant), /// The batch is being processed. Processing(Attempt), /// The batch was successfully processed and is waiting to be validated. @@ -210,13 +211,26 @@ impl BatchInfo { match &self.state { BatchState::AwaitingDownload | BatchState::Failed => None, BatchState::Downloading(peer_id, _) - | BatchState::AwaitingProcessing(peer_id, _) + | BatchState::AwaitingProcessing(peer_id, _, _) | BatchState::Processing(Attempt { peer_id, .. }) | BatchState::AwaitingValidation(Attempt { peer_id, .. }) => Some(peer_id), BatchState::Poisoned => unreachable!("Poisoned batch"), } } + /// Returns the count of stored pending blocks if in awaiting processing state + pub fn pending_blocks(&self) -> usize { + match &self.state { + BatchState::AwaitingProcessing(_, blocks, _) => blocks.len(), + BatchState::AwaitingDownload + | BatchState::Downloading { .. } + | BatchState::Processing { .. } + | BatchState::AwaitingValidation { .. } + | BatchState::Poisoned + | BatchState::Failed => 0, + } + } + /// Returns a BlocksByRange request associated with the batch. pub fn to_blocks_by_range_request(&self) -> (BlocksByRangeRequest, ByRangeRequestType) { ( @@ -293,7 +307,7 @@ impl BatchInfo { } let received = blocks.len(); - self.state = BatchState::AwaitingProcessing(peer, blocks); + self.state = BatchState::AwaitingProcessing(peer, blocks, Instant::now()); Ok(received) } BatchState::Poisoned => unreachable!("Poisoned batch"), @@ -365,11 +379,11 @@ impl BatchInfo { } } - pub fn start_processing(&mut self) -> Result>, WrongState> { + pub fn start_processing(&mut self) -> Result<(Vec>, Duration), WrongState> { match self.state.poison() { - BatchState::AwaitingProcessing(peer, blocks) => { + BatchState::AwaitingProcessing(peer, blocks, start_instant) => { self.state = BatchState::Processing(Attempt::new::(peer, &blocks)); - Ok(blocks) + Ok((blocks, start_instant.elapsed())) } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { @@ -515,7 +529,7 @@ impl std::fmt::Debug for BatchState { }) => write!(f, "AwaitingValidation({})", peer_id), BatchState::AwaitingDownload => f.write_str("AwaitingDownload"), BatchState::Failed => f.write_str("Failed"), - BatchState::AwaitingProcessing(ref peer, ref blocks) => { + BatchState::AwaitingProcessing(ref peer, ref blocks, _) => { write!(f, "AwaitingProcessing({}, {} blocks)", peer, blocks.len()) } BatchState::Downloading(peer, request_id) => { diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index a735001fed3..d63b2f95d80 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1,4 +1,6 @@ use super::batch::{BatchInfo, BatchProcessingResult, BatchState}; +use super::RangeSyncType; +use crate::metrics; use crate::network_beacon_processor::ChainSegmentProcessId; use crate::sync::network_context::RangeRequestId; use crate::sync::{network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult}; @@ -11,6 +13,7 @@ use rand::{seq::SliceRandom, Rng}; use slog::{crit, debug, o, warn}; use std::collections::{btree_map::Entry, BTreeMap, HashSet}; use std::hash::{Hash, Hasher}; +use strum::IntoStaticStr; use types::{Epoch, EthSpec, Hash256, Slot}; /// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of @@ -53,6 +56,13 @@ pub struct KeepChain; pub type ChainId = u64; pub type BatchId = Epoch; +#[derive(Debug, Copy, Clone, IntoStaticStr)] +pub enum SyncingChainType { + Head, + Finalized, + Backfill, +} + /// A chain of blocks that need to be downloaded. Peers who claim to contain the target head /// root are grouped into the peer pool and queried for batches when downloading the /// chain. @@ -60,6 +70,9 @@ pub struct SyncingChain { /// A random id used to identify this chain. id: ChainId, + /// SyncingChain type + pub chain_type: SyncingChainType, + /// The start of the chain segment. Any epoch previous to this one has been validated. pub start_epoch: Epoch, @@ -126,6 +139,7 @@ impl SyncingChain { target_head_slot: Slot, target_head_root: Hash256, peer_id: PeerId, + chain_type: SyncingChainType, log: &slog::Logger, ) -> Self { let mut peers = FnvHashMap::default(); @@ -135,6 +149,7 @@ impl SyncingChain { SyncingChain { id, + chain_type, start_epoch, target_head_slot, target_head_root, @@ -171,6 +186,14 @@ impl SyncingChain { self.validated_batches * EPOCHS_PER_BATCH } + /// Returns the total count of pending blocks in all the batches of this chain + pub fn pending_blocks(&self) -> usize { + self.batches + .values() + .map(|batch| batch.pending_blocks()) + .sum() + } + /// Removes a peer from the chain. /// If the peer has active batches, those are considered failed and re-requested. pub fn remove_peer( @@ -305,7 +328,12 @@ impl SyncingChain { // result callback. This is done, because an empty batch could end a chain and the logic // for removing chains and checking completion is in the callback. - let blocks = batch.start_processing()?; + let (blocks, duration_in_awaiting_processing) = batch.start_processing()?; + metrics::observe_duration( + &metrics::SYNCING_CHAIN_BATCH_AWAITING_PROCESSING, + duration_in_awaiting_processing, + ); + let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id); self.current_processing_batch = Some(batch_id); @@ -469,10 +497,27 @@ impl SyncingChain { // We consider three cases. Batch was successfully processed, Batch failed processing due // to a faulty peer, or batch failed processing but the peer can't be deemed faulty. match result { - BatchProcessResult::Success { was_non_empty } => { + BatchProcessResult::Success { + sent_blocks, + imported_blocks, + } => { + if sent_blocks > imported_blocks { + let ignored_blocks = sent_blocks - imported_blocks; + metrics::inc_counter_vec_by( + &metrics::SYNCING_CHAINS_IGNORED_BLOCKS, + &[self.chain_type.into()], + ignored_blocks as u64, + ); + } + metrics::inc_counter_vec( + &metrics::SYNCING_CHAINS_PROCESSED_BATCHES, + &[self.chain_type.into()], + ); + batch.processing_completed(BatchProcessingResult::Success)?; - if *was_non_empty { + // was not empty = sent_blocks > 0 + if *sent_blocks > 0 { // If the processed batch was not empty, we can validate previous unvalidated // blocks. self.advance_chain(network, batch_id); @@ -515,7 +560,7 @@ impl SyncingChain { match batch.processing_completed(BatchProcessingResult::FaultyFailure)? { BatchOperationOutcome::Continue => { // Chain can continue. Check if it can be moved forward. - if *imported_blocks { + if *imported_blocks > 0 { // At least one block was successfully verified and imported, so we can be sure all // previous batches are valid and we only need to download the current failed // batch. @@ -1142,3 +1187,12 @@ impl RemoveChain { ) } } + +impl From for SyncingChainType { + fn from(value: RangeSyncType) -> Self { + match value { + RangeSyncType::Head => Self::Head, + RangeSyncType::Finalized => Self::Finalized, + } + } +} diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 364514a3582..3621a6605af 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -64,8 +64,8 @@ impl ChainCollection { /// Updates the Syncing state of the collection after a chain is removed. fn on_chain_removed(&mut self, id: &ChainId, was_syncing: bool, sync_type: RangeSyncType) { - let _ = metrics::get_int_gauge(&metrics::SYNCING_CHAINS_COUNT, &[sync_type.as_str()]) - .map(|m| m.dec()); + metrics::inc_counter_vec(&metrics::SYNCING_CHAINS_REMOVED, &[sync_type.as_str()]); + self.update_metrics(); match self.state { RangeSyncState::Finalized(ref syncing_id) => { @@ -493,15 +493,28 @@ impl ChainCollection { target_head_slot, target_head_root, peer, + sync_type.into(), &self.log, ); debug_assert_eq!(new_chain.get_id(), id); debug!(self.log, "New chain added to sync"; "peer_id" => peer_rpr, "sync_type" => ?sync_type, &new_chain); entry.insert(new_chain); - let _ = - metrics::get_int_gauge(&metrics::SYNCING_CHAINS_COUNT, &[sync_type.as_str()]) - .map(|m| m.inc()); + metrics::inc_counter_vec(&metrics::SYNCING_CHAINS_ADDED, &[sync_type.as_str()]); + self.update_metrics(); } } } + + fn update_metrics(&self) { + metrics::set_gauge_vec( + &metrics::SYNCING_CHAINS_COUNT, + &[RangeSyncType::Finalized.as_str()], + self.finalized_chains.len() as i64, + ); + metrics::set_gauge_vec( + &metrics::SYNCING_CHAINS_COUNT, + &[RangeSyncType::Head.as_str()], + self.head_chains.len() as i64, + ); + } } diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index fa06af24956..4213771d483 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -43,6 +43,7 @@ use super::block_storage::BlockStorage; use super::chain::{BatchId, ChainId, RemoveChain, SyncingChain}; use super::chain_collection::ChainCollection; use super::sync_type::RangeSyncType; +use crate::metrics; use crate::status::ToStatusMessage; use crate::sync::network_context::SyncNetworkContext; use crate::sync::BatchProcessResult; @@ -346,6 +347,12 @@ where } } + metrics::inc_counter_vec_by( + &metrics::SYNCING_CHAINS_DROPPED_BLOCKS, + &[sync_type.as_str()], + chain.pending_blocks() as u64, + ); + network.status_peers(self.beacon_chain.as_ref(), chain.peers()); let status = self.beacon_chain.status_message();