diff --git a/rust/main/Cargo.lock b/rust/main/Cargo.lock index 27b11dcab1..b6f02ecfee 100644 --- a/rust/main/Cargo.lock +++ b/rust/main/Cargo.lock @@ -4417,7 +4417,6 @@ dependencies = [ "console-subscriber", "convert_case 0.6.0", "derive-new", - "derive_builder", "ed25519-dalek 1.0.1", "ethers", "ethers-prometheus", diff --git a/rust/main/agents/relayer/src/relayer.rs b/rust/main/agents/relayer/src/relayer.rs index 975f372426..1ac619792c 100644 --- a/rust/main/agents/relayer/src/relayer.rs +++ b/rust/main/agents/relayer/src/relayer.rs @@ -12,11 +12,12 @@ use hyperlane_base::{ broadcast::BroadcastMpscSender, db::{HyperlaneRocksDB, DB}, metrics::{AgentMetrics, MetricsUpdater}, - settings::ChainConf, + settings::{ChainConf, IndexSettings}, AgentMetadata, BaseAgent, ChainMetrics, ContractSyncMetrics, ContractSyncer, CoreMetrics, HyperlaneAgentCore, SyncOptions, }; use hyperlane_core::{ + rpc_clients::call_and_retry_n_times, ChainCommunicationError, ContractSyncCursor, HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, MerkleTreeInsertion, QueueOperation, H512, U256, }; @@ -50,6 +51,9 @@ use crate::{ }; use crate::{processor::Processor, server::ENDPOINT_MESSAGES_QUEUE_SIZE}; +const CURSOR_BUILDING_ERROR: &str = "Error building cursor for origin"; +const CURSOR_INSTANTIATION_ATTEMPTS: usize = 10; + #[derive(Debug, Hash, PartialEq, Eq, Copy, Clone)] struct ContextKey { origin: u32, @@ -354,6 +358,7 @@ impl BaseAgent for Relayer { } for origin in &self.origin_chains { + self.chain_metrics.set_critical_error(origin.name(), false); let maybe_broadcaster = self .message_syncs .get(origin) @@ -412,6 +417,34 @@ impl BaseAgent for Relayer { } impl Relayer { + fn record_critical_error( + &self, + origin: &HyperlaneDomain, + err: ChainCommunicationError, + message: &str, + ) { + error!(?err, origin=?origin, "{message}"); + self.chain_metrics.set_critical_error(origin.name(), true); + } + + async fn instantiate_cursor_with_retries( + contract_sync: Arc>, + index_settings: IndexSettings, + ) -> Result>, ChainCommunicationError> { + call_and_retry_n_times( + || { + let contract_sync = contract_sync.clone(); + let index_settings = index_settings.clone(); + Box::pin(async move { + let cursor = contract_sync.cursor(index_settings).await?; + Ok(cursor) + }) + }, + CURSOR_INSTANTIATION_ATTEMPTS, + ) + .await + } + async fn run_message_sync( &self, origin: &HyperlaneDomain, @@ -419,10 +452,16 @@ impl Relayer { ) -> Instrumented> { let index_settings = self.as_ref().settings.chains[origin.name()].index_settings(); let contract_sync = self.message_syncs.get(origin).unwrap().clone(); - let cursor = contract_sync - .cursor(index_settings) - .await - .unwrap_or_else(|err| panic!("Error getting cursor for origin {origin}: {err}")); + let cursor_instantiation_result = + Self::instantiate_cursor_with_retries(contract_sync.clone(), index_settings.clone()) + .await; + let cursor = match cursor_instantiation_result { + Ok(cursor) => cursor, + Err(err) => { + self.record_critical_error(origin, err, CURSOR_BUILDING_ERROR); + return tokio::spawn(async {}).instrument(info_span!("MessageSync")); + } + }; tokio::spawn(TaskMonitor::instrument(&task_monitor, async move { contract_sync .clone() @@ -444,10 +483,16 @@ impl Relayer { .get(origin) .unwrap() .clone(); - let cursor = contract_sync - .cursor(index_settings) - .await - .unwrap_or_else(|err| panic!("Error getting cursor for origin {origin}: {err}")); + let cursor_instantiation_result = + Self::instantiate_cursor_with_retries(contract_sync.clone(), index_settings.clone()) + .await; + let cursor = match cursor_instantiation_result { + Ok(cursor) => cursor, + Err(err) => { + self.record_critical_error(origin, err, CURSOR_BUILDING_ERROR); + return tokio::spawn(async {}).instrument(info_span!("IgpSync")); + } + }; tokio::spawn(TaskMonitor::instrument(&task_monitor, async move { contract_sync .clone() @@ -468,10 +513,16 @@ impl Relayer { ) -> Instrumented> { let index_settings = self.as_ref().settings.chains[origin.name()].index.clone(); let contract_sync = self.merkle_tree_hook_syncs.get(origin).unwrap().clone(); - let cursor = contract_sync - .cursor(index_settings) - .await - .unwrap_or_else(|err| panic!("Error getting cursor for origin {origin}: {err}")); + let cursor_instantiation_result = + Self::instantiate_cursor_with_retries(contract_sync.clone(), index_settings.clone()) + .await; + let cursor = match cursor_instantiation_result { + Ok(cursor) => cursor, + Err(err) => { + self.record_critical_error(origin, err, CURSOR_BUILDING_ERROR); + return tokio::spawn(async {}).instrument(info_span!("MerkleTreeHookSync")); + } + }; tokio::spawn(TaskMonitor::instrument(&task_monitor, async move { contract_sync .clone() diff --git a/rust/main/hyperlane-base/Cargo.toml b/rust/main/hyperlane-base/Cargo.toml index 5bfe50c6e0..ef25d99405 100644 --- a/rust/main/hyperlane-base/Cargo.toml +++ b/rust/main/hyperlane-base/Cargo.toml @@ -15,7 +15,6 @@ color-eyre = { workspace = true, optional = true } config.workspace = true console-subscriber.workspace = true convert_case.workspace = true -derive_builder.workspace = true derive-new.workspace = true ed25519-dalek.workspace = true ethers.workspace = true diff --git a/rust/main/hyperlane-base/src/agent.rs b/rust/main/hyperlane-base/src/agent.rs index 0ca9f45547..5ee00c1eb9 100644 --- a/rust/main/hyperlane-base/src/agent.rs +++ b/rust/main/hyperlane-base/src/agent.rs @@ -8,8 +8,7 @@ use hyperlane_core::config::*; use tracing::info; use crate::{ - create_chain_metrics, - metrics::{create_agent_metrics, AgentMetrics, CoreMetrics}, + metrics::{AgentMetrics, CoreMetrics}, settings::Settings, ChainMetrics, }; @@ -88,8 +87,8 @@ pub async fn agent_main() -> Result<()> { let metrics = settings.as_ref().metrics(A::AGENT_NAME)?; let tokio_server = core_settings.tracing.start_tracing(&metrics)?; - let agent_metrics = create_agent_metrics(&metrics)?; - let chain_metrics = create_chain_metrics(&metrics)?; + let agent_metrics = AgentMetrics::new(&metrics)?; + let chain_metrics = ChainMetrics::new(&metrics)?; let agent = A::from_settings( agent_metadata, settings, diff --git a/rust/main/hyperlane-base/src/metrics/agent_metrics.rs b/rust/main/hyperlane-base/src/metrics/agent_metrics.rs index 4ece5b0e37..16fd8fb9f9 100644 --- a/rust/main/hyperlane-base/src/metrics/agent_metrics.rs +++ b/rust/main/hyperlane-base/src/metrics/agent_metrics.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use std::time::Duration; -use derive_builder::Builder; use eyre::Result; use hyperlane_core::metrics::agent::decimals_by_protocol; use hyperlane_core::metrics::agent::u256_as_scaled_f64; @@ -45,8 +44,14 @@ pub const GAS_PRICE_LABELS: &[&str] = &["chain"]; pub const GAS_PRICE_HELP: &str = "Tracks the current gas price of the chain, in the lowest denomination (e.g. wei)"; +/// Expected label names for the `critical_error` metric. +pub const CRITICAL_ERROR_LABELS: &[&str] = &["chain"]; +/// Help string for the metric. +pub const CRITICAL_ERROR_HELP: &str = + "Boolean marker for critical errors on a chain, signalling loss of liveness"; + /// Agent-specific metrics -#[derive(Clone, Builder, Debug)] +#[derive(Clone, Debug)] pub struct AgentMetrics { /// Current balance of native tokens for the /// wallet address. @@ -57,27 +62,28 @@ pub struct AgentMetrics { /// - `token_address`: Address of the token. /// - `token_symbol`: Symbol of the token. /// - `token_name`: Full name of the token. - #[builder(setter(into, strip_option), default)] wallet_balance: Option, } -pub(crate) fn create_agent_metrics(metrics: &CoreMetrics) -> Result { - Ok(AgentMetricsBuilder::default() - .wallet_balance(metrics.new_gauge( - "wallet_balance", - WALLET_BALANCE_HELP, - WALLET_BALANCE_LABELS, - )?) - .build()?) +impl AgentMetrics { + pub(crate) fn new(metrics: &CoreMetrics) -> Result { + let agent_metrics = AgentMetrics { + wallet_balance: Some(metrics.new_gauge( + "wallet_balance", + WALLET_BALANCE_HELP, + WALLET_BALANCE_LABELS, + )?), + }; + Ok(agent_metrics) + } } /// Chain-specific metrics -#[derive(Clone, Builder, Debug)] +#[derive(Clone, Debug)] pub struct ChainMetrics { /// Tracks the current block height of the chain. /// - `chain`: the chain name (or ID if the name is unknown) of the chain /// the block number refers to. - #[builder(setter(into))] pub block_height: IntGaugeVec, /// Tracks the current gas price of the chain. Uses the base_fee_per_gas if @@ -85,19 +91,45 @@ pub struct ChainMetrics { /// TODO: use the median of the transactions. /// - `chain`: the chain name (or chain ID if the name is unknown) of the /// chain the gas price refers to. - #[builder(setter(into, strip_option), default)] pub gas_price: Option, + + /// Boolean marker for critical errors on a chain, signalling loss of liveness. + critical_error: IntGaugeVec, } -pub(crate) fn create_chain_metrics(metrics: &CoreMetrics) -> Result { - Ok(ChainMetricsBuilder::default() - .block_height(metrics.new_int_gauge( - "block_height", - BLOCK_HEIGHT_HELP, - BLOCK_HEIGHT_LABELS, - )?) - .gas_price(metrics.new_gauge("gas_price", GAS_PRICE_HELP, GAS_PRICE_LABELS)?) - .build()?) +impl ChainMetrics { + pub(crate) fn new(metrics: &CoreMetrics) -> Result { + let block_height_metrics = + metrics.new_int_gauge("block_height", BLOCK_HEIGHT_HELP, BLOCK_HEIGHT_LABELS)?; + let gas_price_metrics = metrics.new_gauge("gas_price", GAS_PRICE_HELP, GAS_PRICE_LABELS)?; + let critical_error_metrics = + metrics.new_int_gauge("critical_error", CRITICAL_ERROR_HELP, CRITICAL_ERROR_LABELS)?; + let chain_metrics = ChainMetrics { + block_height: block_height_metrics, + gas_price: Some(gas_price_metrics), + critical_error: critical_error_metrics, + }; + Ok(chain_metrics) + } + + pub(crate) fn set_gas_price(&self, chain: &str, price: f64) { + if let Some(gas_price) = &self.gas_price { + gas_price.with(&hashmap! { "chain" => chain }).set(price); + } + } + + pub(crate) fn set_block_height(&self, chain: &str, height: i64) { + self.block_height + .with(&hashmap! { "chain" => chain }) + .set(height); + } + + /// Flag that a critical error has occurred on the chain + pub fn set_critical_error(&self, chain: &str, is_critical: bool) { + self.critical_error + .with(&hashmap! { "chain" => chain }) + .set(is_critical as i64); + } } /// Configuration for the prometheus middleware. This can be loaded via serde. @@ -174,8 +206,6 @@ impl MetricsUpdater { } async fn update_block_details(&self) { - let block_height = self.chain_metrics.block_height.clone(); - let gas_price = self.chain_metrics.gas_price.clone(); if let HyperlaneDomain::Unknown { .. } = self.conf.domain { return; }; @@ -195,10 +225,8 @@ impl MetricsUpdater { let height = chain_metrics.latest_block.number as i64; trace!(chain, height, "Fetched block height for metrics"); - block_height - .with(&hashmap! { "chain" => chain }) - .set(height); - if let Some(gas_price) = gas_price { + self.chain_metrics.set_block_height(chain, height); + if self.chain_metrics.gas_price.is_some() { let protocol = self.conf.domain.domain_protocol(); let decimals_scale = 10f64.powf(decimals_by_protocol(protocol).into()); let gas = u256_as_scaled_f64(chain_metrics.min_gas_price.unwrap_or_default(), protocol) @@ -208,7 +236,7 @@ impl MetricsUpdater { gas = format!("{gas:.2}"), "Gas price updated for chain (using lowest denomination)" ); - gas_price.with(&hashmap! { "chain" => chain }).set(gas); + self.chain_metrics.set_gas_price(chain, gas); } }