From 0a3cd50d2dd1ff67bf051dd912dd5f2e9ec79a42 Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Wed, 8 May 2024 10:35:19 +0530 Subject: [PATCH 01/27] track hashchain --- apps/fortuna/src/api.rs | 42 +------------- apps/fortuna/src/api/revelation.rs | 12 ++-- apps/fortuna/src/command/run.rs | 72 ++++++++++++++++++++++-- apps/fortuna/src/main.rs | 2 + apps/fortuna/src/metrics.rs | 90 ++++++++++++++++++++++++++++++ 5 files changed, 168 insertions(+), 50 deletions(-) create mode 100644 apps/fortuna/src/metrics.rs diff --git a/apps/fortuna/src/api.rs b/apps/fortuna/src/api.rs index 4535c5ca9c..45f2645979 100644 --- a/apps/fortuna/src/api.rs +++ b/apps/fortuna/src/api.rs @@ -5,6 +5,7 @@ use { BlockStatus, EntropyReader, }, + metrics::Metrics, state::HashChainState, }, anyhow::Result, @@ -19,19 +20,10 @@ use { Router, }, ethers::core::types::Address, - prometheus_client::{ - encoding::EncodeLabelSet, - metrics::{ - counter::Counter, - family::Family, - }, - registry::Registry, - }, std::{ collections::HashMap, sync::Arc, }, - tokio::sync::RwLock, url::Url, }; pub use { @@ -89,38 +81,6 @@ pub struct BlockchainState { pub confirmed_block_status: BlockStatus, } -pub struct Metrics { - pub registry: RwLock, - // TODO: track useful metrics. this counter is just a placeholder to get things set up. - pub request_counter: Family, -} - -#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] -pub struct Label { - value: String, -} - -impl Metrics { - pub fn new() -> Self { - let mut metrics_registry = Registry::default(); - let http_requests = Family::::default(); - - // Register the metric family with the registry. - metrics_registry.register( - // With the metric name. - "http_requests", - // And the metric help text. - "Number of HTTP requests received", - http_requests.clone(), - ); - - Metrics { - registry: RwLock::new(metrics_registry), - request_counter: http_requests, - } - } -} - pub enum RestError { /// The caller passed a sequence number that isn't within the supported range InvalidSequenceNumber, diff --git a/apps/fortuna/src/api/revelation.rs b/apps/fortuna/src/api/revelation.rs index 93d3588deb..0612c048cd 100644 --- a/apps/fortuna/src/api/revelation.rs +++ b/apps/fortuna/src/api/revelation.rs @@ -1,8 +1,10 @@ use { - crate::api::{ - ChainId, - Label, - RestError, + crate::{ + api::{ + ChainId, + RestError, + }, + metrics::RequestLabel, }, anyhow::Result, axum::{ @@ -46,7 +48,7 @@ pub async fn revelation( state .metrics .request_counter - .get_or_create(&Label { + .get_or_create(&RequestLabel { value: "/v1/chains/{chain_id}/revelations/{sequence}".to_string(), }) .inc(); diff --git a/apps/fortuna/src/command/run.rs b/apps/fortuna/src/command/run.rs index d3eaea961c..97c20ae2af 100644 --- a/apps/fortuna/src/command/run.rs +++ b/apps/fortuna/src/command/run.rs @@ -14,6 +14,10 @@ use { RunOptions, }, keeper, + metrics::{ + self, + ProviderLabel, + }, state::{ HashChainState, PebbleHashChain, @@ -25,14 +29,17 @@ use { Result, }, axum::Router, + ethers::types::Address, std::{ collections::HashMap, net::SocketAddr, sync::Arc, + time::Duration, }, tokio::{ spawn, sync::watch, + time, }, tower_http::cors::CorsLayer, utoipa::OpenApi, @@ -42,6 +49,7 @@ use { pub async fn run_api( socket_addr: SocketAddr, chains: HashMap, + metrics_registry: Arc, mut rx_exit: watch::Receiver, ) -> Result<()> { #[derive(OpenApi)] @@ -63,10 +71,9 @@ pub async fn run_api( )] struct ApiDoc; - let metrics_registry = api::Metrics::new(); let api_state = api::ApiState { chains: Arc::new(chains), - metrics: Arc::new(metrics_registry), + metrics: metrics_registry, }; // Initialize Axum Router. Note the type here is a `Router` due to the use of the @@ -218,11 +225,68 @@ pub async fn run(opts: &RunOptions) -> Result<()> { Ok::<(), Error>(()) }); + let metrics_registry = Arc::new(metrics::Metrics::new()); + if let Some(keeper_private_key) = opts.load_keeper_private_key()? { - spawn(run_keeper(chains.clone(), config, keeper_private_key)); + spawn(run_keeper( + chains.clone(), + config.clone(), + keeper_private_key, + )); } - run_api(opts.addr.clone(), chains, rx_exit).await?; + spawn(track_hashchain( + config.clone(), + opts.provider.clone(), + metrics_registry.clone(), + )); + + run_api(opts.addr.clone(), chains, metrics_registry.clone(), rx_exit).await?; Ok(()) } + + +pub async fn track_hashchain( + config: Config, + provider_address: Address, + metrics_registry: Arc, +) { + loop { + println!("fetching balance"); + for (chain_id, chain_config) in &config.chains { + let contract = match PythContract::from_config(chain_config) { + Ok(r) => r, + Err(_e) => continue, + }; + + let provider_info = match contract.get_provider_info(provider_address).call().await { + Ok(info) => info, + Err(_e) => { + time::sleep(Duration::from_secs(5)).await; + continue; + } + }; + let current_sequence_number = provider_info.sequence_number; + let end_sequence_number = provider_info.end_sequence_number; + + metrics_registry + .current_sequence_number + .get_or_create(&ProviderLabel { + chain_id: chain_id.clone(), + address: provider_address.to_string(), + }) + // TODO: comment on i64 to u64 conversion + .set(current_sequence_number as i64); + metrics_registry + .end_sequence_number + .get_or_create(&ProviderLabel { + chain_id: chain_id.clone(), + address: provider_address.to_string(), + }) + .set(end_sequence_number as i64); + } + + time::sleep(Duration::from_secs(10)).await; + } +} diff --git a/apps/fortuna/src/main.rs b/apps/fortuna/src/main.rs index 4cd2475043..a26e1ed1c2 100644 --- a/apps/fortuna/src/main.rs +++ b/apps/fortuna/src/main.rs @@ -1,5 +1,6 @@ #![allow(clippy::just_underscores_and_digits)] #![feature(slice_flatten)] +#![feature(integer_atomics)] use { anyhow::Result, @@ -12,6 +13,7 @@ pub mod chain; pub mod command; pub mod config; pub mod keeper; +pub mod metrics; pub mod state; // Server TODO list: diff --git a/apps/fortuna/src/metrics.rs b/apps/fortuna/src/metrics.rs new file mode 100644 index 0000000000..b3500631a7 --- /dev/null +++ b/apps/fortuna/src/metrics.rs @@ -0,0 +1,90 @@ +use { + crate::api::ChainId, + prometheus_client::{ + encoding::EncodeLabelSet, + metrics::{ + counter::Counter, + family::Family, + gauge::{ + Gauge, + }, + }, + registry::Registry, + }, + std::sync::atomic::AtomicU64, + tokio::sync::RwLock, +}; + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +pub struct RequestLabel { + pub value: String, +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +pub struct RpcLabel { + pub chain_id: ChainId, + pub uri: String, +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +pub struct ProviderLabel { + pub chain_id: String, + pub address: String, +} + +pub struct Metrics { + pub registry: RwLock, + + pub request_counter: Family, + + pub current_sequence_number: Family, + pub end_sequence_number: Family, + // pub balance: Family, + // pub balance_threshold: Family, + // + // pub rpc: Family, + // + // pub gas_spending: Family, + // pub requests: Family, + // pub reveal: Family, +} + +impl Metrics { + pub fn new() -> Self { + let mut metrics_registry = Registry::default(); + + let http_requests = Family::::default(); + metrics_registry.register( + // With the metric name. + "http_requests", + // And the metric help text. + "Number of HTTP requests received", + http_requests.clone(), + ); + + let current_sequence_number = Family::::default(); + metrics_registry.register( + // With the metric name. + "current_sequence_number", + // And the metric help text. + "The sequence number for a new request.", + current_sequence_number.clone(), + ); + + let end_sequence_number = Family::::default(); + metrics_registry.register( + // With the metric name. + "end_sequence_number", + // And the metric help text. + "The sequence number for the last request.", + end_sequence_number.clone(), + ); + + Metrics { + registry: RwLock::new(metrics_registry), + request_counter: http_requests, + current_sequence_number, + end_sequence_number, + } + } +} From 2b38917d9f3c539e75f396ec4807c0f11048fa65 Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Wed, 8 May 2024 17:43:15 +0530 Subject: [PATCH 02/27] gas spending metrics --- apps/fortuna/src/keeper.rs | 30 ++++++++++++++++++++++-------- apps/fortuna/src/metrics.rs | 11 ++++++----- 2 files changed, 28 insertions(+), 13 deletions(-) diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs index b4e6c2a550..b7e0d501a2 100644 --- a/apps/fortuna/src/keeper.rs +++ b/apps/fortuna/src/keeper.rs @@ -221,14 +221,28 @@ pub async fn process_event( }; match pending_tx.await { - Ok(res) => { - tracing::info!( - sequence_number = &event.sequence_number, - "Revealed with res: {:?}", - res - ); - Ok(()) - } + Ok(res) => match res { + Some(res) => { + tracing::info!( + sequence_number = &event.sequence_number, + transaction_hash = &res.transaction_hash.to_string(), + gas_used = ?res.gas_used, + "Revealed with res: {:?}", + res + ); + Ok(()) + } + None => { + tracing::error!( + sequence_number = &event.sequence_number, + "Can't verify the reveal" + ); + // It is better to return an error in this scenario + // For the caller to retry + Err(anyhow!("Can't verify the reveal")) + } + }, + Err(e) => { tracing::error!( sequence_number = &event.sequence_number, diff --git a/apps/fortuna/src/metrics.rs b/apps/fortuna/src/metrics.rs index b3500631a7..7649607471 100644 --- a/apps/fortuna/src/metrics.rs +++ b/apps/fortuna/src/metrics.rs @@ -5,13 +5,10 @@ use { metrics::{ counter::Counter, family::Family, - gauge::{ - Gauge, - }, + gauge::Gauge, }, registry::Registry, }, - std::sync::atomic::AtomicU64, tokio::sync::RwLock, }; @@ -44,9 +41,13 @@ pub struct Metrics { // // pub rpc: Family, // - // pub gas_spending: Family, // pub requests: Family, // pub reveal: Family, + + // NOTE: gas_spending is not part of metrics. + // why? + // - it is not a value that increases or decreases over time. Not a counter or a gauge + // - it can't fit in a histogram too. logging and then collecting it is better. } impl Metrics { From 51119987163b5516dc5c802418ab6f5345770ee7 Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Wed, 8 May 2024 17:58:55 +0530 Subject: [PATCH 03/27] metrics for request reveal processed --- apps/fortuna/src/command/run.rs | 3 ++ apps/fortuna/src/keeper.rs | 50 +++++++++++++++++++++++++++++---- apps/fortuna/src/metrics.rs | 36 ++++++++++++++++++++++-- 3 files changed, 81 insertions(+), 8 deletions(-) diff --git a/apps/fortuna/src/command/run.rs b/apps/fortuna/src/command/run.rs index 97c20ae2af..64d37fe6c3 100644 --- a/apps/fortuna/src/command/run.rs +++ b/apps/fortuna/src/command/run.rs @@ -108,6 +108,7 @@ pub async fn run_keeper( chains: HashMap, config: Config, private_key: String, + metrics_registry: Arc, ) -> Result<()> { let mut handles = Vec::new(); for (chain_id, chain_config) in chains { @@ -121,6 +122,7 @@ pub async fn run_keeper( private_key, chain_eth_config, chain_config.clone(), + metrics_registry.clone(), ))); } @@ -232,6 +234,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> { chains.clone(), config.clone(), keeper_private_key, + metrics_registry.clone(), )); } diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs index b7e0d501a2..280e0a4157 100644 --- a/apps/fortuna/src/keeper.rs +++ b/apps/fortuna/src/keeper.rs @@ -12,6 +12,10 @@ use { }, }, config::EthereumConfig, + metrics::{ + Metrics, + ProviderLabel, + }, }, anyhow::{ anyhow, @@ -88,6 +92,7 @@ pub async fn run_keeper_threads( private_key: String, chain_eth_config: EthereumConfig, chain_state: BlockchainState, + metrics: Arc, ) { tracing::info!("starting keeper"); let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await; @@ -109,6 +114,7 @@ pub async fn run_keeper_threads( contract.clone(), chain_eth_config.gas_limit, chain_state.clone(), + metrics.clone(), ) .in_current_span(), ); @@ -131,6 +137,7 @@ pub async fn run_keeper_threads( rx, Arc::clone(&contract), chain_eth_config.gas_limit, + metrics.clone(), ) .in_current_span(), ); @@ -146,6 +153,7 @@ pub async fn process_event( chain_config: &BlockchainState, contract: &Arc, gas_limit: U256, + metrics: Arc, ) -> Result<()> { if chain_config.provider_address != event.provider_address { return Ok(()); @@ -230,6 +238,13 @@ pub async fn process_event( "Revealed with res: {:?}", res ); + metrics + .reveals + .get_or_create(&ProviderLabel { + chain_id: chain_config.id.clone(), + address: chain_config.provider_address.to_string(), + }) + .inc(); Ok(()) } None => { @@ -280,6 +295,7 @@ pub async fn process_block_range( contract: Arc, gas_limit: U256, chain_state: api::BlockchainState, + metrics: Arc, ) { let BlockRange { from: first_block, @@ -300,6 +316,7 @@ pub async fn process_block_range( contract.clone(), gas_limit, chain_state.clone(), + metrics.clone(), ) .in_current_span() .await; @@ -316,6 +333,7 @@ pub async fn process_single_block_batch( contract: Arc, gas_limit: U256, chain_state: api::BlockchainState, + metrics: Arc, ) { loop { let events_res = chain_state @@ -327,11 +345,23 @@ pub async fn process_single_block_batch( Ok(events) => { tracing::info!(num_of_events = &events.len(), "Processing",); for event in &events { + metrics + .requests + .get_or_create(&ProviderLabel { + chain_id: chain_state.id.clone(), + address: chain_state.provider_address.to_string(), + }) + .inc(); tracing::info!(sequence_number = &event.sequence_number, "Processing event",); - while let Err(e) = - process_event(event.clone(), &chain_state, &contract, gas_limit) - .in_current_span() - .await + while let Err(e) = process_event( + event.clone(), + &chain_state, + &contract, + gas_limit, + metrics.clone(), + ) + .in_current_span() + .await { tracing::error!( sequence_number = &event.sequence_number, @@ -342,6 +372,13 @@ pub async fn process_single_block_batch( time::sleep(RETRY_INTERVAL).await; } tracing::info!(sequence_number = &event.sequence_number, "Processed event",); + metrics + .requests_processed + .get_or_create(&ProviderLabel { + chain_id: chain_state.id.clone(), + address: chain_state.provider_address.to_string(), + }) + .inc(); } tracing::info!(num_of_events = &events.len(), "Processed",); break; @@ -469,6 +506,7 @@ pub async fn process_new_blocks( mut rx: mpsc::Receiver, contract: Arc, gas_limit: U256, + metrics: Arc, ) { tracing::info!("Waiting for new block ranges to process"); loop { @@ -478,6 +516,7 @@ pub async fn process_new_blocks( Arc::clone(&contract), gas_limit, chain_state.clone(), + metrics.clone(), ) .in_current_span() .await; @@ -492,9 +531,10 @@ pub async fn process_backlog( contract: Arc, gas_limit: U256, chain_state: BlockchainState, + metrics: Arc, ) { tracing::info!("Processing backlog"); - process_block_range(backlog_range, contract, gas_limit, chain_state) + process_block_range(backlog_range, contract, gas_limit, chain_state, metrics) .in_current_span() .await; tracing::info!("Backlog processed"); diff --git a/apps/fortuna/src/metrics.rs b/apps/fortuna/src/metrics.rs index 7649607471..43558ea7f0 100644 --- a/apps/fortuna/src/metrics.rs +++ b/apps/fortuna/src/metrics.rs @@ -41,9 +41,9 @@ pub struct Metrics { // // pub rpc: Family, // - // pub requests: Family, - // pub reveal: Family, - + pub requests: Family, + pub requests_processed: Family, + pub reveals: Family, // NOTE: gas_spending is not part of metrics. // why? // - it is not a value that increases or decreases over time. Not a counter or a gauge @@ -81,11 +81,41 @@ impl Metrics { end_sequence_number.clone(), ); + let requests = Family::::default(); + metrics_registry.register( + // With the metric name. + "requests", + // And the metric help text. + "Number of requests received", + requests.clone(), + ); + + let requests_processed = Family::::default(); + metrics_registry.register( + // With the metric name. + "requests_processed", + // And the metric help text. + "Number of requests processed", + requests_processed.clone(), + ); + + let reveals = Family::::default(); + metrics_registry.register( + // With the metric name. + "reveal", + // And the metric help text. + "Number of reveals", + reveals.clone(), + ); + Metrics { registry: RwLock::new(metrics_registry), request_counter: http_requests, current_sequence_number, end_sequence_number, + requests, + requests_processed, + reveals, } } } From b2fa80f7e57a37fc0cf2795990ecc8a2a9292a9a Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Wed, 8 May 2024 18:03:47 +0530 Subject: [PATCH 04/27] remove println --- apps/fortuna/src/command/run.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/apps/fortuna/src/command/run.rs b/apps/fortuna/src/command/run.rs index 64d37fe6c3..0cb9eb08c4 100644 --- a/apps/fortuna/src/command/run.rs +++ b/apps/fortuna/src/command/run.rs @@ -256,7 +256,6 @@ pub async fn track_hashchain( metrics_registry: Arc, ) { loop { - println!("fetching balance"); for (chain_id, chain_config) in &config.chains { let contract = match PythContract::from_config(chain_config) { Ok(r) => r, From 6e672be3a42ec9768e8e3aca0bf1867e3d2ff8e8 Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Wed, 8 May 2024 20:06:40 +0530 Subject: [PATCH 05/27] keeper balance --- apps/fortuna/src/command/run.rs | 52 +++++++++++++++++++++++++++++++-- apps/fortuna/src/metrics.rs | 20 ++++++++++--- 2 files changed, 66 insertions(+), 6 deletions(-) diff --git a/apps/fortuna/src/command/run.rs b/apps/fortuna/src/command/run.rs index 0cb9eb08c4..d7439b82e2 100644 --- a/apps/fortuna/src/command/run.rs +++ b/apps/fortuna/src/command/run.rs @@ -29,7 +29,18 @@ use { Result, }, axum::Router, - ethers::types::Address, + ethers::{ + middleware::Middleware, + providers::{ + Http, + Provider, + }, + signers::{ + LocalWallet, + Signer, + }, + types::Address, + }, std::{ collections::HashMap, net::SocketAddr, @@ -230,12 +241,20 @@ pub async fn run(opts: &RunOptions) -> Result<()> { let metrics_registry = Arc::new(metrics::Metrics::new()); if let Some(keeper_private_key) = opts.load_keeper_private_key()? { + let keeper_address = keeper_private_key.parse::()?.address(); + spawn(run_keeper( chains.clone(), config.clone(), keeper_private_key, metrics_registry.clone(), )); + + spawn(track_balance( + config.clone(), + keeper_address, + metrics_registry.clone(), + )); } spawn(track_hashchain( @@ -243,12 +262,41 @@ pub async fn run(opts: &RunOptions) -> Result<()> { opts.provider.clone(), metrics_registry.clone(), )); - run_api(opts.addr.clone(), chains, metrics_registry.clone(), rx_exit).await?; Ok(()) } +pub async fn track_balance( + config: Config, + keeper_address: Address, + metrics_registry: Arc, +) { + loop { + for (chain_id, chain_config) in &config.chains { + let provider = match Provider::::try_from(&chain_config.geth_rpc_addr) { + Ok(r) => r, + Err(_e) => continue, + }; + + let balance = match provider.get_balance(keeper_address, None).await { + Ok(r) => r.as_u128(), + Err(_e) => continue, + }; + let balance = balance as f64 / 1e18; + + metrics_registry + .balance + .get_or_create(&ProviderLabel { + chain_id: chain_id.clone(), + address: keeper_address.to_string(), + }) + // comment on why is this ok + .set(balance); + } + } +} + pub async fn track_hashchain( config: Config, diff --git a/apps/fortuna/src/metrics.rs b/apps/fortuna/src/metrics.rs index 43558ea7f0..bf136ab9b8 100644 --- a/apps/fortuna/src/metrics.rs +++ b/apps/fortuna/src/metrics.rs @@ -9,6 +9,7 @@ use { }, registry::Registry, }, + std::sync::atomic::AtomicU64, tokio::sync::RwLock, }; @@ -36,10 +37,7 @@ pub struct Metrics { pub current_sequence_number: Family, pub end_sequence_number: Family, - // pub balance: Family, - // pub balance_threshold: Family, - // - // pub rpc: Family, + pub balance: Family>, // pub requests: Family, pub requests_processed: Family, @@ -48,6 +46,10 @@ pub struct Metrics { // why? // - it is not a value that increases or decreases over time. Not a counter or a gauge // - it can't fit in a histogram too. logging and then collecting it is better. + // NOTE: rpc is not part of metrics. + // why? + // - which metric type should we use to track it? + // - let's just use fetched latest safe block from logs } impl Metrics { @@ -108,6 +110,15 @@ impl Metrics { reveals.clone(), ); + let balance = Family::>::default(); + metrics_registry.register( + // With the metric name. + "balance", + // And the metric help text. + "Balance of the keeper", + balance.clone(), + ); + Metrics { registry: RwLock::new(metrics_registry), request_counter: http_requests, @@ -116,6 +127,7 @@ impl Metrics { requests, requests_processed, reveals, + balance, } } } From 75fb482de73abe7dc314a953f9e15c5ba3d0faa6 Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Wed, 8 May 2024 20:13:20 +0530 Subject: [PATCH 06/27] collected fee --- apps/fortuna/src/command/run.rs | 37 +++++++++++++++++++++++++++++++++ apps/fortuna/src/metrics.rs | 11 ++++++++++ 2 files changed, 48 insertions(+) diff --git a/apps/fortuna/src/command/run.rs b/apps/fortuna/src/command/run.rs index d7439b82e2..675ad1eb13 100644 --- a/apps/fortuna/src/command/run.rs +++ b/apps/fortuna/src/command/run.rs @@ -262,6 +262,11 @@ pub async fn run(opts: &RunOptions) -> Result<()> { opts.provider.clone(), metrics_registry.clone(), )); + spawn(track_collected_fee( + config.clone(), + opts.provider.clone(), + metrics_registry.clone(), + )); run_api(opts.addr.clone(), chains, metrics_registry.clone(), rx_exit).await?; Ok(()) @@ -297,6 +302,38 @@ pub async fn track_balance( } } +pub async fn track_collected_fee( + config: Config, + provider_address: Address, + metrics_registry: Arc, +) { + loop { + for (chain_id, chain_config) in &config.chains { + let contract = match PythContract::from_config(chain_config) { + Ok(r) => r, + Err(_e) => continue, + }; + + let provider_info = match contract.get_provider_info(provider_address).call().await { + Ok(info) => info, + Err(_e) => { + time::sleep(Duration::from_secs(5)).await; + continue; + } + }; + let collected_fee = provider_info.accrued_fees_in_wei as f64 / 1e18; + + metrics_registry + .collected_fee + .get_or_create(&ProviderLabel { + chain_id: chain_id.clone(), + address: provider_address.to_string(), + }) + .set(collected_fee); + } + } +} + pub async fn track_hashchain( config: Config, diff --git a/apps/fortuna/src/metrics.rs b/apps/fortuna/src/metrics.rs index bf136ab9b8..20ed17158d 100644 --- a/apps/fortuna/src/metrics.rs +++ b/apps/fortuna/src/metrics.rs @@ -38,6 +38,7 @@ pub struct Metrics { pub current_sequence_number: Family, pub end_sequence_number: Family, pub balance: Family>, + pub collected_fee: Family>, // pub requests: Family, pub requests_processed: Family, @@ -119,6 +120,15 @@ impl Metrics { balance.clone(), ); + let collected_fee = Family::>::default(); + metrics_registry.register( + // With the metric name. + "collected_fee", + // And the metric help text. + "Collected fee on the contract", + collected_fee.clone(), + ); + Metrics { registry: RwLock::new(metrics_registry), request_counter: http_requests, @@ -128,6 +138,7 @@ impl Metrics { requests_processed, reveals, balance, + collected_fee, } } } From e576abb2e786dfd7f5862a28a225f9e829cee0c1 Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Wed, 8 May 2024 20:22:52 +0530 Subject: [PATCH 07/27] total gas spent metrics --- apps/fortuna/src/keeper.rs | 12 ++++++++++++ apps/fortuna/src/metrics.rs | 16 +++++++++++----- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs index 280e0a4157..f9efb23b2e 100644 --- a/apps/fortuna/src/keeper.rs +++ b/apps/fortuna/src/keeper.rs @@ -238,6 +238,18 @@ pub async fn process_event( "Revealed with res: {:?}", res ); + + if let Some(gas_used) = res.gas_used { + let gas_used = gas_used.as_u128() as f64 / 1e18; + metrics + .total_gas_spent + .get_or_create(&ProviderLabel { + chain_id: chain_config.id.clone(), + address: chain_config.provider_address.to_string(), + }) + .inc_by(gas_used); + } + metrics .reveals .get_or_create(&ProviderLabel { diff --git a/apps/fortuna/src/metrics.rs b/apps/fortuna/src/metrics.rs index 20ed17158d..08da973b0a 100644 --- a/apps/fortuna/src/metrics.rs +++ b/apps/fortuna/src/metrics.rs @@ -39,14 +39,10 @@ pub struct Metrics { pub end_sequence_number: Family, pub balance: Family>, pub collected_fee: Family>, - // + pub total_gas_spent: Family>, pub requests: Family, pub requests_processed: Family, pub reveals: Family, - // NOTE: gas_spending is not part of metrics. - // why? - // - it is not a value that increases or decreases over time. Not a counter or a gauge - // - it can't fit in a histogram too. logging and then collecting it is better. // NOTE: rpc is not part of metrics. // why? // - which metric type should we use to track it? @@ -129,6 +125,15 @@ impl Metrics { collected_fee.clone(), ); + let total_gas_spent = Family::>::default(); + metrics_registry.register( + // With the metric name. + "total_gas_spent", + // And the metric help text. + "Total gas spent revealing requests", + total_gas_spent.clone(), + ); + Metrics { registry: RwLock::new(metrics_registry), request_counter: http_requests, @@ -139,6 +144,7 @@ impl Metrics { reveals, balance, collected_fee, + total_gas_spent, } } } From 2001274c7bb5a52190a4b25c9d908bf042309409 Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Fri, 10 May 2024 14:23:32 +0530 Subject: [PATCH 08/27] some refactoring --- apps/fortuna/src/command/run.rs | 27 ++++++++++++++------- apps/fortuna/src/keeper.rs | 29 ++++++++++++----------- apps/fortuna/src/metrics.rs | 42 +++++++++++++++------------------ 3 files changed, 52 insertions(+), 46 deletions(-) diff --git a/apps/fortuna/src/command/run.rs b/apps/fortuna/src/command/run.rs index 675ad1eb13..c60e6e369e 100644 --- a/apps/fortuna/src/command/run.rs +++ b/apps/fortuna/src/command/run.rs @@ -16,7 +16,7 @@ use { keeper, metrics::{ self, - ProviderLabel, + AccountLabel, }, state::{ HashChainState, @@ -274,7 +274,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> { pub async fn track_balance( config: Config, - keeper_address: Address, + address: Address, metrics_registry: Arc, ) { loop { @@ -284,17 +284,21 @@ pub async fn track_balance( Err(_e) => continue, }; - let balance = match provider.get_balance(keeper_address, None).await { + let balance = match provider.get_balance(address, None).await { + // This conversion to u128 is fine as the total balance will never cross the limits + // of u128 practically. Ok(r) => r.as_u128(), Err(_e) => continue, }; + // The f64 conversion is made to be able to serve metrics with the constraints of Prometheus. + // The balance is in wei, so we need to divide by 1e18 to convert it to eth. let balance = balance as f64 / 1e18; metrics_registry .balance - .get_or_create(&ProviderLabel { + .get_or_create(&AccountLabel { chain_id: chain_id.clone(), - address: keeper_address.to_string(), + address: address.to_string(), }) // comment on why is this ok .set(balance); @@ -321,11 +325,14 @@ pub async fn track_collected_fee( continue; } }; + + // The f64 conversion is made to be able to serve metrics with the constraints of Prometheus. + // The fee is in wei, so we need to divide by 1e18 to convert it to eth. let collected_fee = provider_info.accrued_fees_in_wei as f64 / 1e18; metrics_registry .collected_fee - .get_or_create(&ProviderLabel { + .get_or_create(&AccountLabel { chain_id: chain_id.clone(), address: provider_address.to_string(), }) @@ -359,15 +366,17 @@ pub async fn track_hashchain( metrics_registry .current_sequence_number - .get_or_create(&ProviderLabel { + .get_or_create(&AccountLabel { chain_id: chain_id.clone(), address: provider_address.to_string(), }) - // TODO: comment on i64 to u64 conversion + // sequence_number type on chain is u64 but practically it will take + // a long time for it to cross the limits of i64. + // currently prometheus only supports i64 for Gauge types .set(current_sequence_number as i64); metrics_registry .end_sequence_number - .get_or_create(&ProviderLabel { + .get_or_create(&AccountLabel { chain_id: chain_id.clone(), address: provider_address.to_string(), }) diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs index f9efb23b2e..15999fe83f 100644 --- a/apps/fortuna/src/keeper.rs +++ b/apps/fortuna/src/keeper.rs @@ -13,8 +13,8 @@ use { }, config::EthereumConfig, metrics::{ + AccountLabel, Metrics, - ProviderLabel, }, }, anyhow::{ @@ -29,6 +29,7 @@ use { Ws, }, types::U256, + signers::Signer, }, futures::StreamExt, std::sync::Arc, @@ -239,20 +240,20 @@ pub async fn process_event( res ); - if let Some(gas_used) = res.gas_used { - let gas_used = gas_used.as_u128() as f64 / 1e18; - metrics - .total_gas_spent - .get_or_create(&ProviderLabel { - chain_id: chain_config.id.clone(), - address: chain_config.provider_address.to_string(), - }) - .inc_by(gas_used); - } + if let Some(gas_used) = res.gas_used { + let gas_used = gas_used.as_u128() as f64 / 1e18; + metrics + .total_gas_spent + .get_or_create(&AccountLabel { + chain_id: chain_config.id.clone(), + address: contract.client().inner().inner().signer().address().to_string(), + }) + .inc_by(gas_used); + } metrics .reveals - .get_or_create(&ProviderLabel { + .get_or_create(&AccountLabel { chain_id: chain_config.id.clone(), address: chain_config.provider_address.to_string(), }) @@ -359,7 +360,7 @@ pub async fn process_single_block_batch( for event in &events { metrics .requests - .get_or_create(&ProviderLabel { + .get_or_create(&AccountLabel { chain_id: chain_state.id.clone(), address: chain_state.provider_address.to_string(), }) @@ -386,7 +387,7 @@ pub async fn process_single_block_batch( tracing::info!(sequence_number = &event.sequence_number, "Processed event",); metrics .requests_processed - .get_or_create(&ProviderLabel { + .get_or_create(&AccountLabel { chain_id: chain_state.id.clone(), address: chain_state.provider_address.to_string(), }) diff --git a/apps/fortuna/src/metrics.rs b/apps/fortuna/src/metrics.rs index 08da973b0a..bb52fd77fa 100644 --- a/apps/fortuna/src/metrics.rs +++ b/apps/fortuna/src/metrics.rs @@ -25,7 +25,7 @@ pub struct RpcLabel { } #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] -pub struct ProviderLabel { +pub struct AccountLabel { pub chain_id: String, pub address: String, } @@ -35,18 +35,14 @@ pub struct Metrics { pub request_counter: Family, - pub current_sequence_number: Family, - pub end_sequence_number: Family, - pub balance: Family>, - pub collected_fee: Family>, - pub total_gas_spent: Family>, - pub requests: Family, - pub requests_processed: Family, - pub reveals: Family, - // NOTE: rpc is not part of metrics. - // why? - // - which metric type should we use to track it? - // - let's just use fetched latest safe block from logs + pub current_sequence_number: Family, + pub end_sequence_number: Family, + pub balance: Family>, + pub collected_fee: Family>, + pub total_gas_spent: Family>, + pub requests: Family, + pub requests_processed: Family, + pub reveals: Family, } impl Metrics { @@ -62,7 +58,7 @@ impl Metrics { http_requests.clone(), ); - let current_sequence_number = Family::::default(); + let current_sequence_number = Family::::default(); metrics_registry.register( // With the metric name. "current_sequence_number", @@ -71,25 +67,25 @@ impl Metrics { current_sequence_number.clone(), ); - let end_sequence_number = Family::::default(); + let end_sequence_number = Family::::default(); metrics_registry.register( // With the metric name. "end_sequence_number", // And the metric help text. - "The sequence number for the last request.", + "The sequence number for the end request.", end_sequence_number.clone(), ); - let requests = Family::::default(); + let requests = Family::::default(); metrics_registry.register( // With the metric name. "requests", // And the metric help text. - "Number of requests received", + "Number of requests received through events", requests.clone(), ); - let requests_processed = Family::::default(); + let requests_processed = Family::::default(); metrics_registry.register( // With the metric name. "requests_processed", @@ -98,7 +94,7 @@ impl Metrics { requests_processed.clone(), ); - let reveals = Family::::default(); + let reveals = Family::::default(); metrics_registry.register( // With the metric name. "reveal", @@ -107,7 +103,7 @@ impl Metrics { reveals.clone(), ); - let balance = Family::>::default(); + let balance = Family::>::default(); metrics_registry.register( // With the metric name. "balance", @@ -116,7 +112,7 @@ impl Metrics { balance.clone(), ); - let collected_fee = Family::>::default(); + let collected_fee = Family::>::default(); metrics_registry.register( // With the metric name. "collected_fee", @@ -125,7 +121,7 @@ impl Metrics { collected_fee.clone(), ); - let total_gas_spent = Family::>::default(); + let total_gas_spent = Family::>::default(); metrics_registry.register( // With the metric name. "total_gas_spent", From 0581fdb95d3c244bf71449bec4530c8a2916ffbc Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Fri, 10 May 2024 14:38:17 +0530 Subject: [PATCH 09/27] add comments --- apps/fortuna/src/command/run.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/apps/fortuna/src/command/run.rs b/apps/fortuna/src/command/run.rs index c60e6e369e..7e81de454b 100644 --- a/apps/fortuna/src/command/run.rs +++ b/apps/fortuna/src/command/run.rs @@ -57,6 +57,8 @@ use { utoipa_swagger_ui::SwaggerUi, }; +let TRACK_DURATION = Duration::from_secs(10); + pub async fn run_api( socket_addr: SocketAddr, chains: HashMap, @@ -272,6 +274,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> { Ok(()) } +/// tracks the balance of the given address for each chain in the given config periodically pub async fn track_balance( config: Config, address: Address, @@ -303,9 +306,12 @@ pub async fn track_balance( // comment on why is this ok .set(balance); } + + time::sleep(TRACK_DURATION).await; } } +/// tracks the collected fees of the given address for each chain in the given config periodically pub async fn track_collected_fee( config: Config, provider_address: Address, @@ -338,10 +344,13 @@ pub async fn track_collected_fee( }) .set(collected_fee); } + + time::sleep(TRACK_DURATION).await; } } - +/// tracks the current sequence number and end sequence number of the given provider address for +/// each chain in the given config periodically pub async fn track_hashchain( config: Config, provider_address: Address, @@ -383,6 +392,6 @@ pub async fn track_hashchain( .set(end_sequence_number as i64); } - time::sleep(Duration::from_secs(10)).await; + time::sleep(TRACK_DURATION).await; } } From b4d91b718e06b90ac854c5ea51879fdef7c12ef7 Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Fri, 10 May 2024 14:48:17 +0530 Subject: [PATCH 10/27] fix --- apps/fortuna/src/command/run.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/fortuna/src/command/run.rs b/apps/fortuna/src/command/run.rs index 7e81de454b..4a6efa7c3a 100644 --- a/apps/fortuna/src/command/run.rs +++ b/apps/fortuna/src/command/run.rs @@ -57,7 +57,7 @@ use { utoipa_swagger_ui::SwaggerUi, }; -let TRACK_DURATION = Duration::from_secs(10); +const TRACK_DURATION: Duration = Duration::from_secs(10); pub async fn run_api( socket_addr: SocketAddr, From ecefcffbc8d1bf650952926ab329dc337ad06c99 Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Fri, 10 May 2024 14:50:25 +0530 Subject: [PATCH 11/27] pre commit --- apps/fortuna/src/command/run.rs | 2 +- apps/fortuna/src/keeper.rs | 10 ++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/apps/fortuna/src/command/run.rs b/apps/fortuna/src/command/run.rs index 4a6efa7c3a..e2c49c3016 100644 --- a/apps/fortuna/src/command/run.rs +++ b/apps/fortuna/src/command/run.rs @@ -349,7 +349,7 @@ pub async fn track_collected_fee( } } -/// tracks the current sequence number and end sequence number of the given provider address for +/// tracks the current sequence number and end sequence number of the given provider address for /// each chain in the given config periodically pub async fn track_hashchain( config: Config, diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs index 15999fe83f..d0855afebf 100644 --- a/apps/fortuna/src/keeper.rs +++ b/apps/fortuna/src/keeper.rs @@ -28,8 +28,8 @@ use { Provider, Ws, }, - types::U256, signers::Signer, + types::U256, }, futures::StreamExt, std::sync::Arc, @@ -246,7 +246,13 @@ pub async fn process_event( .total_gas_spent .get_or_create(&AccountLabel { chain_id: chain_config.id.clone(), - address: contract.client().inner().inner().signer().address().to_string(), + address: contract + .client() + .inner() + .inner() + .signer() + .address() + .to_string(), }) .inc_by(gas_used); } From b8adbf1d8f9717c9308f103cbb8166aab60c3920 Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Fri, 10 May 2024 15:14:26 +0530 Subject: [PATCH 12/27] fix --- apps/fortuna/src/main.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/apps/fortuna/src/main.rs b/apps/fortuna/src/main.rs index a26e1ed1c2..b48aea36b5 100644 --- a/apps/fortuna/src/main.rs +++ b/apps/fortuna/src/main.rs @@ -1,6 +1,5 @@ #![allow(clippy::just_underscores_and_digits)] #![feature(slice_flatten)] -#![feature(integer_atomics)] use { anyhow::Result, From ce0ccfb403cf86ac926b7b32354daa527d50eec3 Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Fri, 10 May 2024 22:45:44 +0530 Subject: [PATCH 13/27] remove RpcLabel --- apps/fortuna/src/metrics.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/apps/fortuna/src/metrics.rs b/apps/fortuna/src/metrics.rs index bb52fd77fa..4ffac73912 100644 --- a/apps/fortuna/src/metrics.rs +++ b/apps/fortuna/src/metrics.rs @@ -1,5 +1,4 @@ use { - crate::api::ChainId, prometheus_client::{ encoding::EncodeLabelSet, metrics::{ @@ -18,12 +17,6 @@ pub struct RequestLabel { pub value: String, } -#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] -pub struct RpcLabel { - pub chain_id: ChainId, - pub uri: String, -} - #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] pub struct AccountLabel { pub chain_id: String, From 1d461d1c25ab6e212773985d6c685a66f5f5eaaa Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Fri, 10 May 2024 22:48:12 +0530 Subject: [PATCH 14/27] remove comments --- apps/fortuna/src/metrics.rs | 30 ++---------------------------- 1 file changed, 2 insertions(+), 28 deletions(-) diff --git a/apps/fortuna/src/metrics.rs b/apps/fortuna/src/metrics.rs index 4ffac73912..5e91a74801 100644 --- a/apps/fortuna/src/metrics.rs +++ b/apps/fortuna/src/metrics.rs @@ -44,81 +44,55 @@ impl Metrics { let http_requests = Family::::default(); metrics_registry.register( - // With the metric name. "http_requests", - // And the metric help text. "Number of HTTP requests received", http_requests.clone(), ); let current_sequence_number = Family::::default(); metrics_registry.register( - // With the metric name. "current_sequence_number", - // And the metric help text. "The sequence number for a new request.", current_sequence_number.clone(), ); let end_sequence_number = Family::::default(); metrics_registry.register( - // With the metric name. "end_sequence_number", - // And the metric help text. "The sequence number for the end request.", end_sequence_number.clone(), ); let requests = Family::::default(); metrics_registry.register( - // With the metric name. "requests", - // And the metric help text. "Number of requests received through events", requests.clone(), ); let requests_processed = Family::::default(); metrics_registry.register( - // With the metric name. "requests_processed", - // And the metric help text. "Number of requests processed", requests_processed.clone(), ); let reveals = Family::::default(); - metrics_registry.register( - // With the metric name. - "reveal", - // And the metric help text. - "Number of reveals", - reveals.clone(), - ); + metrics_registry.register("reveal", "Number of reveals", reveals.clone()); let balance = Family::>::default(); - metrics_registry.register( - // With the metric name. - "balance", - // And the metric help text. - "Balance of the keeper", - balance.clone(), - ); + metrics_registry.register("balance", "Balance of the keeper", balance.clone()); let collected_fee = Family::>::default(); metrics_registry.register( - // With the metric name. "collected_fee", - // And the metric help text. "Collected fee on the contract", collected_fee.clone(), ); let total_gas_spent = Family::>::default(); metrics_registry.register( - // With the metric name. "total_gas_spent", - // And the metric help text. "Total gas spent revealing requests", total_gas_spent.clone(), ); From 30f3a26dd344b75b810d2ece72b3e6082ce1fbbc Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Fri, 10 May 2024 22:50:22 +0530 Subject: [PATCH 15/27] rename method --- apps/fortuna/src/api/revelation.rs | 2 +- apps/fortuna/src/metrics.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/fortuna/src/api/revelation.rs b/apps/fortuna/src/api/revelation.rs index 0612c048cd..83feeb4e38 100644 --- a/apps/fortuna/src/api/revelation.rs +++ b/apps/fortuna/src/api/revelation.rs @@ -47,7 +47,7 @@ pub async fn revelation( ) -> Result, RestError> { state .metrics - .request_counter + .http_requests .get_or_create(&RequestLabel { value: "/v1/chains/{chain_id}/revelations/{sequence}".to_string(), }) diff --git a/apps/fortuna/src/metrics.rs b/apps/fortuna/src/metrics.rs index 5e91a74801..57e2b43ce0 100644 --- a/apps/fortuna/src/metrics.rs +++ b/apps/fortuna/src/metrics.rs @@ -26,7 +26,7 @@ pub struct AccountLabel { pub struct Metrics { pub registry: RwLock, - pub request_counter: Family, + pub http_requests: Family, pub current_sequence_number: Family, pub end_sequence_number: Family, @@ -99,7 +99,7 @@ impl Metrics { Metrics { registry: RwLock::new(metrics_registry), - request_counter: http_requests, + http_requests, current_sequence_number, end_sequence_number, requests, From 38a9770b23b0b5a637222e117e30997751126a2c Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Fri, 10 May 2024 22:51:12 +0530 Subject: [PATCH 16/27] remove comment --- apps/fortuna/src/command/run.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/apps/fortuna/src/command/run.rs b/apps/fortuna/src/command/run.rs index e2c49c3016..c312fefb1b 100644 --- a/apps/fortuna/src/command/run.rs +++ b/apps/fortuna/src/command/run.rs @@ -293,7 +293,7 @@ pub async fn track_balance( Ok(r) => r.as_u128(), Err(_e) => continue, }; - // The f64 conversion is made to be able to serve metrics with the constraints of Prometheus. + // The f64 conversion is made to be able to serve metrics within the constraints of Prometheus. // The balance is in wei, so we need to divide by 1e18 to convert it to eth. let balance = balance as f64 / 1e18; @@ -303,7 +303,6 @@ pub async fn track_balance( chain_id: chain_id.clone(), address: address.to_string(), }) - // comment on why is this ok .set(balance); } From fedadbe0c04f00d0fa93123df31cbb084a1f9a9c Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Fri, 10 May 2024 22:53:29 +0530 Subject: [PATCH 17/27] rename track_duration --- apps/fortuna/src/command/run.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/fortuna/src/command/run.rs b/apps/fortuna/src/command/run.rs index c312fefb1b..40d20f2aa0 100644 --- a/apps/fortuna/src/command/run.rs +++ b/apps/fortuna/src/command/run.rs @@ -57,7 +57,7 @@ use { utoipa_swagger_ui::SwaggerUi, }; -const TRACK_DURATION: Duration = Duration::from_secs(10); +const TRACK_INTERVAL: Duration = Duration::from_secs(10); pub async fn run_api( socket_addr: SocketAddr, @@ -306,7 +306,7 @@ pub async fn track_balance( .set(balance); } - time::sleep(TRACK_DURATION).await; + time::sleep(TRACK_INTERVAL).await; } } @@ -344,7 +344,7 @@ pub async fn track_collected_fee( .set(collected_fee); } - time::sleep(TRACK_DURATION).await; + time::sleep(TRACK_INTERVAL).await; } } @@ -391,6 +391,6 @@ pub async fn track_hashchain( .set(end_sequence_number as i64); } - time::sleep(TRACK_DURATION).await; + time::sleep(TRACK_INTERVAL).await; } } From 466cef5fd94fddd2ff3ed05d983c04f0a031bbea Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Mon, 13 May 2024 11:00:29 +0530 Subject: [PATCH 18/27] isolate metrics for api --- apps/fortuna/src/api.rs | 68 ++++++++++++++++++++++++------ apps/fortuna/src/api/metrics.rs | 2 +- apps/fortuna/src/api/revelation.rs | 10 ++--- apps/fortuna/src/command/run.rs | 18 +++++--- apps/fortuna/src/metrics.rs | 15 ------- 5 files changed, 70 insertions(+), 43 deletions(-) diff --git a/apps/fortuna/src/api.rs b/apps/fortuna/src/api.rs index 45f2645979..d7a419677c 100644 --- a/apps/fortuna/src/api.rs +++ b/apps/fortuna/src/api.rs @@ -5,7 +5,6 @@ use { BlockStatus, EntropyReader, }, - metrics::Metrics, state::HashChainState, }, anyhow::Result, @@ -20,10 +19,19 @@ use { Router, }, ethers::core::types::Address, + prometheus_client::{ + encoding::EncodeLabelSet, + metrics::{ + counter::Counter, + family::Family, + }, + registry::Registry, + }, std::{ collections::HashMap, sync::Arc, }, + tokio::sync::RwLock, url::Url, }; pub use { @@ -44,20 +52,44 @@ mod revelation; pub type ChainId = String; +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +pub struct RequestLabel { + pub value: String, +} + +pub struct ApiMetrics { + pub metrics_registry: Arc>, + pub http_requests: Family, +} + #[derive(Clone)] pub struct ApiState { pub chains: Arc>, /// Prometheus metrics - pub metrics: Arc, + pub metrics: Arc, } impl ApiState { - pub fn new(chains: &[(ChainId, BlockchainState)]) -> ApiState { - let map: HashMap = chains.into_iter().cloned().collect(); + pub async fn new( + chains: HashMap, + metrics_registry: Arc>, + ) -> ApiState { + let metrics = ApiMetrics { + http_requests: Family::default(), + metrics_registry, + }; + + let http_requests = metrics.http_requests.clone(); + metrics.metrics_registry.write().await.register( + "http_requests", + "Number of HTTP requests received", + http_requests, + ); + ApiState { - chains: Arc::new(map), - metrics: Arc::new(Metrics::new()), + chains: Arc::new(chains), + metrics: Arc::new(metrics), } } } @@ -185,7 +217,12 @@ mod test { }, ethers::prelude::Address, lazy_static::lazy_static, - std::sync::Arc, + prometheus_client::registry::Registry, + std::{ + collections::HashMap, + sync::Arc, + }, + tokio::sync::RwLock, }; const PROVIDER: Address = Address::zero(); @@ -203,7 +240,7 @@ mod test { )); } - fn test_server() -> (TestServer, Arc, Arc) { + async fn test_server() -> (TestServer, Arc, Arc) { let eth_read = Arc::new(MockEntropyReader::with_requests(10, &[])); let eth_state = BlockchainState { @@ -215,6 +252,8 @@ mod test { confirmed_block_status: BlockStatus::Latest, }; + let metrics_registry = Arc::new(RwLock::new(Registry::default())); + let avax_read = Arc::new(MockEntropyReader::with_requests(10, &[])); let avax_state = BlockchainState { @@ -226,10 +265,11 @@ mod test { confirmed_block_status: BlockStatus::Latest, }; - let api_state = ApiState::new(&[ - ("ethereum".into(), eth_state), - ("avalanche".into(), avax_state), - ]); + let mut chains = HashMap::new(); + chains.insert("ethereum".into(), eth_state); + chains.insert("avalanche".into(), avax_state); + + let api_state = ApiState::new(chains, metrics_registry).await; let app = api::routes(api_state); (TestServer::new(app).unwrap(), eth_read, avax_read) @@ -247,7 +287,7 @@ mod test { #[tokio::test] async fn test_revelation() { - let (server, eth_contract, avax_contract) = test_server(); + let (server, eth_contract, avax_contract) = test_server().await; // Can't access a revelation if it hasn't been requested get_and_assert_status( @@ -376,7 +416,7 @@ mod test { #[tokio::test] async fn test_revelation_confirmation_delay() { - let (server, eth_contract, avax_contract) = test_server(); + let (server, eth_contract, avax_contract) = test_server().await; eth_contract.insert(PROVIDER, 0, 10, false); eth_contract.insert(PROVIDER, 1, 11, false); diff --git a/apps/fortuna/src/api/metrics.rs b/apps/fortuna/src/api/metrics.rs index b211adec0c..3f7c1f4016 100644 --- a/apps/fortuna/src/api/metrics.rs +++ b/apps/fortuna/src/api/metrics.rs @@ -9,7 +9,7 @@ use { }; pub async fn metrics(State(state): State) -> impl IntoResponse { - let registry = state.metrics.registry.read().await; + let registry = state.metrics.metrics_registry.read().await; let mut buffer = String::new(); // Should not fail if the metrics are valid and there is memory available diff --git a/apps/fortuna/src/api/revelation.rs b/apps/fortuna/src/api/revelation.rs index 83feeb4e38..f85462cdce 100644 --- a/apps/fortuna/src/api/revelation.rs +++ b/apps/fortuna/src/api/revelation.rs @@ -1,10 +1,8 @@ use { - crate::{ - api::{ - ChainId, - RestError, - }, - metrics::RequestLabel, + crate::api::{ + ChainId, + RequestLabel, + RestError, }, anyhow::Result, axum::{ diff --git a/apps/fortuna/src/command/run.rs b/apps/fortuna/src/command/run.rs index 40d20f2aa0..6298e7b313 100644 --- a/apps/fortuna/src/command/run.rs +++ b/apps/fortuna/src/command/run.rs @@ -41,6 +41,7 @@ use { }, types::Address, }, + prometheus_client::registry::Registry, std::{ collections::HashMap, net::SocketAddr, @@ -49,7 +50,10 @@ use { }, tokio::{ spawn, - sync::watch, + sync::{ + watch, + RwLock, + }, time, }, tower_http::cors::CorsLayer, @@ -62,7 +66,7 @@ const TRACK_INTERVAL: Duration = Duration::from_secs(10); pub async fn run_api( socket_addr: SocketAddr, chains: HashMap, - metrics_registry: Arc, + metrics_registry: Arc>, mut rx_exit: watch::Receiver, ) -> Result<()> { #[derive(OpenApi)] @@ -84,10 +88,7 @@ pub async fn run_api( )] struct ApiDoc; - let api_state = api::ApiState { - chains: Arc::new(chains), - metrics: metrics_registry, - }; + let api_state = api::ApiState::new(chains, metrics_registry).await; // Initialize Axum Router. Note the type here is a `Router` due to the use of the // `with_state` method which replaces `Body` with `State` in the type signature. @@ -240,6 +241,9 @@ pub async fn run(opts: &RunOptions) -> Result<()> { Ok::<(), Error>(()) }); + let registry = Arc::new(RwLock::new(Registry::default())); + + let metrics_registry = Arc::new(metrics::Metrics::new()); if let Some(keeper_private_key) = opts.load_keeper_private_key()? { @@ -269,7 +273,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> { opts.provider.clone(), metrics_registry.clone(), )); - run_api(opts.addr.clone(), chains, metrics_registry.clone(), rx_exit).await?; + run_api(opts.addr.clone(), chains, registry.clone(), rx_exit).await?; Ok(()) } diff --git a/apps/fortuna/src/metrics.rs b/apps/fortuna/src/metrics.rs index 57e2b43ce0..9b590c26f8 100644 --- a/apps/fortuna/src/metrics.rs +++ b/apps/fortuna/src/metrics.rs @@ -12,11 +12,6 @@ use { tokio::sync::RwLock, }; -#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] -pub struct RequestLabel { - pub value: String, -} - #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] pub struct AccountLabel { pub chain_id: String, @@ -26,8 +21,6 @@ pub struct AccountLabel { pub struct Metrics { pub registry: RwLock, - pub http_requests: Family, - pub current_sequence_number: Family, pub end_sequence_number: Family, pub balance: Family>, @@ -42,13 +35,6 @@ impl Metrics { pub fn new() -> Self { let mut metrics_registry = Registry::default(); - let http_requests = Family::::default(); - metrics_registry.register( - "http_requests", - "Number of HTTP requests received", - http_requests.clone(), - ); - let current_sequence_number = Family::::default(); metrics_registry.register( "current_sequence_number", @@ -99,7 +85,6 @@ impl Metrics { Metrics { registry: RwLock::new(metrics_registry), - http_requests, current_sequence_number, end_sequence_number, requests, From 6663a651e969e05fc0e613589edc97fb96ed0248 Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Mon, 13 May 2024 11:52:05 +0530 Subject: [PATCH 19/27] isolate keeper metrics --- apps/fortuna/src/command/run.rs | 170 +------------------- apps/fortuna/src/keeper.rs | 265 +++++++++++++++++++++++++++++--- 2 files changed, 247 insertions(+), 188 deletions(-) diff --git a/apps/fortuna/src/command/run.rs b/apps/fortuna/src/command/run.rs index 6298e7b313..701428e17f 100644 --- a/apps/fortuna/src/command/run.rs +++ b/apps/fortuna/src/command/run.rs @@ -13,11 +13,7 @@ use { ProviderConfig, RunOptions, }, - keeper, - metrics::{ - self, - AccountLabel, - }, + keeper::{self,}, state::{ HashChainState, PebbleHashChain, @@ -29,24 +25,11 @@ use { Result, }, axum::Router, - ethers::{ - middleware::Middleware, - providers::{ - Http, - Provider, - }, - signers::{ - LocalWallet, - Signer, - }, - types::Address, - }, prometheus_client::registry::Registry, std::{ collections::HashMap, net::SocketAddr, sync::Arc, - time::Duration, }, tokio::{ spawn, @@ -54,15 +37,12 @@ use { watch, RwLock, }, - time, }, tower_http::cors::CorsLayer, utoipa::OpenApi, utoipa_swagger_ui::SwaggerUi, }; -const TRACK_INTERVAL: Duration = Duration::from_secs(10); - pub async fn run_api( socket_addr: SocketAddr, chains: HashMap, @@ -122,7 +102,7 @@ pub async fn run_keeper( chains: HashMap, config: Config, private_key: String, - metrics_registry: Arc, + metrics_registry: Arc>, ) -> Result<()> { let mut handles = Vec::new(); for (chain_id, chain_config) in chains { @@ -241,160 +221,18 @@ pub async fn run(opts: &RunOptions) -> Result<()> { Ok::<(), Error>(()) }); - let registry = Arc::new(RwLock::new(Registry::default())); - - - let metrics_registry = Arc::new(metrics::Metrics::new()); + let metrics_registry = Arc::new(RwLock::new(Registry::default())); if let Some(keeper_private_key) = opts.load_keeper_private_key()? { - let keeper_address = keeper_private_key.parse::()?.address(); - spawn(run_keeper( chains.clone(), config.clone(), keeper_private_key, metrics_registry.clone(), )); - - spawn(track_balance( - config.clone(), - keeper_address, - metrics_registry.clone(), - )); } - spawn(track_hashchain( - config.clone(), - opts.provider.clone(), - metrics_registry.clone(), - )); - spawn(track_collected_fee( - config.clone(), - opts.provider.clone(), - metrics_registry.clone(), - )); - run_api(opts.addr.clone(), chains, registry.clone(), rx_exit).await?; + run_api(opts.addr.clone(), chains, metrics_registry.clone(), rx_exit).await?; Ok(()) } - -/// tracks the balance of the given address for each chain in the given config periodically -pub async fn track_balance( - config: Config, - address: Address, - metrics_registry: Arc, -) { - loop { - for (chain_id, chain_config) in &config.chains { - let provider = match Provider::::try_from(&chain_config.geth_rpc_addr) { - Ok(r) => r, - Err(_e) => continue, - }; - - let balance = match provider.get_balance(address, None).await { - // This conversion to u128 is fine as the total balance will never cross the limits - // of u128 practically. - Ok(r) => r.as_u128(), - Err(_e) => continue, - }; - // The f64 conversion is made to be able to serve metrics within the constraints of Prometheus. - // The balance is in wei, so we need to divide by 1e18 to convert it to eth. - let balance = balance as f64 / 1e18; - - metrics_registry - .balance - .get_or_create(&AccountLabel { - chain_id: chain_id.clone(), - address: address.to_string(), - }) - .set(balance); - } - - time::sleep(TRACK_INTERVAL).await; - } -} - -/// tracks the collected fees of the given address for each chain in the given config periodically -pub async fn track_collected_fee( - config: Config, - provider_address: Address, - metrics_registry: Arc, -) { - loop { - for (chain_id, chain_config) in &config.chains { - let contract = match PythContract::from_config(chain_config) { - Ok(r) => r, - Err(_e) => continue, - }; - - let provider_info = match contract.get_provider_info(provider_address).call().await { - Ok(info) => info, - Err(_e) => { - time::sleep(Duration::from_secs(5)).await; - continue; - } - }; - - // The f64 conversion is made to be able to serve metrics with the constraints of Prometheus. - // The fee is in wei, so we need to divide by 1e18 to convert it to eth. - let collected_fee = provider_info.accrued_fees_in_wei as f64 / 1e18; - - metrics_registry - .collected_fee - .get_or_create(&AccountLabel { - chain_id: chain_id.clone(), - address: provider_address.to_string(), - }) - .set(collected_fee); - } - - time::sleep(TRACK_INTERVAL).await; - } -} - -/// tracks the current sequence number and end sequence number of the given provider address for -/// each chain in the given config periodically -pub async fn track_hashchain( - config: Config, - provider_address: Address, - metrics_registry: Arc, -) { - loop { - for (chain_id, chain_config) in &config.chains { - let contract = match PythContract::from_config(chain_config) { - Ok(r) => r, - Err(_e) => continue, - }; - - let provider_info = match contract.get_provider_info(provider_address).call().await { - Ok(info) => info, - Err(_e) => { - time::sleep(Duration::from_secs(5)).await; - continue; - } - }; - let current_sequence_number = provider_info.sequence_number; - let end_sequence_number = provider_info.end_sequence_number; - - metrics_registry - .current_sequence_number - .get_or_create(&AccountLabel { - chain_id: chain_id.clone(), - address: provider_address.to_string(), - }) - // sequence_number type on chain is u64 but practically it will take - // a long time for it to cross the limits of i64. - // currently prometheus only supports i64 for Gauge types - .set(current_sequence_number as i64); - metrics_registry - .end_sequence_number - .get_or_create(&AccountLabel { - chain_id: chain_id.clone(), - address: provider_address.to_string(), - }) - .set(end_sequence_number as i64); - } - - time::sleep(TRACK_INTERVAL).await; - } -} diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs index d0855afebf..de64334d5e 100644 --- a/apps/fortuna/src/keeper.rs +++ b/apps/fortuna/src/keeper.rs @@ -5,17 +5,16 @@ use { BlockchainState, }, chain::{ - ethereum::SignablePythContract, + ethereum::{ + PythContract, + SignablePythContract, + }, reader::{ BlockNumber, RequestedWithCallbackEvent, }, }, config::EthereumConfig, - metrics::{ - AccountLabel, - Metrics, - }, }, anyhow::{ anyhow, @@ -24,18 +23,37 @@ use { ethers::{ contract::ContractError, providers::{ + Http, Middleware, Provider, Ws, }, signers::Signer, - types::U256, + types::{ + Address, + U256, + }, }, futures::StreamExt, - std::sync::Arc, + prometheus_client::{ + encoding::EncodeLabelSet, + metrics::{ + counter::Counter, + family::Family, + gauge::Gauge, + }, + registry::Registry, + }, + std::sync::{ + atomic::AtomicU64, + Arc, + }, tokio::{ spawn, - sync::mpsc, + sync::{ + mpsc, + RwLock, + }, time::{ self, Duration, @@ -47,12 +65,6 @@ use { }, }; -#[derive(Debug)] -pub struct BlockRange { - pub from: BlockNumber, - pub to: BlockNumber, -} - /// How much to wait before retrying in case of an RPC error const RETRY_INTERVAL: Duration = Duration::from_secs(5); /// How many blocks to look back for events that might be missed when starting the keeper @@ -61,7 +73,96 @@ const BACKLOG_RANGE: u64 = 1000; const BLOCK_BATCH_SIZE: u64 = 100; /// How much to wait before polling the next latest block const POLL_INTERVAL: Duration = Duration::from_secs(2); +/// Track metrics in this interval +const TRACK_INTERVAL: Duration = Duration::from_secs(10); + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +pub struct AccountLabel { + pub chain_id: String, + pub address: String, +} +pub struct KeeperMetrics { + pub current_sequence_number: Family, + pub end_sequence_number: Family, + pub balance: Family>, + pub collected_fee: Family>, + pub total_gas_spent: Family>, + pub requests: Family, + pub requests_processed: Family, + pub reveals: Family, +} + +impl KeeperMetrics { + pub async fn new(registry: Arc>) -> Self { + let mut writable_registry = registry.write().await; + + let current_sequence_number = Family::::default(); + writable_registry.register( + "current_sequence_number", + "The sequence number for a new request.", + current_sequence_number.clone(), + ); + + let end_sequence_number = Family::::default(); + writable_registry.register( + "end_sequence_number", + "The sequence number for the end request.", + end_sequence_number.clone(), + ); + + let requests = Family::::default(); + writable_registry.register( + "requests", + "Number of requests received through events", + requests.clone(), + ); + + let requests_processed = Family::::default(); + writable_registry.register( + "requests_processed", + "Number of requests processed", + requests_processed.clone(), + ); + + let reveals = Family::::default(); + writable_registry.register("reveal", "Number of reveals", reveals.clone()); + + let balance = Family::>::default(); + writable_registry.register("balance", "Balance of the keeper", balance.clone()); + + let collected_fee = Family::>::default(); + writable_registry.register( + "collected_fee", + "Collected fee on the contract", + collected_fee.clone(), + ); + + let total_gas_spent = Family::>::default(); + writable_registry.register( + "total_gas_spent", + "Total gas spent revealing requests", + total_gas_spent.clone(), + ); + + KeeperMetrics { + current_sequence_number, + end_sequence_number, + requests, + requests_processed, + reveals, + balance, + collected_fee, + total_gas_spent, + } + } +} + +#[derive(Debug)] +pub struct BlockRange { + pub from: BlockNumber, + pub to: BlockNumber, +} /// Get the latest safe block number for the chain. Retry internally if there is an error. async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber { @@ -93,8 +194,11 @@ pub async fn run_keeper_threads( private_key: String, chain_eth_config: EthereumConfig, chain_state: BlockchainState, - metrics: Arc, + metrics: Arc>, ) { + // Register metrics + let keeper_metrics = Arc::new(KeeperMetrics::new(metrics.clone()).await); + tracing::info!("starting keeper"); let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await; tracing::info!("latest safe block: {}", &latest_safe_block); @@ -104,6 +208,7 @@ pub async fn run_keeper_threads( .await .expect("Chain config should be valid"), ); + let keeper_address = contract.client().inner().inner().signer().address(); // Spawn a thread to handle the events from last BACKLOG_RANGE blocks. spawn( @@ -115,7 +220,7 @@ pub async fn run_keeper_threads( contract.clone(), chain_eth_config.gas_limit, chain_state.clone(), - metrics.clone(), + keeper_metrics.clone(), ) .in_current_span(), ); @@ -138,7 +243,29 @@ pub async fn run_keeper_threads( rx, Arc::clone(&contract), chain_eth_config.gas_limit, - metrics.clone(), + keeper_metrics.clone(), + ) + .in_current_span(), + ); + + // spawn a thread to track keeper balance + spawn( + track_balance( + chain_state.id.clone(), + chain_eth_config.clone(), + keeper_address.clone(), + keeper_metrics.clone(), + ) + .in_current_span(), + ); + + // spawn a thread to track provider info + spawn( + track_provider( + chain_state.id.clone(), + chain_eth_config.clone(), + chain_state.provider_address.clone(), + keeper_metrics.clone(), ) .in_current_span(), ); @@ -154,7 +281,7 @@ pub async fn process_event( chain_config: &BlockchainState, contract: &Arc, gas_limit: U256, - metrics: Arc, + metrics: Arc, ) -> Result<()> { if chain_config.provider_address != event.provider_address { return Ok(()); @@ -314,7 +441,7 @@ pub async fn process_block_range( contract: Arc, gas_limit: U256, chain_state: api::BlockchainState, - metrics: Arc, + metrics: Arc, ) { let BlockRange { from: first_block, @@ -352,7 +479,7 @@ pub async fn process_single_block_batch( contract: Arc, gas_limit: U256, chain_state: api::BlockchainState, - metrics: Arc, + metrics: Arc, ) { loop { let events_res = chain_state @@ -525,7 +652,7 @@ pub async fn process_new_blocks( mut rx: mpsc::Receiver, contract: Arc, gas_limit: U256, - metrics: Arc, + metrics: Arc, ) { tracing::info!("Waiting for new block ranges to process"); loop { @@ -550,7 +677,7 @@ pub async fn process_backlog( contract: Arc, gas_limit: U256, chain_state: BlockchainState, - metrics: Arc, + metrics: Arc, ) { tracing::info!("Processing backlog"); process_block_range(backlog_range, contract, gas_limit, chain_state, metrics) @@ -558,3 +685,97 @@ pub async fn process_backlog( .await; tracing::info!("Backlog processed"); } + + +/// tracks the balance of the given address for each chain in the given config periodically +pub async fn track_balance( + chain_id: String, + chain_config: EthereumConfig, + address: Address, + metrics_registry: Arc, +) { + loop { + let provider = match Provider::::try_from(&chain_config.geth_rpc_addr) { + Ok(r) => r, + Err(_e) => continue, + }; + + let balance = match provider.get_balance(address, None).await { + // This conversion to u128 is fine as the total balance will never cross the limits + // of u128 practically. + Ok(r) => r.as_u128(), + Err(_e) => continue, + }; + // The f64 conversion is made to be able to serve metrics within the constraints of Prometheus. + // The balance is in wei, so we need to divide by 1e18 to convert it to eth. + let balance = balance as f64 / 1e18; + + metrics_registry + .balance + .get_or_create(&AccountLabel { + chain_id: chain_id.clone(), + address: address.to_string(), + }) + .set(balance); + + time::sleep(TRACK_INTERVAL).await; + } +} + +/// tracks the collected fees and the hashchain data of the given provider address for each chain in the given config periodically +pub async fn track_provider( + chain_id: String, + chain_config: EthereumConfig, + provider_address: Address, + metrics_registry: Arc, +) { + loop { + let contract = match PythContract::from_config(&chain_config) { + Ok(r) => r, + Err(_e) => continue, + }; + + let provider_info = match contract.get_provider_info(provider_address).call().await { + Ok(info) => info, + Err(_e) => { + time::sleep(Duration::from_secs(5)).await; + continue; + } + }; + + // The f64 conversion is made to be able to serve metrics with the constraints of Prometheus. + // The fee is in wei, so we need to divide by 1e18 to convert it to eth. + let collected_fee = provider_info.accrued_fees_in_wei as f64 / 1e18; + + let current_sequence_number = provider_info.sequence_number; + let end_sequence_number = provider_info.end_sequence_number; + + metrics_registry + .collected_fee + .get_or_create(&AccountLabel { + chain_id: chain_id.clone(), + address: provider_address.to_string(), + }) + .set(collected_fee); + + metrics_registry + .current_sequence_number + .get_or_create(&AccountLabel { + chain_id: chain_id.clone(), + address: provider_address.to_string(), + }) + // sequence_number type on chain is u64 but practically it will take + // a long time for it to cross the limits of i64. + // currently prometheus only supports i64 for Gauge types + .set(current_sequence_number as i64); + metrics_registry + .end_sequence_number + .get_or_create(&AccountLabel { + chain_id: chain_id.clone(), + address: provider_address.to_string(), + }) + .set(end_sequence_number as i64); + + time::sleep(TRACK_INTERVAL).await; + } +} From d8e7cdee0491f600b2db23ba6330f94f8dd694e3 Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Mon, 13 May 2024 13:44:14 +0530 Subject: [PATCH 20/27] module removed --- apps/fortuna/src/main.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/apps/fortuna/src/main.rs b/apps/fortuna/src/main.rs index b48aea36b5..4cd2475043 100644 --- a/apps/fortuna/src/main.rs +++ b/apps/fortuna/src/main.rs @@ -12,7 +12,6 @@ pub mod chain; pub mod command; pub mod config; pub mod keeper; -pub mod metrics; pub mod state; // Server TODO list: From 19d04d1e88f91bde7915bb5a66d59acae7852f18 Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Mon, 13 May 2024 13:44:58 +0530 Subject: [PATCH 21/27] remove metrics file --- apps/fortuna/src/metrics.rs | 98 ------------------------------------- 1 file changed, 98 deletions(-) delete mode 100644 apps/fortuna/src/metrics.rs diff --git a/apps/fortuna/src/metrics.rs b/apps/fortuna/src/metrics.rs deleted file mode 100644 index 9b590c26f8..0000000000 --- a/apps/fortuna/src/metrics.rs +++ /dev/null @@ -1,98 +0,0 @@ -use { - prometheus_client::{ - encoding::EncodeLabelSet, - metrics::{ - counter::Counter, - family::Family, - gauge::Gauge, - }, - registry::Registry, - }, - std::sync::atomic::AtomicU64, - tokio::sync::RwLock, -}; - -#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] -pub struct AccountLabel { - pub chain_id: String, - pub address: String, -} - -pub struct Metrics { - pub registry: RwLock, - - pub current_sequence_number: Family, - pub end_sequence_number: Family, - pub balance: Family>, - pub collected_fee: Family>, - pub total_gas_spent: Family>, - pub requests: Family, - pub requests_processed: Family, - pub reveals: Family, -} - -impl Metrics { - pub fn new() -> Self { - let mut metrics_registry = Registry::default(); - - let current_sequence_number = Family::::default(); - metrics_registry.register( - "current_sequence_number", - "The sequence number for a new request.", - current_sequence_number.clone(), - ); - - let end_sequence_number = Family::::default(); - metrics_registry.register( - "end_sequence_number", - "The sequence number for the end request.", - end_sequence_number.clone(), - ); - - let requests = Family::::default(); - metrics_registry.register( - "requests", - "Number of requests received through events", - requests.clone(), - ); - - let requests_processed = Family::::default(); - metrics_registry.register( - "requests_processed", - "Number of requests processed", - requests_processed.clone(), - ); - - let reveals = Family::::default(); - metrics_registry.register("reveal", "Number of reveals", reveals.clone()); - - let balance = Family::>::default(); - metrics_registry.register("balance", "Balance of the keeper", balance.clone()); - - let collected_fee = Family::>::default(); - metrics_registry.register( - "collected_fee", - "Collected fee on the contract", - collected_fee.clone(), - ); - - let total_gas_spent = Family::>::default(); - metrics_registry.register( - "total_gas_spent", - "Total gas spent revealing requests", - total_gas_spent.clone(), - ); - - Metrics { - registry: RwLock::new(metrics_registry), - current_sequence_number, - end_sequence_number, - requests, - requests_processed, - reveals, - balance, - collected_fee, - total_gas_spent, - } - } -} From d5923702784c670eaa1cc7314469279b29475f13 Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Mon, 13 May 2024 14:33:21 +0530 Subject: [PATCH 22/27] move registry out of metrics --- apps/fortuna/src/api.rs | 11 ++++++----- apps/fortuna/src/api/metrics.rs | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/apps/fortuna/src/api.rs b/apps/fortuna/src/api.rs index d7a419677c..3fb1979682 100644 --- a/apps/fortuna/src/api.rs +++ b/apps/fortuna/src/api.rs @@ -58,14 +58,15 @@ pub struct RequestLabel { } pub struct ApiMetrics { - pub metrics_registry: Arc>, - pub http_requests: Family, + pub http_requests: Family, } #[derive(Clone)] pub struct ApiState { pub chains: Arc>, + pub metrics_registry: Arc>, + /// Prometheus metrics pub metrics: Arc, } @@ -77,19 +78,19 @@ impl ApiState { ) -> ApiState { let metrics = ApiMetrics { http_requests: Family::default(), - metrics_registry, }; let http_requests = metrics.http_requests.clone(); - metrics.metrics_registry.write().await.register( + metrics_registry.write().await.register( "http_requests", "Number of HTTP requests received", http_requests, ); ApiState { - chains: Arc::new(chains), + chains: Arc::new(chains), metrics: Arc::new(metrics), + metrics_registry, } } } diff --git a/apps/fortuna/src/api/metrics.rs b/apps/fortuna/src/api/metrics.rs index 3f7c1f4016..8e162f0523 100644 --- a/apps/fortuna/src/api/metrics.rs +++ b/apps/fortuna/src/api/metrics.rs @@ -9,7 +9,7 @@ use { }; pub async fn metrics(State(state): State) -> impl IntoResponse { - let registry = state.metrics.metrics_registry.read().await; + let registry = state.metrics_registry.read().await; let mut buffer = String::new(); // Should not fail if the metrics are valid and there is memory available From aad5fd8848a8bb4df4abc09ba1fadb98596f05c4 Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Mon, 13 May 2024 14:34:42 +0530 Subject: [PATCH 23/27] simplify improt --- apps/fortuna/src/command/run.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/fortuna/src/command/run.rs b/apps/fortuna/src/command/run.rs index 701428e17f..acf12785bb 100644 --- a/apps/fortuna/src/command/run.rs +++ b/apps/fortuna/src/command/run.rs @@ -13,7 +13,7 @@ use { ProviderConfig, RunOptions, }, - keeper::{self,}, + keeper, state::{ HashChainState, PebbleHashChain, From 91af7a086d1a8913f1ba5d766b1aec67d74564de Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Mon, 13 May 2024 14:40:14 +0530 Subject: [PATCH 24/27] add retry lag --- apps/fortuna/src/keeper.rs | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs index de64334d5e..76a39801bf 100644 --- a/apps/fortuna/src/keeper.rs +++ b/apps/fortuna/src/keeper.rs @@ -687,7 +687,7 @@ pub async fn process_backlog( } -/// tracks the balance of the given address for each chain in the given config periodically +/// tracks the balance of the given address on the given chain periodically pub async fn track_balance( chain_id: String, chain_config: EthereumConfig, @@ -697,14 +697,20 @@ pub async fn track_balance( loop { let provider = match Provider::::try_from(&chain_config.geth_rpc_addr) { Ok(r) => r, - Err(_e) => continue, + Err(_e) => { + time::sleep(RETRY_INTERVAL).await; + continue; + } }; let balance = match provider.get_balance(address, None).await { // This conversion to u128 is fine as the total balance will never cross the limits // of u128 practically. Ok(r) => r.as_u128(), - Err(_e) => continue, + Err(_e) => { + time::sleep(RETRY_INTERVAL).await; + continue; + } }; // The f64 conversion is made to be able to serve metrics within the constraints of Prometheus. // The balance is in wei, so we need to divide by 1e18 to convert it to eth. @@ -722,7 +728,7 @@ pub async fn track_balance( } } -/// tracks the collected fees and the hashchain data of the given provider address for each chain in the given config periodically +/// tracks the collected fees and the hashchain data of the given provider address on the given chain periodically pub async fn track_provider( chain_id: String, chain_config: EthereumConfig, @@ -732,19 +738,22 @@ pub async fn track_provider( loop { let contract = match PythContract::from_config(&chain_config) { Ok(r) => r, - Err(_e) => continue, + Err(_e) => { + time::sleep(RETRY_INTERVAL).await; + continue; + } }; let provider_info = match contract.get_provider_info(provider_address).call().await { Ok(info) => info, Err(_e) => { - time::sleep(Duration::from_secs(5)).await; + time::sleep(RETRY_INTERVAL).await; continue; } }; // The f64 conversion is made to be able to serve metrics with the constraints of Prometheus. - // The fee is in wei, so we need to divide by 1e18 to convert it to eth. + // The fee is in wei, so we divide by 1e18 to convert it to eth. let collected_fee = provider_info.accrued_fees_in_wei as f64 / 1e18; let current_sequence_number = provider_info.sequence_number; From 4617ff72265486b05444cf6751ece1f9a150e232 Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Mon, 13 May 2024 14:41:38 +0530 Subject: [PATCH 25/27] update version --- apps/fortuna/Cargo.lock | 2 +- apps/fortuna/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/fortuna/Cargo.lock b/apps/fortuna/Cargo.lock index c0b371c45b..50bb566de2 100644 --- a/apps/fortuna/Cargo.lock +++ b/apps/fortuna/Cargo.lock @@ -1488,7 +1488,7 @@ dependencies = [ [[package]] name = "fortuna" -version = "5.2.4" +version = "5.3.0" dependencies = [ "anyhow", "axum", diff --git a/apps/fortuna/Cargo.toml b/apps/fortuna/Cargo.toml index b749d423ce..c790bc8ba1 100644 --- a/apps/fortuna/Cargo.toml +++ b/apps/fortuna/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fortuna" -version = "5.2.4" +version = "5.3.0" edition = "2021" [dependencies] From dba79cbec18954adfbba2aacc302df1d2fb1acb5 Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Mon, 13 May 2024 16:07:21 +0530 Subject: [PATCH 26/27] add block timestamp lag --- apps/fortuna/src/keeper.rs | 79 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 76 insertions(+), 3 deletions(-) diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs index 76a39801bf..53b045c26b 100644 --- a/apps/fortuna/src/keeper.rs +++ b/apps/fortuna/src/keeper.rs @@ -31,6 +31,7 @@ use { signers::Signer, types::{ Address, + BlockNumber as EthereumBlockNumber, U256, }, }, @@ -44,9 +45,15 @@ use { }, registry::Registry, }, - std::sync::{ - atomic::AtomicU64, - Arc, + std::{ + sync::{ + atomic::AtomicU64, + Arc, + }, + time::{ + SystemTime, + UNIX_EPOCH, + }, }, tokio::{ spawn, @@ -82,6 +89,11 @@ pub struct AccountLabel { pub address: String, } +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +pub struct ChainLabel { + pub chain_id: String, +} + pub struct KeeperMetrics { pub current_sequence_number: Family, pub end_sequence_number: Family, @@ -91,6 +103,7 @@ pub struct KeeperMetrics { pub requests: Family, pub requests_processed: Family, pub reveals: Family, + pub block_timestamp_lag: Family, } impl KeeperMetrics { @@ -145,6 +158,13 @@ impl KeeperMetrics { total_gas_spent.clone(), ); + let block_timestamp_lag = Family::::default(); + writable_registry.register( + "block_timestamp_lag", + "The difference between server timestamp and latest block timestamp", + block_timestamp_lag.clone(), + ); + KeeperMetrics { current_sequence_number, end_sequence_number, @@ -154,6 +174,7 @@ impl KeeperMetrics { balance, collected_fee, total_gas_spent, + block_timestamp_lag, } } } @@ -269,6 +290,16 @@ pub async fn run_keeper_threads( ) .in_current_span(), ); + + // spawn a thread to track latest block lag + spawn( + track_block_timestamp_lag( + chain_state.id.clone(), + chain_eth_config.clone(), + keeper_metrics.clone(), + ) + .in_current_span(), + ); } @@ -788,3 +819,45 @@ pub async fn track_provider( time::sleep(TRACK_INTERVAL).await; } } + +pub async fn track_block_timestamp_lag( + chain_id: String, + chain_config: EthereumConfig, + metrics_registry: Arc, +) { + loop { + let provider = match Provider::::try_from(&chain_config.geth_rpc_addr) { + Ok(r) => r, + Err(_e) => { + time::sleep(RETRY_INTERVAL).await; + continue; + } + }; + + match provider.get_block(EthereumBlockNumber::Latest).await { + Ok(b) => { + if let Some(block) = b { + let block_timestamp = block.timestamp; + let server_timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + let lag = server_timestamp - block_timestamp.as_u64(); + + metrics_registry + .block_timestamp_lag + .get_or_create(&ChainLabel { + chain_id: chain_id.clone(), + }) + .set(lag as i64); + } + } + Err(_e) => { + time::sleep(RETRY_INTERVAL).await; + continue; + } + }; + + time::sleep(TRACK_INTERVAL).await; + } +} From ef5924b2015ce44f10bf131c5fbf02665b8623fa Mon Sep 17 00:00:00 2001 From: 0xfirefist Date: Mon, 13 May 2024 17:40:53 +0530 Subject: [PATCH 27/27] traced client implementation --- apps/fortuna/src/chain.rs | 1 + apps/fortuna/src/chain/ethereum.rs | 27 +++- apps/fortuna/src/chain/traced_client.rs | 133 ++++++++++++++++++ apps/fortuna/src/command.rs | 12 +- apps/fortuna/src/command/register_provider.rs | 13 +- apps/fortuna/src/command/run.rs | 9 +- apps/fortuna/src/command/setup_provider.rs | 21 ++- apps/fortuna/src/config.rs | 13 +- apps/fortuna/src/keeper.rs | 83 ++++++----- apps/fortuna/src/main.rs | 6 +- 10 files changed, 247 insertions(+), 71 deletions(-) create mode 100644 apps/fortuna/src/chain/traced_client.rs diff --git a/apps/fortuna/src/chain.rs b/apps/fortuna/src/chain.rs index 21680a6a0b..bc4d9370d8 100644 --- a/apps/fortuna/src/chain.rs +++ b/apps/fortuna/src/chain.rs @@ -1,2 +1,3 @@ pub(crate) mod ethereum; pub(crate) mod reader; +pub(crate) mod traced_client; diff --git a/apps/fortuna/src/chain/ethereum.rs b/apps/fortuna/src/chain/ethereum.rs index c890823f56..2903951b2c 100644 --- a/apps/fortuna/src/chain/ethereum.rs +++ b/apps/fortuna/src/chain/ethereum.rs @@ -1,5 +1,7 @@ use { + super::traced_client::TracedClient, crate::{ + api::ChainId, chain::reader::{ self, BlockNumber, @@ -34,7 +36,6 @@ use { }, prelude::TransactionRequest, providers::{ - Http, Middleware, Provider, }, @@ -48,11 +49,13 @@ use { U256, }, }, + prometheus_client::registry::Registry, sha3::{ Digest, Keccak256, }, std::sync::Arc, + tokio::sync::RwLock, }; // TODO: Programmatically generate this so we don't have to keep committed ABI in sync with the @@ -64,11 +67,11 @@ abigen!( pub type SignablePythContract = PythRandom< TransformerMiddleware< - NonceManagerMiddleware, LocalWallet>>, + NonceManagerMiddleware, LocalWallet>>, LegacyTxTransformer, >, >; -pub type PythContract = PythRandom>; +pub type PythContract = PythRandom>; /// Transformer that converts a transaction into a legacy transaction if use_legacy_tx is true. #[derive(Clone, Debug)] @@ -90,10 +93,14 @@ impl Transformer for LegacyTxTransformer { impl SignablePythContract { pub async fn from_config( + chain_id: ChainId, chain_config: &EthereumConfig, private_key: &str, + metrics_registry: Arc>, ) -> Result { - let provider = Provider::::try_from(&chain_config.geth_rpc_addr)?; + let provider = + TracedClient::new_provider(chain_id, &chain_config.geth_rpc_addr, metrics_registry) + .await?; let chain_id = provider.get_chainid().await?; let transformer = LegacyTxTransformer { @@ -184,8 +191,14 @@ impl SignablePythContract { } impl PythContract { - pub fn from_config(chain_config: &EthereumConfig) -> Result { - let provider = Provider::::try_from(&chain_config.geth_rpc_addr)?; + pub async fn from_config( + chain_id: ChainId, + chain_config: &EthereumConfig, + metrics_registry: Arc>, + ) -> Result { + let provider = + TracedClient::new_provider(chain_id, &chain_config.geth_rpc_addr, metrics_registry) + .await?; Ok(PythRandom::new( chain_config.contract_addr, @@ -262,7 +275,7 @@ impl EntropyReader for PythContract { user_random_number: [u8; 32], provider_revelation: [u8; 32], ) -> Result> { - let result: Result>> = self + let result: Result>> = self .reveal_with_callback( provider, sequence_number, diff --git a/apps/fortuna/src/chain/traced_client.rs b/apps/fortuna/src/chain/traced_client.rs new file mode 100644 index 0000000000..fa2cf93b58 --- /dev/null +++ b/apps/fortuna/src/chain/traced_client.rs @@ -0,0 +1,133 @@ +use { + crate::api::ChainId, + anyhow::Result, + axum::async_trait, + ethers::{ + prelude::Http, + providers::{ + HttpClientError, + JsonRpcClient, + Provider, + }, + }, + prometheus_client::{ + encoding::EncodeLabelSet, + metrics::{ + counter::Counter, + family::Family, + histogram::Histogram, + }, + registry::Registry, + }, + std::sync::Arc, + tokio::{ + sync::RwLock, + time::Instant, + }, +}; + +#[derive(Debug, Clone, PartialEq, Eq, Hash, EncodeLabelSet)] +pub struct ChainLabel { + chain_id: ChainId, +} + +#[derive(Debug)] +pub struct TracedClient { + inner: Http, + + chain_id: ChainId, + rpc_requests_count: Family, + rpc_requests_latency: Family, + rpc_requests_errors_count: Family, +} + +#[async_trait] +impl JsonRpcClient for TracedClient { + type Error = HttpClientError; + + async fn request< + T: serde::Serialize + Send + Sync + std::fmt::Debug, + R: serde::de::DeserializeOwned + Send, + >( + &self, + method: &str, + params: T, + ) -> Result { + let start = Instant::now(); + self.rpc_requests_count + .get_or_create(&ChainLabel { + chain_id: self.chain_id.clone(), + }) + .inc(); + let res = match self.inner.request(method, params).await { + Ok(result) => Ok(result), + Err(e) => { + self.rpc_requests_errors_count + .get_or_create(&ChainLabel { + chain_id: self.chain_id.clone(), + }) + .inc(); + Err(e) + } + }; + + let latency = start.elapsed().as_secs_f64(); + println!( + "RPC request to {:?} took {:.2} seconds", + self.chain_id, latency + ); + self.rpc_requests_latency + .get_or_create(&ChainLabel { + chain_id: self.chain_id.clone(), + }) + .observe(latency); + res + } +} + +impl TracedClient { + pub async fn new_provider( + chain_id: ChainId, + url: &str, + metrics_registry: Arc>, + ) -> Result> { + let mut writable_registry = metrics_registry.write().await; + + let rpc_requests_count = Family::default(); + writable_registry.register( + "rpc_requests_count", + "The number of RPC requests made to the chain.", + rpc_requests_count.clone(), + ); + + let rpc_requests_latency = Family::::new_with_constructor(|| { + Histogram::new( + [ + 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, + ] + .into_iter(), + ) + }); + writable_registry.register( + "rpc_requests_latency", + "The latency of RPC requests to the chain.", + rpc_requests_latency.clone(), + ); + + let rpc_requests_errors_count = Family::default(); + writable_registry.register( + "rpc_requests_errors_count", + "The number of RPC requests made to the chain that failed.", + rpc_requests_errors_count.clone(), + ); + + let url = url::Url::parse(url)?; + Ok(Provider::new(TracedClient { + inner: Http::new(url), + chain_id, + rpc_requests_count, + rpc_requests_latency, + rpc_requests_errors_count, + })) + } +} diff --git a/apps/fortuna/src/command.rs b/apps/fortuna/src/command.rs index a5170a8a52..de4d8ec22f 100644 --- a/apps/fortuna/src/command.rs +++ b/apps/fortuna/src/command.rs @@ -1,15 +1,15 @@ -mod generate; -mod get_request; +// mod generate; +// mod get_request; mod register_provider; -mod request_randomness; +// mod request_randomness; mod run; mod setup_provider; pub use { - generate::generate, - get_request::get_request, + // generate::generate, + // get_request::get_request, register_provider::register_provider, - request_randomness::request_randomness, + // request_randomness::request_randomness, run::run, setup_provider::setup_provider, }; diff --git a/apps/fortuna/src/command/register_provider.rs b/apps/fortuna/src/command/register_provider.rs index a49ac64c56..dd0f4b3e79 100644 --- a/apps/fortuna/src/command/register_provider.rs +++ b/apps/fortuna/src/command/register_provider.rs @@ -16,7 +16,9 @@ use { }, types::U256, }, + prometheus_client::registry::Registry, std::sync::Arc, + tokio::sync::RwLock, }; #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] @@ -31,8 +33,15 @@ pub async fn register_provider(opts: &RegisterProviderOptions) -> Result<()> { let chain_config = Config::load(&opts.config.config)?.get_chain_config(&opts.chain_id)?; // Initialize a Provider to interface with the EVM contract. - let contract = - Arc::new(SignablePythContract::from_config(&chain_config, &opts.private_key).await?); + let contract = Arc::new( + SignablePythContract::from_config( + opts.chain_id.clone(), + &chain_config, + &opts.private_key, + Arc::new(RwLock::new(Registry::default())), + ) + .await?, + ); // Create a new random hash chain. let random = rand::random::<[u8; 32]>(); let secret = opts.randomness.load_secret()?; diff --git a/apps/fortuna/src/command/run.rs b/apps/fortuna/src/command/run.rs index acf12785bb..060fb37bc6 100644 --- a/apps/fortuna/src/command/run.rs +++ b/apps/fortuna/src/command/run.rs @@ -132,10 +132,15 @@ pub async fn run(opts: &RunOptions) -> Result<()> { .map(|path| ProviderConfig::load(&path).expect("Failed to load provider config")); let secret = opts.randomness.load_secret()?; let (tx_exit, rx_exit) = watch::channel(false); + let metrics_registry = Arc::new(RwLock::new(Registry::default())); let mut chains: HashMap = HashMap::new(); for (chain_id, chain_config) in &config.chains { - let contract = Arc::new(PythContract::from_config(&chain_config)?); + let contract = Arc::new( + PythContract::from_config(chain_id.clone(), &chain_config, metrics_registry.clone()) + .await + .unwrap(), + ); let provider_chain_config = provider_config .as_ref() .and_then(|c| c.get_chain_config(chain_id)); @@ -221,8 +226,6 @@ pub async fn run(opts: &RunOptions) -> Result<()> { Ok::<(), Error>(()) }); - let metrics_registry = Arc::new(RwLock::new(Registry::default())); - if let Some(keeper_private_key) = opts.load_keeper_private_key()? { spawn(run_keeper( chains.clone(), diff --git a/apps/fortuna/src/command/setup_provider.rs b/apps/fortuna/src/command/setup_provider.rs index 71d0a35f18..93b2bb7f80 100644 --- a/apps/fortuna/src/command/setup_provider.rs +++ b/apps/fortuna/src/command/setup_provider.rs @@ -2,9 +2,9 @@ use { crate::{ api::get_register_uri, chain::ethereum::SignablePythContract, - command::{ - register_provider, - register_provider::CommitmentMetadata, + command::register_provider::{ + self, + CommitmentMetadata, }, config::{ Config, @@ -28,7 +28,9 @@ use { }, types::Bytes, }, + prometheus_client::registry::Registry, std::sync::Arc, + tokio::sync::RwLock, }; /// Setup provider for all the chains. @@ -45,8 +47,15 @@ pub async fn setup_provider(opts: &SetupProviderOptions) -> Result<()> { for (chain_id, chain_config) in &config.chains { // Initialize a Provider to interface with the EVM contract. - let contract = - Arc::new(SignablePythContract::from_config(&chain_config, &private_key).await?); + let contract = Arc::new( + SignablePythContract::from_config( + chain_id.clone(), + &chain_config, + &private_key, + Arc::new(RwLock::new(Registry::default())), + ) + .await?, + ); tracing::info!("{}: fetching provider info", chain_id); let provider_info = contract.get_provider_info(provider_address).call().await?; @@ -107,7 +116,7 @@ pub async fn setup_provider(opts: &SetupProviderOptions) -> Result<()> { if register { tracing::info!("{}: registering", &chain_id); - register_provider(&RegisterProviderOptions { + register_provider::register_provider(&RegisterProviderOptions { config: opts.config.clone(), chain_id: chain_id.clone(), private_key: private_key.clone(), diff --git a/apps/fortuna/src/config.rs b/apps/fortuna/src/config.rs index ead14c72d0..63b81b2b61 100644 --- a/apps/fortuna/src/config.rs +++ b/apps/fortuna/src/config.rs @@ -62,15 +62,14 @@ pub enum Options { /// Set up the provider for all the provided chains. /// It registers, re-registers, or updates provider config on chain. SetupProvider(SetupProviderOptions), + // / Request a random number from the contract. + // RequestRandomness(RequestRandomnessOptions), - /// Request a random number from the contract. - RequestRandomness(RequestRandomnessOptions), + // / Generate a random number by running the entire protocol end-to-end + // Generate(GenerateOptions), - /// Generate a random number by running the entire protocol end-to-end - Generate(GenerateOptions), - - /// Get the status of a pending request for a random number. - GetRequest(GetRequestOptions), + // / Get the status of a pending request for a random number. + // GetRequest(GetRequestOptions), } #[derive(Args, Clone, Debug)] diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs index 53b045c26b..058cf9b4db 100644 --- a/apps/fortuna/src/keeper.rs +++ b/apps/fortuna/src/keeper.rs @@ -3,6 +3,7 @@ use { api::{ self, BlockchainState, + ChainId, }, chain::{ ethereum::{ @@ -13,6 +14,7 @@ use { BlockNumber, RequestedWithCallbackEvent, }, + traced_client::TracedClient, }, config::EthereumConfig, }, @@ -23,7 +25,6 @@ use { ethers::{ contract::ContractError, providers::{ - Http, Middleware, Provider, Ws, @@ -95,6 +96,7 @@ pub struct ChainLabel { } pub struct KeeperMetrics { + pub metrics_registry: Arc>, pub current_sequence_number: Family, pub end_sequence_number: Family, pub balance: Family>, @@ -165,7 +167,10 @@ impl KeeperMetrics { block_timestamp_lag.clone(), ); + drop(writable_registry); + KeeperMetrics { + metrics_registry: registry, current_sequence_number, end_sequence_number, requests, @@ -225,9 +230,14 @@ pub async fn run_keeper_threads( tracing::info!("latest safe block: {}", &latest_safe_block); let contract = Arc::new( - SignablePythContract::from_config(&chain_eth_config, &private_key) - .await - .expect("Chain config should be valid"), + SignablePythContract::from_config( + chain_state.id.clone(), + &chain_eth_config, + &private_key, + metrics.clone(), + ) + .await + .expect("Chain config should be valid"), ); let keeper_address = contract.client().inner().inner().signer().address(); @@ -720,20 +730,20 @@ pub async fn process_backlog( /// tracks the balance of the given address on the given chain periodically pub async fn track_balance( - chain_id: String, + chain_id: ChainId, chain_config: EthereumConfig, address: Address, - metrics_registry: Arc, + metrics: Arc, ) { - loop { - let provider = match Provider::::try_from(&chain_config.geth_rpc_addr) { - Ok(r) => r, - Err(_e) => { - time::sleep(RETRY_INTERVAL).await; - continue; - } - }; + let provider = TracedClient::new_provider( + chain_id.clone(), + &chain_config.geth_rpc_addr, + metrics.metrics_registry.clone(), + ) + .await + .unwrap(); + loop { let balance = match provider.get_balance(address, None).await { // This conversion to u128 is fine as the total balance will never cross the limits // of u128 practically. @@ -747,7 +757,7 @@ pub async fn track_balance( // The balance is in wei, so we need to divide by 1e18 to convert it to eth. let balance = balance as f64 / 1e18; - metrics_registry + metrics .balance .get_or_create(&AccountLabel { chain_id: chain_id.clone(), @@ -764,17 +774,16 @@ pub async fn track_provider( chain_id: String, chain_config: EthereumConfig, provider_address: Address, - metrics_registry: Arc, + metrics: Arc, ) { + let contract = PythContract::from_config( + chain_id.clone(), + &chain_config, + metrics.metrics_registry.clone(), + ) + .await + .unwrap(); loop { - let contract = match PythContract::from_config(&chain_config) { - Ok(r) => r, - Err(_e) => { - time::sleep(RETRY_INTERVAL).await; - continue; - } - }; - let provider_info = match contract.get_provider_info(provider_address).call().await { Ok(info) => info, Err(_e) => { @@ -790,7 +799,7 @@ pub async fn track_provider( let current_sequence_number = provider_info.sequence_number; let end_sequence_number = provider_info.end_sequence_number; - metrics_registry + metrics .collected_fee .get_or_create(&AccountLabel { chain_id: chain_id.clone(), @@ -798,7 +807,7 @@ pub async fn track_provider( }) .set(collected_fee); - metrics_registry + metrics .current_sequence_number .get_or_create(&AccountLabel { chain_id: chain_id.clone(), @@ -808,7 +817,7 @@ pub async fn track_provider( // a long time for it to cross the limits of i64. // currently prometheus only supports i64 for Gauge types .set(current_sequence_number as i64); - metrics_registry + metrics .end_sequence_number .get_or_create(&AccountLabel { chain_id: chain_id.clone(), @@ -823,17 +832,17 @@ pub async fn track_provider( pub async fn track_block_timestamp_lag( chain_id: String, chain_config: EthereumConfig, - metrics_registry: Arc, + metrics: Arc, ) { - loop { - let provider = match Provider::::try_from(&chain_config.geth_rpc_addr) { - Ok(r) => r, - Err(_e) => { - time::sleep(RETRY_INTERVAL).await; - continue; - } - }; + let provider = TracedClient::new_provider( + chain_id.clone(), + &chain_config.geth_rpc_addr, + metrics.metrics_registry.clone(), + ) + .await + .unwrap(); + loop { match provider.get_block(EthereumBlockNumber::Latest).await { Ok(b) => { if let Some(block) = b { @@ -844,7 +853,7 @@ pub async fn track_block_timestamp_lag( .as_secs(); let lag = server_timestamp - block_timestamp.as_u64(); - metrics_registry + metrics .block_timestamp_lag .get_or_create(&ChainLabel { chain_id: chain_id.clone(), diff --git a/apps/fortuna/src/main.rs b/apps/fortuna/src/main.rs index 4cd2475043..911a860667 100644 --- a/apps/fortuna/src/main.rs +++ b/apps/fortuna/src/main.rs @@ -36,11 +36,11 @@ async fn main() -> Result<()> { )?; match config::Options::parse() { - config::Options::GetRequest(opts) => command::get_request(&opts).await, - config::Options::Generate(opts) => command::generate(&opts).await, + // config::Options::GetRequest(opts) => command::get_request(&opts).await, + // config::Options::Generate(opts) => command::generate(&opts).await, config::Options::Run(opts) => command::run(&opts).await, config::Options::RegisterProvider(opts) => command::register_provider(&opts).await, config::Options::SetupProvider(opts) => command::setup_provider(&opts).await, - config::Options::RequestRandomness(opts) => command::request_randomness(&opts).await, + // config::Options::RequestRandomness(opts) => command::request_randomness(&opts).await, } }