Add range sync metrics to track efficiency (#6095)
* Add more range sync metrics to track efficiency

* Add ignored blocks metrics
dapplion authored Jul 15, 2024
1 parent 9f40d91 commit 0e59939
Showing 8 changed files with 151 additions and 28 deletions.
30 changes: 30 additions & 0 deletions beacon_node/network/src/metrics.rs
@@ -237,6 +237,36 @@ lazy_static! {
"Number of Syncing chains in range, per range type",
&["range_type"]
);
pub static ref SYNCING_CHAINS_REMOVED: Result<IntCounterVec> = try_create_int_counter_vec(
"sync_range_removed_chains_total",
"Total count of range syncing chains removed per range type",
&["range_type"]
);
pub static ref SYNCING_CHAINS_ADDED: Result<IntCounterVec> = try_create_int_counter_vec(
"sync_range_added_chains_total",
"Total count of range syncing chains added per range type",
&["range_type"]
);
pub static ref SYNCING_CHAINS_DROPPED_BLOCKS: Result<IntCounterVec> = try_create_int_counter_vec(
"sync_range_chains_dropped_blocks_total",
"Total count of dropped blocks when removing a syncing chain per range type",
&["range_type"]
);
pub static ref SYNCING_CHAINS_IGNORED_BLOCKS: Result<IntCounterVec> = try_create_int_counter_vec(
"sync_range_chains_ignored_blocks_total",
"Total count of ignored blocks when processing a syncing chain batch per chain type",
&["chain_type"]
);
pub static ref SYNCING_CHAINS_PROCESSED_BATCHES: Result<IntCounterVec> = try_create_int_counter_vec(
"sync_range_chains_processed_batches_total",
"Total count of processed batches in a syncing chain batch per chain type",
&["chain_type"]
);
pub static ref SYNCING_CHAIN_BATCH_AWAITING_PROCESSING: Result<Histogram> = try_create_histogram_with_buckets(
"sync_range_chain_batch_awaiting_processing_seconds",
"Time range sync batches spend in AwaitingProcessing state",
Ok(vec![0.01,0.02,0.05,0.1,0.2,0.5,1.0,2.0,5.0,10.0,20.0])
);
pub static ref SYNC_SINGLE_BLOCK_LOOKUPS: Result<IntGauge> = try_create_int_gauge(
"sync_single_block_lookups",
"Number of single block lookups underway"
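
The new counters and histogram above are driven from the sync code later in this commit. As a minimal sketch of the update pattern (not part of the diff: `metrics::inc_counter_vec`, `inc_counter_vec_by` and `observe_duration` are the lighthouse_metrics wrappers this crate already re-exports, while `record_chain_removed`, the `range_type` value and the `dropped_blocks`/`awaiting` arguments are illustrative):

use std::time::Duration;
use crate::metrics;

/// Illustrative helper: record the removal of a syncing chain and how long its
/// last batch waited for processing. `range_type` would be e.g. "finalized" or "head".
fn record_chain_removed(range_type: &str, dropped_blocks: usize, awaiting: Duration) {
    metrics::inc_counter_vec(&metrics::SYNCING_CHAINS_REMOVED, &[range_type]);
    metrics::inc_counter_vec_by(
        &metrics::SYNCING_CHAINS_DROPPED_BLOCKS,
        &[range_type],
        dropped_blocks as u64,
    );
    metrics::observe_duration(&metrics::SYNCING_CHAIN_BATCH_AWAITING_PROCESSING, awaiting);
}
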
14 changes: 8 additions & 6 deletions beacon_node/network/src/network_beacon_processor/sync_methods.rs
@@ -326,7 +326,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.process_blocks(downloaded_blocks.iter(), notify_execution_layer)
.await
{
(_, Ok(_)) => {
(imported_blocks, Ok(_)) => {
debug!(self.log, "Batch processed";
"batch_epoch" => epoch,
"first_block_slot" => start_slot,
@@ -335,7 +335,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"processed_blocks" => sent_blocks,
"service"=> "sync");
BatchProcessResult::Success {
was_non_empty: sent_blocks > 0,
sent_blocks,
imported_blocks,
}
}
(imported_blocks, Err(e)) => {
@@ -349,7 +350,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"service" => "sync");
match e.peer_action {
Some(penalty) => BatchProcessResult::FaultyFailure {
imported_blocks: imported_blocks > 0,
imported_blocks,
penalty,
},
None => BatchProcessResult::NonFaultyFailure,
@@ -368,7 +369,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.sum::<usize>();

match self.process_backfill_blocks(downloaded_blocks) {
(_, Ok(_)) => {
(imported_blocks, Ok(_)) => {
debug!(self.log, "Backfill batch processed";
"batch_epoch" => epoch,
"first_block_slot" => start_slot,
@@ -377,7 +378,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"processed_blobs" => n_blobs,
"service"=> "sync");
BatchProcessResult::Success {
was_non_empty: sent_blocks > 0,
sent_blocks,
imported_blocks,
}
}
(_, Err(e)) => {
@@ -390,7 +392,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"service" => "sync");
match e.peer_action {
Some(penalty) => BatchProcessResult::FaultyFailure {
imported_blocks: false,
imported_blocks: 0,
penalty,
},
None => BatchProcessResult::NonFaultyFailure,
10 changes: 6 additions & 4 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
@@ -528,7 +528,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
// result callback. This is done, because an empty batch could end a chain and the logic
// for removing chains and checking completion is in the callback.

let blocks = match batch.start_processing() {
let (blocks, _) = match batch.start_processing() {
Err(e) => {
return self
.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))
@@ -615,13 +615,15 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
"batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer));

match result {
BatchProcessResult::Success { was_non_empty } => {
BatchProcessResult::Success {
imported_blocks, ..
} => {
if let Err(e) = batch.processing_completed(BatchProcessingResult::Success) {
self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?;
}
// If the processed batch was not empty, we can validate previous unvalidated
// blocks.
if *was_non_empty {
if *imported_blocks > 0 {
self.advance_chain(network, batch_id);
}

@@ -677,7 +679,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {

Ok(BatchOperationOutcome::Continue) => {
// chain can continue. Check if it can be progressed
if *imported_blocks {
if *imported_blocks > 0 {
// At least one block was successfully verified and imported, then we can be sure all
// previous batches are valid and we only need to download the current failed
// batch.
5 changes: 3 additions & 2 deletions beacon_node/network/src/sync/manager.rs
@@ -156,11 +156,12 @@ pub enum BlockProcessingResult<E: EthSpec> {
pub enum BatchProcessResult {
/// The batch was completed successfully. It carries the number of blocks sent and imported.
Success {
was_non_empty: bool,
sent_blocks: usize,
imported_blocks: usize,
},
/// The batch processing failed. It carries the number of blocks imported before the failure.
FaultyFailure {
imported_blocks: bool,
imported_blocks: usize,
penalty: PeerAction,
},
NonFaultyFailure,
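
Because `Success` and `FaultyFailure` now carry counts rather than booleans, callers can derive the number of ignored blocks (sent but not imported) directly from the result, as `chain.rs` does further down. A minimal sketch of consuming the new variants (illustrative only; the `summarize` helper is not part of this commit):

fn summarize(result: &BatchProcessResult) -> (usize, usize) {
    // Returns (imported, ignored) block counts for a processed batch.
    match result {
        BatchProcessResult::Success {
            sent_blocks,
            imported_blocks,
        } => (*imported_blocks, sent_blocks.saturating_sub(*imported_blocks)),
        BatchProcessResult::FaultyFailure {
            imported_blocks, ..
        } => (*imported_blocks, 0),
        BatchProcessResult::NonFaultyFailure => (0, 0),
    }
}
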
28 changes: 21 additions & 7 deletions beacon_node/network/src/sync/range_sync/batch.rs
@@ -5,6 +5,7 @@ use lighthouse_network::PeerId;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::ops::Sub;
use std::time::{Duration, Instant};
use strum::Display;
use types::{Epoch, EthSpec, Slot};

@@ -118,7 +119,7 @@ pub enum BatchState<E: EthSpec> {
/// The batch is being downloaded.
Downloading(PeerId, Id),
/// The batch has been completely downloaded and is ready for processing.
AwaitingProcessing(PeerId, Vec<RpcBlock<E>>),
AwaitingProcessing(PeerId, Vec<RpcBlock<E>>, Instant),
/// The batch is being processed.
Processing(Attempt),
/// The batch was successfully processed and is waiting to be validated.
@@ -210,13 +211,26 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
match &self.state {
BatchState::AwaitingDownload | BatchState::Failed => None,
BatchState::Downloading(peer_id, _)
| BatchState::AwaitingProcessing(peer_id, _)
| BatchState::AwaitingProcessing(peer_id, _, _)
| BatchState::Processing(Attempt { peer_id, .. })
| BatchState::AwaitingValidation(Attempt { peer_id, .. }) => Some(peer_id),
BatchState::Poisoned => unreachable!("Poisoned batch"),
}
}

/// Returns the count of stored pending blocks if the batch is in the `AwaitingProcessing` state.
pub fn pending_blocks(&self) -> usize {
match &self.state {
BatchState::AwaitingProcessing(_, blocks, _) => blocks.len(),
BatchState::AwaitingDownload
| BatchState::Downloading { .. }
| BatchState::Processing { .. }
| BatchState::AwaitingValidation { .. }
| BatchState::Poisoned
| BatchState::Failed => 0,
}
}

/// Returns a BlocksByRange request associated with the batch.
pub fn to_blocks_by_range_request(&self) -> (BlocksByRangeRequest, ByRangeRequestType) {
(
@@ -293,7 +307,7 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
}

let received = blocks.len();
self.state = BatchState::AwaitingProcessing(peer, blocks);
self.state = BatchState::AwaitingProcessing(peer, blocks, Instant::now());
Ok(received)
}
BatchState::Poisoned => unreachable!("Poisoned batch"),
@@ -365,11 +379,11 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
}
}

pub fn start_processing(&mut self) -> Result<Vec<RpcBlock<E>>, WrongState> {
pub fn start_processing(&mut self) -> Result<(Vec<RpcBlock<E>>, Duration), WrongState> {
match self.state.poison() {
BatchState::AwaitingProcessing(peer, blocks) => {
BatchState::AwaitingProcessing(peer, blocks, start_instant) => {
self.state = BatchState::Processing(Attempt::new::<B, E>(peer, &blocks));
Ok(blocks)
Ok((blocks, start_instant.elapsed()))
}
BatchState::Poisoned => unreachable!("Poisoned batch"),
other => {
@@ -515,7 +529,7 @@ impl<E: EthSpec> std::fmt::Debug for BatchState<E> {
}) => write!(f, "AwaitingValidation({})", peer_id),
BatchState::AwaitingDownload => f.write_str("AwaitingDownload"),
BatchState::Failed => f.write_str("Failed"),
BatchState::AwaitingProcessing(ref peer, ref blocks) => {
BatchState::AwaitingProcessing(ref peer, ref blocks, _) => {
write!(f, "AwaitingProcessing({}, {} blocks)", peer, blocks.len())
}
BatchState::Downloading(peer, request_id) => {
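
Storing an `Instant` in the `AwaitingProcessing` variant is the usual way to time how long a value sits in one state: stamp the transition in with `Instant::now()`, then take `elapsed()` on the transition out, as `start_processing` now does. The same pattern in isolation (a standalone sketch with a hypothetical two-state machine, not Lighthouse code):

use std::time::{Duration, Instant};

enum State {
    AwaitingProcessing(Vec<u64>, Instant),
    Processing,
}

impl State {
    /// Stamp the moment the data becomes ready for processing.
    fn ready(blocks: Vec<u64>) -> Self {
        State::AwaitingProcessing(blocks, Instant::now())
    }

    /// Transition out, reporting how long the data waited.
    fn start_processing(&mut self) -> Option<(Vec<u64>, Duration)> {
        match std::mem::replace(self, State::Processing) {
            State::AwaitingProcessing(blocks, since) => Some((blocks, since.elapsed())),
            other => {
                // Not in the awaiting state; restore it and report nothing.
                *self = other;
                None
            }
        }
    }
}
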
62 changes: 58 additions & 4 deletions beacon_node/network/src/sync/range_sync/chain.rs
@@ -1,4 +1,6 @@
use super::batch::{BatchInfo, BatchProcessingResult, BatchState};
use super::RangeSyncType;
use crate::metrics;
use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::network_context::RangeRequestId;
use crate::sync::{network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult};
@@ -11,6 +13,7 @@ use rand::{seq::SliceRandom, Rng};
use slog::{crit, debug, o, warn};
use std::collections::{btree_map::Entry, BTreeMap, HashSet};
use std::hash::{Hash, Hasher};
use strum::IntoStaticStr;
use types::{Epoch, EthSpec, Hash256, Slot};

/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
@@ -53,13 +56,23 @@ pub struct KeepChain;
pub type ChainId = u64;
pub type BatchId = Epoch;

#[derive(Debug, Copy, Clone, IntoStaticStr)]
pub enum SyncingChainType {
Head,
Finalized,
Backfill,
}

/// A chain of blocks that need to be downloaded. Peers who claim to contain the target head
/// root are grouped into the peer pool and queried for batches when downloading the
/// chain.
pub struct SyncingChain<T: BeaconChainTypes> {
/// A random id used to identify this chain.
id: ChainId,

/// The type of this syncing chain (head, finalized or backfill).
pub chain_type: SyncingChainType,

/// The start of the chain segment. Any epoch previous to this one has been validated.
pub start_epoch: Epoch,

@@ -126,6 +139,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
target_head_slot: Slot,
target_head_root: Hash256,
peer_id: PeerId,
chain_type: SyncingChainType,
log: &slog::Logger,
) -> Self {
let mut peers = FnvHashMap::default();
Expand All @@ -135,6 +149,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {

SyncingChain {
id,
chain_type,
start_epoch,
target_head_slot,
target_head_root,
@@ -171,6 +186,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
self.validated_batches * EPOCHS_PER_BATCH
}

/// Returns the total count of pending blocks in all the batches of this chain
pub fn pending_blocks(&self) -> usize {
self.batches
.values()
.map(|batch| batch.pending_blocks())
.sum()
}

/// Removes a peer from the chain.
/// If the peer has active batches, those are considered failed and re-requested.
pub fn remove_peer(
@@ -305,7 +328,12 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// result callback. This is done, because an empty batch could end a chain and the logic
// for removing chains and checking completion is in the callback.

let blocks = batch.start_processing()?;
let (blocks, duration_in_awaiting_processing) = batch.start_processing()?;
metrics::observe_duration(
&metrics::SYNCING_CHAIN_BATCH_AWAITING_PROCESSING,
duration_in_awaiting_processing,
);

let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id);
self.current_processing_batch = Some(batch_id);

@@ -469,10 +497,27 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// We consider three cases. Batch was successfully processed, Batch failed processing due
// to a faulty peer, or batch failed processing but the peer can't be deemed faulty.
match result {
BatchProcessResult::Success { was_non_empty } => {
BatchProcessResult::Success {
sent_blocks,
imported_blocks,
} => {
if sent_blocks > imported_blocks {
let ignored_blocks = sent_blocks - imported_blocks;
metrics::inc_counter_vec_by(
&metrics::SYNCING_CHAINS_IGNORED_BLOCKS,
&[self.chain_type.into()],
ignored_blocks as u64,
);
}
metrics::inc_counter_vec(
&metrics::SYNCING_CHAINS_PROCESSED_BATCHES,
&[self.chain_type.into()],
);

batch.processing_completed(BatchProcessingResult::Success)?;

if *was_non_empty {
// The batch was non-empty if at least one block was sent (sent_blocks > 0).
if *sent_blocks > 0 {
// If the processed batch was not empty, we can validate previous unvalidated
// blocks.
self.advance_chain(network, batch_id);
@@ -515,7 +560,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
match batch.processing_completed(BatchProcessingResult::FaultyFailure)? {
BatchOperationOutcome::Continue => {
// Chain can continue. Check if it can be moved forward.
if *imported_blocks {
if *imported_blocks > 0 {
// At least one block was successfully verified and imported, so we can be sure all
// previous batches are valid and we only need to download the current failed
// batch.
@@ -1142,3 +1187,12 @@ impl RemoveChain {
)
}
}

impl From<RangeSyncType> for SyncingChainType {
fn from(value: RangeSyncType) -> Self {
match value {
RangeSyncType::Head => Self::Head,
RangeSyncType::Finalized => Self::Finalized,
}
}
}
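
The `IntoStaticStr` derive from `strum` is what lets `self.chain_type.into()` above yield a `&'static str` label ("Head", "Finalized" or "Backfill") for the metric calls. That conversion in isolation (a sketch assuming only the `strum` crate with its derive feature):

use strum::IntoStaticStr;

#[derive(Debug, Copy, Clone, IntoStaticStr)]
enum SyncingChainType {
    Head,
    Finalized,
    Backfill,
}

fn main() {
    // The derive generates `From<SyncingChainType> for &'static str`,
    // using the variant name as the string.
    let label: &'static str = SyncingChainType::Backfill.into();
    assert_eq!(label, "Backfill");
}
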