Skip to content

Commit

Permalink
Track beacon processor import result metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Oct 26, 2024
1 parent 8188e03 commit 92e801b
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 57 deletions.
3 changes: 2 additions & 1 deletion beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ use std::fs;
use std::io::Write;
use std::sync::Arc;
use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp};
use strum::AsRefStr;
use task_executor::JoinHandle;
use types::{
data_column_sidecar::DataColumnSidecarError, BeaconBlockRef, BeaconState, BeaconStateError,
Expand Down Expand Up @@ -137,7 +138,7 @@ const WRITE_BLOCK_PROCESSING_SSZ: bool = cfg!(feature = "write_ssz_files");
///
/// - The block is malformed/invalid (indicated by all results other than `BeaconChainError`.
/// - We encountered an error whilst trying to verify the block (a `BeaconChainError`).
#[derive(Debug)]
#[derive(Debug, AsRefStr)]
pub enum BlockError {
/// The parent block was unknown.
///
Expand Down
68 changes: 52 additions & 16 deletions beacon_node/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use beacon_chain::{
attestation_verification::Error as AttnError,
light_client_finality_update_verification::Error as LightClientFinalityUpdateError,
light_client_optimistic_update_verification::Error as LightClientOptimisticUpdateError,
sync_committee_verification::Error as SyncCommitteeError,
sync_committee_verification::Error as SyncCommitteeError, AvailabilityProcessingStatus,
BlockError,
};
use fnv::FnvHashMap;
pub use lighthouse_metrics::*;
Expand Down Expand Up @@ -59,6 +60,34 @@ pub static SYNC_COMMITTEE_SUBSCRIPTION_REQUESTS: LazyLock<Result<IntCounter>> =
)
});

/*
* Beacon processor
*/
pub static BEACON_PROCESSOR_FULLY_IMPORTED: LazyLock<Result<IntCounterVec>> = LazyLock::new(|| {
try_create_int_counter_vec(
"beacon_processor_fully_imported_total",
"Total number of fully imported block components by block component trigger",
&["component"],
)
});
pub static BEACON_PROCESSOR_MISSING_COMPONENTS: LazyLock<Result<IntCounterVec>> = LazyLock::new(
|| {
try_create_int_counter_vec(
"beacon_processor_missing_components_total",
"Total number of imported individual block components that resulted in missing components",
&["component"],
)
},
);
pub static BEACON_PROCESSOR_IMPORT_ERRORS_PER_TYPE: LazyLock<Result<IntCounterVec>> =
LazyLock::new(|| {
try_create_int_counter_vec(
"beacon_processor_import_errors_total",
"Total number of block components verified",
&["component", "type"],
)
});

/*
* Gossip processor
*/
Expand All @@ -71,13 +100,6 @@ pub static BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL: LazyLock<Result<IntCoun
"Total number of gossip blocks verified for propagation.",
)
});
pub static BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL: LazyLock<Result<IntCounter>> =
LazyLock::new(|| {
try_create_int_counter(
"beacon_processor_gossip_block_imported_total",
"Total number of gossip blocks imported to fork choice, etc.",
)
});
pub static BEACON_PROCESSOR_GOSSIP_BLOCK_REQUEUED_TOTAL: LazyLock<Result<IntCounter>> =
LazyLock::new(|| {
try_create_int_counter(
Expand Down Expand Up @@ -168,14 +190,6 @@ pub static BEACON_PROCESSOR_BLS_TO_EXECUTION_CHANGE_IMPORTED_TOTAL: LazyLock<Res
)
});

// Rpc blocks.
pub static BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL: LazyLock<Result<IntCounter>> =
LazyLock::new(|| {
try_create_int_counter(
"beacon_processor_rpc_block_imported_total",
"Total number of gossip blocks imported to fork choice, etc.",
)
});
// Chain segments.
pub static BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL: LazyLock<Result<IntCounter>> =
LazyLock::new(|| {
Expand Down Expand Up @@ -606,6 +620,28 @@ pub fn register_sync_committee_error(error: &SyncCommitteeError) {
inc_counter_vec(&GOSSIP_SYNC_COMMITTEE_ERRORS_PER_TYPE, &[error.as_ref()]);
}

pub fn register_process_result_metrics(
result: &std::result::Result<AvailabilityProcessingStatus, BlockError>,
item: &'static str,
) {
match result {
Ok(status) => match status {
AvailabilityProcessingStatus::Imported { .. } => {
inc_counter_vec(&BEACON_PROCESSOR_FULLY_IMPORTED, &[item]);
}
AvailabilityProcessingStatus::MissingComponents { .. } => {
inc_counter_vec(&BEACON_PROCESSOR_MISSING_COMPONENTS, &[item]);
}
},
Err(error) => {
inc_counter_vec(
&BEACON_PROCESSOR_IMPORT_ERRORS_PER_TYPE,
&[item, error.as_ref()],
);
}
}
}

pub fn from_result<T, E>(result: &std::result::Result<T, E>) -> &str {
match result {
Ok(_) => SUCCESS,
Expand Down
68 changes: 31 additions & 37 deletions beacon_node/network/src/network_beacon_processor/gossip_methods.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
metrics,
metrics::{self, register_process_result_metrics},
network_beacon_processor::{InvalidBlockStorage, NetworkBeaconProcessor},
service::NetworkMessage,
sync::SyncMessage,
Expand Down Expand Up @@ -918,11 +918,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.chain
.process_gossip_blob(verified_blob, || Ok(()))
.await;
register_process_result_metrics(&result, "gossip_blob");

match &result {
Ok(AvailabilityProcessingStatus::Imported(block_root)) => {
// Note: Reusing block imported metric here
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL);
info!(
self.log,
"Gossipsub blob processed, imported fully available block";
Expand Down Expand Up @@ -992,43 +991,39 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let data_column_slot = verified_data_column.slot();
let data_column_index = verified_data_column.id().index;

match self
let result = self
.chain
.process_gossip_data_columns(vec![verified_data_column], || Ok(()))
.await
{
Ok(availability) => {
match availability {
AvailabilityProcessingStatus::Imported(block_root) => {
// Note: Reusing block imported metric here
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL,
);
info!(
self.log,
"Gossipsub data column processed, imported fully available block";
"block_root" => %block_root
);
self.chain.recompute_head_at_current_slot().await;
.await;
register_process_result_metrics(&result, "gossip_data_column");

metrics::set_gauge(
&metrics::BEACON_BLOB_DELAY_FULL_VERIFICATION,
processing_start_time.elapsed().as_millis() as i64,
);
}
AvailabilityProcessingStatus::MissingComponents(slot, block_root) => {
trace!(
self.log,
"Processed data column, waiting for other components";
"slot" => %slot,
"data_column_index" => %data_column_index,
"block_root" => %block_root,
);
match result {
Ok(availability) => match availability {
AvailabilityProcessingStatus::Imported(block_root) => {
info!(
self.log,
"Gossipsub data column processed, imported fully available block";
"block_root" => %block_root
);
self.chain.recompute_head_at_current_slot().await;

self.attempt_data_column_reconstruction(block_root).await;
}
metrics::set_gauge(
&metrics::BEACON_BLOB_DELAY_FULL_VERIFICATION,
processing_start_time.elapsed().as_millis() as i64,
);
}
}
AvailabilityProcessingStatus::MissingComponents(slot, block_root) => {
trace!(
self.log,
"Processed data column, waiting for other components";
"slot" => %slot,
"data_column_index" => %data_column_index,
"block_root" => %block_root,
);

self.attempt_data_column_reconstruction(block_root).await;
}
},
Err(BlockError::DuplicateFullyImported(_)) => {
debug!(
self.log,
Expand Down Expand Up @@ -1456,11 +1451,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
NotifyExecutionLayer::Yes,
)
.await;
register_process_result_metrics(&result, "gossip_block");

match &result {
Ok(AvailabilityProcessingStatus::Imported(block_root)) => {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL);

if reprocess_tx
.try_send(ReprocessQueueMessage::BlockImported {
block_root: *block_root,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::metrics;
use crate::metrics::{self, register_process_result_metrics};
use crate::network_beacon_processor::{NetworkBeaconProcessor, FUTURE_SLOT_TOLERANCE};
use crate::sync::BatchProcessResult;
use crate::sync::{
Expand Down Expand Up @@ -162,8 +162,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
NotifyExecutionLayer::Yes,
)
.await;

metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);
register_process_result_metrics(&result, "rpc_block");

// RPC block imported, regardless of process type
if let &Ok(AvailabilityProcessingStatus::Imported(hash)) = &result {
Expand Down Expand Up @@ -274,6 +273,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}

let result = self.chain.process_rpc_blobs(slot, block_root, blobs).await;
register_process_result_metrics(&result, "rpc_blobs");

match &result {
Ok(AvailabilityProcessingStatus::Imported(hash)) => {
Expand Down Expand Up @@ -331,6 +331,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.chain
.process_rpc_custody_columns(custody_columns)
.await;
register_process_result_metrics(&result, "rpc_custody_columns");

match &result {
Ok(availability) => match availability {
Expand Down

0 comments on commit 92e801b

Please sign in to comment.