diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index d096b33f6cb..309512076a0 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -257,6 +257,10 @@ lazy_static! { "sync_lookups_completed_total", "Total count of sync lookups completed", ); + pub static ref SYNC_LOOKUPS_STUCK: Result = try_create_int_gauge( + "sync_lookups_stuck", + "Current count of sync lookups that may be stuck", + ); /* * Block Delay Metrics diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 1deb50237db..48dda03facd 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -30,6 +30,7 @@ mod tests; const FAILED_CHAINS_CACHE_EXPIRY_SECONDS: u64 = 60; pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 4; +const LOOKUP_MAX_DURATION_SECS: u64 = 60; pub enum BlockComponent { Block(DownloadResult>>), @@ -665,4 +666,21 @@ impl BlockLookups { self.single_block_lookups.len() as i64, ); } + + pub fn log_stuck_lookups(&self) { + let mut stuck_count = 0; + for lookup in self.single_block_lookups.values() { + if lookup.elapsed_since_created() > Duration::from_secs(LOOKUP_MAX_DURATION_SECS) { + debug!(self.log, "Lookup maybe stuck"; + // Fields id and block_root are also part of the summary. However, logging them + // here allows log parsers o index them and have better search + "id" => lookup.id, + "block_root" => ?lookup.block_root(), + "summary" => ?lookup, + ); + stuck_count += 1; + } + } + metrics::set_gauge(&metrics::SYNC_LOOKUPS_STUCK, stuck_count); + } } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index b6c2825fabc..b35a3e91fb6 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -4,12 +4,13 @@ use crate::sync::block_lookups::common::RequestState; use crate::sync::block_lookups::Id; use crate::sync::network_context::{LookupRequestResult, ReqId, SyncNetworkContext}; use beacon_chain::BeaconChainTypes; +use derivative::Derivative; use itertools::Itertools; use rand::seq::IteratorRandom; use std::collections::HashSet; use std::fmt::Debug; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use store::Hash256; use strum::IntoStaticStr; use types::blob_sidecar::FixedBlobSidecarList; @@ -53,12 +54,15 @@ pub enum LookupRequestError { }, } +#[derive(Derivative)] +#[derivative(Debug(bound = "T: BeaconChainTypes"))] pub struct SingleBlockLookup { pub id: Id, pub block_request_state: BlockRequestState, pub blob_request_state: BlobRequestState, block_root: Hash256, awaiting_parent: Option, + created: Instant, } impl SingleBlockLookup { @@ -74,6 +78,7 @@ impl SingleBlockLookup { blob_request_state: BlobRequestState::new(requested_block_root, peers), block_root: requested_block_root, awaiting_parent, + created: Instant::now(), } } @@ -98,6 +103,11 @@ impl SingleBlockLookup { self.awaiting_parent = None; } + /// Returns the time elapsed since this lookup was created + pub fn elapsed_since_created(&self) -> Duration { + self.created.elapsed() + } + /// Maybe insert a verified response into this lookup. Returns true if imported pub fn add_child_components(&mut self, block_component: BlockComponent) -> bool { match block_component { @@ -244,7 +254,10 @@ impl SingleBlockLookup { } /// The state of the blob request component of a `SingleBlockLookup`. +#[derive(Derivative)] +#[derivative(Debug)] pub struct BlobRequestState { + #[derivative(Debug = "ignore")] pub block_root: Hash256, pub state: SingleLookupRequestState>, } @@ -259,7 +272,10 @@ impl BlobRequestState { } /// The state of the block request component of a `SingleBlockLookup`. +#[derive(Derivative)] +#[derivative(Debug)] pub struct BlockRequestState { + #[derivative(Debug = "ignore")] pub requested_block_root: Hash256, pub state: SingleLookupRequestState>>, } @@ -281,7 +297,7 @@ pub struct DownloadResult { pub peer_id: PeerId, } -#[derive(Debug, PartialEq, Eq, IntoStaticStr)] +#[derive(PartialEq, Eq, IntoStaticStr)] pub enum State { AwaitingDownload, Downloading(ReqId), @@ -293,13 +309,16 @@ pub enum State { } /// Object representing the state of a single block or blob lookup request. -#[derive(PartialEq, Eq, Debug)] +#[derive(PartialEq, Eq, Derivative)] +#[derivative(Debug)] pub struct SingleLookupRequestState { /// State of this request. state: State, /// Peers that should have this block or blob. + #[derivative(Debug(format_with = "fmt_peer_set"))] available_peers: HashSet, /// Peers from which we have requested this block. + #[derivative(Debug = "ignore")] used_peers: HashSet, /// How many times have we attempted to process this block or blob. failed_processing: u8, @@ -529,8 +548,30 @@ impl SingleLookupRequestState { } } +// Display is used in the BadState assertions above impl std::fmt::Display for State { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", Into::<&'static str>::into(self)) } } + +// Debug is used in the log_stuck_lookups print to include some more info. Implements custom Debug +// to not dump an entire block or blob to terminal which don't add valuable data. +impl std::fmt::Debug for State { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::AwaitingDownload { .. } => write!(f, "AwaitingDownload"), + Self::Downloading(req_id) => write!(f, "Downloading({:?})", req_id), + Self::AwaitingProcess(d) => write!(f, "AwaitingProcess({:?})", d.peer_id), + Self::Processing(d) => write!(f, "Processing({:?})", d.peer_id), + Self::Processed { .. } => write!(f, "Processed"), + } + } +} + +fn fmt_peer_set( + peer_set: &HashSet, + f: &mut std::fmt::Formatter, +) -> Result<(), std::fmt::Error> { + write!(f, "{}", peer_set.len()) +} diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 66d23dd1918..71d3113414c 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -547,6 +547,10 @@ impl SyncManager { futures::stream::iter(ee_responsiveness_watch.await).flatten() }; + // LOOKUP_MAX_DURATION_SECS is 60 seconds. Logging every 30 seconds allows enough timely + // visbility while being sparse and not increasing the debug log volume in a noticeable way + let mut interval = tokio::time::interval(Duration::from_secs(30)); + // process any inbound messages loop { tokio::select! { @@ -556,6 +560,9 @@ impl SyncManager { Some(engine_state) = check_ee_stream.next(), if check_ee => { self.handle_new_execution_engine_state(engine_state); } + _ = interval.tick() => { + self.block_lookups.log_stuck_lookups(); + } } } }