Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track beacon processor import result metrics #6541

Open
wants to merge 1 commit into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ use std::fs;
use std::io::Write;
use std::sync::Arc;
use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp};
use strum::AsRefStr;
use task_executor::JoinHandle;
use types::{
data_column_sidecar::DataColumnSidecarError, BeaconBlockRef, BeaconState, BeaconStateError,
Expand Down Expand Up @@ -137,7 +138,7 @@ const WRITE_BLOCK_PROCESSING_SSZ: bool = cfg!(feature = "write_ssz_files");
///
/// - The block is malformed/invalid (indicated by all results other than `BeaconChainError`.
/// - We encountered an error whilst trying to verify the block (a `BeaconChainError`).
#[derive(Debug)]
#[derive(Debug, AsRefStr)]
pub enum BlockError {
/// The parent block was unknown.
///
Expand Down
62 changes: 61 additions & 1 deletion beacon_node/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use beacon_chain::{
attestation_verification::Error as AttnError,
light_client_finality_update_verification::Error as LightClientFinalityUpdateError,
light_client_optimistic_update_verification::Error as LightClientOptimisticUpdateError,
sync_committee_verification::Error as SyncCommitteeError,
sync_committee_verification::Error as SyncCommitteeError, AvailabilityProcessingStatus,
BlockError,
};
use fnv::FnvHashMap;
pub use lighthouse_metrics::*;
Expand All @@ -11,12 +12,19 @@ use lighthouse_network::{
NetworkGlobals,
};
use std::sync::{Arc, LazyLock};
use strum::AsRefStr;
use strum::IntoEnumIterator;
use types::EthSpec;

pub const SUCCESS: &str = "SUCCESS";
pub const FAILURE: &str = "FAILURE";

#[derive(Debug, AsRefStr)]
pub(crate) enum BlockSource {
Gossip,
Rpc,
}

pub static BEACON_BLOCK_MESH_PEERS_PER_CLIENT: LazyLock<Result<IntGaugeVec>> =
LazyLock::new(|| {
try_create_int_gauge_vec(
Expand Down Expand Up @@ -59,6 +67,27 @@ pub static SYNC_COMMITTEE_SUBSCRIPTION_REQUESTS: LazyLock<Result<IntCounter>> =
)
});

/*
* Beacon processor
*/
pub static BEACON_PROCESSOR_MISSING_COMPONENTS: LazyLock<Result<IntCounterVec>> = LazyLock::new(
|| {
try_create_int_counter_vec(
"beacon_processor_missing_components_total",
"Total number of imported individual block components that resulted in missing components",
&["source", "component"],
)
},
);
pub static BEACON_PROCESSOR_IMPORT_ERRORS_PER_TYPE: LazyLock<Result<IntCounterVec>> =
LazyLock::new(|| {
try_create_int_counter_vec(
"beacon_processor_import_errors_total",
"Total number of block components verified",
&["source", "component", "type"],
)
});

/*
* Gossip processor
*/
Expand Down Expand Up @@ -606,6 +635,37 @@ pub fn register_sync_committee_error(error: &SyncCommitteeError) {
inc_counter_vec(&GOSSIP_SYNC_COMMITTEE_ERRORS_PER_TYPE, &[error.as_ref()]);
}

pub(crate) fn register_process_result_metrics(
result: &std::result::Result<AvailabilityProcessingStatus, BlockError>,
source: BlockSource,
block_component: &'static str,
) {
match result {
Ok(status) => match status {
AvailabilityProcessingStatus::Imported { .. } => match source {
BlockSource::Gossip => {
inc_counter(&BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL);
}
BlockSource::Rpc => {
inc_counter(&BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);
}
},
AvailabilityProcessingStatus::MissingComponents { .. } => {
inc_counter_vec(
&BEACON_PROCESSOR_MISSING_COMPONENTS,
&[source.as_ref(), block_component],
);
}
},
Err(error) => {
inc_counter_vec(
&BEACON_PROCESSOR_IMPORT_ERRORS_PER_TYPE,
&[source.as_ref(), block_component, error.as_ref()],
);
}
}
}

pub fn from_result<T, E>(result: &std::result::Result<T, E>) -> &str {
match result {
Ok(_) => SUCCESS,
Expand Down
68 changes: 31 additions & 37 deletions beacon_node/network/src/network_beacon_processor/gossip_methods.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
metrics,
metrics::{self, register_process_result_metrics},
network_beacon_processor::{InvalidBlockStorage, NetworkBeaconProcessor},
service::NetworkMessage,
sync::SyncMessage,
Expand Down Expand Up @@ -918,11 +918,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.chain
.process_gossip_blob(verified_blob, || Ok(()))
.await;
register_process_result_metrics(&result, metrics::BlockSource::Gossip, "blob");

match &result {
Ok(AvailabilityProcessingStatus::Imported(block_root)) => {
// Note: Reusing block imported metric here
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL);
info!(
self.log,
"Gossipsub blob processed, imported fully available block";
Expand Down Expand Up @@ -992,43 +991,39 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let data_column_slot = verified_data_column.slot();
let data_column_index = verified_data_column.id().index;

match self
let result = self
.chain
.process_gossip_data_columns(vec![verified_data_column], || Ok(()))
.await
{
Ok(availability) => {
match availability {
AvailabilityProcessingStatus::Imported(block_root) => {
// Note: Reusing block imported metric here
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL,
);
info!(
self.log,
"Gossipsub data column processed, imported fully available block";
"block_root" => %block_root
);
self.chain.recompute_head_at_current_slot().await;
.await;
register_process_result_metrics(&result, metrics::BlockSource::Gossip, "data_column");

metrics::set_gauge(
&metrics::BEACON_BLOB_DELAY_FULL_VERIFICATION,
processing_start_time.elapsed().as_millis() as i64,
);
}
AvailabilityProcessingStatus::MissingComponents(slot, block_root) => {
trace!(
self.log,
"Processed data column, waiting for other components";
"slot" => %slot,
"data_column_index" => %data_column_index,
"block_root" => %block_root,
);
match result {
Ok(availability) => match availability {
AvailabilityProcessingStatus::Imported(block_root) => {
info!(
self.log,
"Gossipsub data column processed, imported fully available block";
"block_root" => %block_root
);
self.chain.recompute_head_at_current_slot().await;

self.attempt_data_column_reconstruction(block_root).await;
}
metrics::set_gauge(
&metrics::BEACON_BLOB_DELAY_FULL_VERIFICATION,
processing_start_time.elapsed().as_millis() as i64,
);
}
}
AvailabilityProcessingStatus::MissingComponents(slot, block_root) => {
trace!(
self.log,
"Processed data column, waiting for other components";
"slot" => %slot,
"data_column_index" => %data_column_index,
"block_root" => %block_root,
);

self.attempt_data_column_reconstruction(block_root).await;
}
},
Err(BlockError::DuplicateFullyImported(_)) => {
debug!(
self.log,
Expand Down Expand Up @@ -1456,11 +1451,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
NotifyExecutionLayer::Yes,
)
.await;
register_process_result_metrics(&result, metrics::BlockSource::Gossip, "block");

match &result {
Ok(AvailabilityProcessingStatus::Imported(block_root)) => {
metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL);

if reprocess_tx
.try_send(ReprocessQueueMessage::BlockImported {
block_root: *block_root,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::metrics;
use crate::metrics::{self, register_process_result_metrics};
use crate::network_beacon_processor::{NetworkBeaconProcessor, FUTURE_SLOT_TOLERANCE};
use crate::sync::BatchProcessResult;
use crate::sync::{
Expand Down Expand Up @@ -162,8 +162,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
NotifyExecutionLayer::Yes,
)
.await;

metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL);
register_process_result_metrics(&result, metrics::BlockSource::Rpc, "block");

// RPC block imported, regardless of process type
if let &Ok(AvailabilityProcessingStatus::Imported(hash)) = &result {
Expand Down Expand Up @@ -274,6 +273,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}

let result = self.chain.process_rpc_blobs(slot, block_root, blobs).await;
register_process_result_metrics(&result, metrics::BlockSource::Rpc, "blobs");

match &result {
Ok(AvailabilityProcessingStatus::Imported(hash)) => {
Expand Down Expand Up @@ -331,6 +331,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.chain
.process_rpc_custody_columns(custody_columns)
.await;
register_process_result_metrics(&result, metrics::BlockSource::Rpc, "custody_columns");

match &result {
Ok(availability) => match availability {
Expand Down
Loading