Skip to content

Commit

Permalink
Log stuck lookups (sigp#5778)
Browse files Browse the repository at this point in the history
* Log stuck lookups every interval

* Implement debug manually

* Add comment

* Do not print peers twice

* Add SYNC_LOOKUPS_STUCK metric

* Skip logging request root

* use derivative

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into log-stuck-lookups

* add req id to debug

* Merge remote-tracking branch 'sigp/unstable' into log-stuck-lookups

* Fix conflict with unstable
  • Loading branch information
dapplion authored May 14, 2024
1 parent 683d9df commit 6f45ad4
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 3 deletions.
4 changes: 4 additions & 0 deletions beacon_node/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,10 @@ lazy_static! {
"sync_lookups_completed_total",
"Total count of sync lookups completed",
);
pub static ref SYNC_LOOKUPS_STUCK: Result<IntGauge> = try_create_int_gauge(
"sync_lookups_stuck",
"Current count of sync lookups that may be stuck",
);

/*
* Block Delay Metrics
Expand Down
18 changes: 18 additions & 0 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mod tests;

const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60;
pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 4;
/// Age, in seconds, beyond which `log_stuck_lookups` reports a lookup as possibly stuck.
const LOOKUP_MAX_DURATION_SECS: u64 = 60;

pub enum BlockComponent<E: EthSpec> {
Block(DownloadResult<Arc<SignedBeaconBlock<E>>>),
Expand Down Expand Up @@ -665,4 +666,21 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.single_block_lookups.len() as i64,
);
}

/// Emits one debug log entry for every single block lookup that has existed for longer than
/// `LOOKUP_MAX_DURATION_SECS`, then publishes the number of such lookups to the
/// `SYNC_LOOKUPS_STUCK` gauge.
pub fn log_stuck_lookups(&self) {
    let max_age = Duration::from_secs(LOOKUP_MAX_DURATION_SECS);
    let overdue_lookups = self
        .single_block_lookups
        .values()
        .filter(|candidate| candidate.elapsed_since_created() > max_age);

    let mut stuck_total = 0;
    for overdue in overdue_lookups {
        debug!(self.log, "Lookup maybe stuck";
            // The summary already contains id and block_root. They are repeated as
            // dedicated fields so that log parsers can index them and search on them.
            "id" => overdue.id,
            "block_root" => ?overdue.block_root(),
            "summary" => ?overdue,
        );
        stuck_total += 1;
    }

    // The gauge is overwritten on every call, so it always reflects the latest scan.
    metrics::set_gauge(&metrics::SYNC_LOOKUPS_STUCK, stuck_total);
}
}
47 changes: 44 additions & 3 deletions beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ use crate::sync::block_lookups::common::RequestState;
use crate::sync::block_lookups::Id;
use crate::sync::network_context::{LookupRequestResult, ReqId, SyncNetworkContext};
use beacon_chain::BeaconChainTypes;
use derivative::Derivative;
use itertools::Itertools;
use rand::seq::IteratorRandom;
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use store::Hash256;
use strum::IntoStaticStr;
use types::blob_sidecar::FixedBlobSidecarList;
Expand Down Expand Up @@ -53,12 +54,15 @@ pub enum LookupRequestError {
},
}

/// A lookup for a single block root, composed of a block request and a blob request.
///
/// `Debug` is derived through `derivative`, with the bound written out explicitly as
/// `T: BeaconChainTypes`.
#[derive(Derivative)]
#[derivative(Debug(bound = "T: BeaconChainTypes"))]
pub struct SingleBlockLookup<T: BeaconChainTypes> {
/// Identifier of this lookup.
pub id: Id,
/// State of the block request component of this lookup.
pub block_request_state: BlockRequestState<T::EthSpec>,
/// State of the blob request component of this lookup.
pub blob_request_state: BlobRequestState<T::EthSpec>,
/// Root of the block this lookup was created for.
block_root: Hash256,
/// NOTE(review): presumably the parent root this lookup is waiting on, `None` when it is
/// not waiting — confirm against the callers that set it.
awaiting_parent: Option<Hash256>,
/// Instant captured when the lookup is constructed; used to compute the lookup's age.
created: Instant,
}

impl<T: BeaconChainTypes> SingleBlockLookup<T> {
Expand All @@ -74,6 +78,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
blob_request_state: BlobRequestState::new(requested_block_root, peers),
block_root: requested_block_root,
awaiting_parent,
created: Instant::now(),
}
}

Expand All @@ -98,6 +103,11 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
self.awaiting_parent = None;
}

/// Returns the time elapsed since this lookup was created
pub fn elapsed_since_created(&self) -> Duration {
self.created.elapsed()
}

/// Maybe insert a verified response into this lookup. Returns true if imported
pub fn add_child_components(&mut self, block_component: BlockComponent<T::EthSpec>) -> bool {
match block_component {
Expand Down Expand Up @@ -244,7 +254,10 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}

/// The state of the blob request component of a `SingleBlockLookup`.
///
/// `Debug` is derived through `derivative` so that `block_root` can be left out of the
/// formatted output.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct BlobRequestState<E: EthSpec> {
/// Block root the blobs are requested for. Skipped when formatting with `Debug`.
#[derivative(Debug = "ignore")]
pub block_root: Hash256,
/// Request state whose downloaded payload type is `FixedBlobSidecarList<E>`.
pub state: SingleLookupRequestState<FixedBlobSidecarList<E>>,
}
Expand All @@ -259,7 +272,10 @@ impl<E: EthSpec> BlobRequestState<E> {
}

/// The state of the block request component of a `SingleBlockLookup`.
///
/// `Debug` is derived through `derivative` so that `requested_block_root` can be left out of
/// the formatted output.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct BlockRequestState<E: EthSpec> {
/// Root of the block being requested. Skipped when formatting with `Debug`.
#[derivative(Debug = "ignore")]
pub requested_block_root: Hash256,
/// Request state whose downloaded payload type is `Arc<SignedBeaconBlock<E>>`.
pub state: SingleLookupRequestState<Arc<SignedBeaconBlock<E>>>,
}
Expand All @@ -281,7 +297,7 @@ pub struct DownloadResult<T: Clone> {
pub peer_id: PeerId,
}

#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
#[derive(PartialEq, Eq, IntoStaticStr)]
pub enum State<T: Clone> {
AwaitingDownload,
Downloading(ReqId),
Expand All @@ -293,13 +309,16 @@ pub enum State<T: Clone> {
}

/// Object representing the state of a single block or blob lookup request.
#[derive(PartialEq, Eq, Debug)]
#[derive(PartialEq, Eq, Derivative)]
#[derivative(Debug)]
pub struct SingleLookupRequestState<T: Clone> {
/// State of this request.
state: State<T>,
/// Peers that should have this block or blob.
#[derivative(Debug(format_with = "fmt_peer_set"))]
available_peers: HashSet<PeerId>,
/// Peers from which we have requested this block.
#[derivative(Debug = "ignore")]
used_peers: HashSet<PeerId>,
/// How many times have we attempted to process this block or blob.
failed_processing: u8,
Expand Down Expand Up @@ -529,8 +548,30 @@ impl<T: Clone> SingleLookupRequestState<T> {
}
}

// The BadState assertions above format a `State` with `Display`. Only the variant name is
// printed, taken from the `&'static str` conversion of the state.
impl<T: Clone> std::fmt::Display for State<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let variant_name: &'static str = self.into();
        f.write_str(variant_name)
    }
}

// `log_stuck_lookups` prints lookups with `Debug`, which carries a little more detail than
// `Display`. The impl is written by hand so that a downloaded block or blob is never dumped
// into the logs: only the request id or the peer id is shown next to the variant name.
impl<T: Clone> std::fmt::Debug for State<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Variants printed without any payload.
            Self::AwaitingDownload { .. } => f.write_str("AwaitingDownload"),
            Self::Processed { .. } => f.write_str("Processed"),
            // In-flight request: show its id.
            Self::Downloading(request_id) => {
                f.write_fmt(format_args!("Downloading({:?})", request_id))
            }
            // Downloaded payload: show only the peer it came from.
            Self::AwaitingProcess(download) => {
                f.write_fmt(format_args!("AwaitingProcess({:?})", download.peer_id))
            }
            Self::Processing(download) => {
                f.write_fmt(format_args!("Processing({:?})", download.peer_id))
            }
        }
    }
}

/// `Debug` formatting hook for a set of peers: writes the number of peers in the set, not the
/// peer ids themselves.
fn fmt_peer_set(
    peer_set: &HashSet<PeerId>,
    f: &mut std::fmt::Formatter,
) -> Result<(), std::fmt::Error> {
    let peer_count = peer_set.len();
    f.write_fmt(format_args!("{}", peer_count))
}
7 changes: 7 additions & 0 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
futures::stream::iter(ee_responsiveness_watch.await).flatten()
};

// LOOKUP_MAX_DURATION_SECS is 60 seconds. Logging every 30 seconds allows enough timely
// visibility while being sparse and not increasing the debug log volume in a noticeable way
let mut interval = tokio::time::interval(Duration::from_secs(30));

// process any inbound messages
loop {
tokio::select! {
Expand All @@ -556,6 +560,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
Some(engine_state) = check_ee_stream.next(), if check_ee => {
self.handle_new_execution_engine_state(engine_state);
}
_ = interval.tick() => {
self.block_lookups.log_stuck_lookups();
}
}
}
}
Expand Down

0 comments on commit 6f45ad4

Please sign in to comment.