Skip to content

Commit

Permalink
feat(fortuna): Traced client for better observability (#1651)
Browse files Browse the repository at this point in the history
* feat(fortuna): Traced client for better observability
  • Loading branch information
m30m authored Jun 4, 2024
1 parent a0c5d11 commit c264c31
Show file tree
Hide file tree
Showing 8 changed files with 314 additions and 102 deletions.
2 changes: 1 addition & 1 deletion apps/fortuna/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apps/fortuna/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fortuna"
version = "6.2.2"
version = "6.2.3"
edition = "2021"

[dependencies]
Expand Down
1 change: 1 addition & 0 deletions apps/fortuna/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub(crate) mod eth_gas_oracle;
pub(crate) mod ethereum;
pub(crate) mod reader;
pub(crate) mod traced_client;
109 changes: 76 additions & 33 deletions apps/fortuna/src/chain/ethereum.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use {
crate::{
api::ChainId,
chain::{
eth_gas_oracle::EthProviderOracle,
reader::{
Expand All @@ -9,6 +10,10 @@ use {
EntropyReader,
RequestedWithCallbackEvent,
},
traced_client::{
RpcMetrics,
TracedClient,
},
},
config::EthereumConfig,
},
Expand All @@ -22,7 +27,6 @@ use {
abi::RawLog,
contract::{
abigen,
ContractError,
EthLogDecode,
},
core::types::Address,
Expand All @@ -34,6 +38,7 @@ use {
},
prelude::{
BlockId,
JsonRpcClient,
PendingTransaction,
TransactionRequest,
},
Expand Down Expand Up @@ -67,15 +72,19 @@ abigen!(
"../../target_chains/ethereum/entropy_sdk/solidity/abis/IEntropy.json"
);

pub type SignablePythContract = PythRandom<
pub type SignablePythContractInner<T> = PythRandom<
LegacyTxMiddleware<
GasOracleMiddleware<
NonceManagerMiddleware<SignerMiddleware<Provider<Http>, LocalWallet>>,
EthProviderOracle<Provider<Http>>,
NonceManagerMiddleware<SignerMiddleware<Provider<T>, LocalWallet>>,
EthProviderOracle<Provider<T>>,
>,
>,
>;
pub type SignablePythContract = SignablePythContractInner<Http>;
pub type InstrumentedSignablePythContract = SignablePythContractInner<TracedClient>;

pub type PythContract = PythRandom<Provider<Http>>;
pub type InstrumentedPythContract = PythRandom<Provider<TracedClient>>;

/// Middleware that converts a transaction into a legacy transaction if use_legacy_tx is true.
/// We can not use TransformerMiddleware because keeper calls fill_transaction first which bypasses
Expand Down Expand Up @@ -157,32 +166,7 @@ impl<M: Middleware> Middleware for LegacyTxMiddleware<M> {
}
}

impl SignablePythContract {
pub async fn from_config(
chain_config: &EthereumConfig,
private_key: &str,
) -> Result<SignablePythContract> {
let provider = Provider::<Http>::try_from(&chain_config.geth_rpc_addr)?;
let chain_id = provider.get_chainid().await?;
let gas_oracle = EthProviderOracle::new(provider.clone());
let wallet__ = private_key
.parse::<LocalWallet>()?
.with_chain_id(chain_id.as_u64());

let address = wallet__.address();

Ok(PythRandom::new(
chain_config.contract_addr,
Arc::new(LegacyTxMiddleware::new(
chain_config.legacy_tx,
GasOracleMiddleware::new(
NonceManagerMiddleware::new(SignerMiddleware::new(provider, wallet__), address),
gas_oracle,
),
)),
))
}

impl<T: JsonRpcClient + 'static + Clone> SignablePythContractInner<T> {
/// Submit a request for a random number to the contract.
///
/// This method is a version of the autogenned `request` method that parses the emitted logs
Expand Down Expand Up @@ -249,10 +233,54 @@ impl SignablePythContract {
Err(anyhow!("Request failed").into())
}
}

pub async fn from_config_and_provider(
chain_config: &EthereumConfig,
private_key: &str,
provider: Provider<T>,
) -> Result<SignablePythContractInner<T>> {
let chain_id = provider.get_chainid().await?;
let gas_oracle = EthProviderOracle::new(provider.clone());
let wallet__ = private_key
.parse::<LocalWallet>()?
.with_chain_id(chain_id.as_u64());

let address = wallet__.address();

Ok(PythRandom::new(
chain_config.contract_addr,
Arc::new(LegacyTxMiddleware::new(
chain_config.legacy_tx,
GasOracleMiddleware::new(
NonceManagerMiddleware::new(SignerMiddleware::new(provider, wallet__), address),
gas_oracle,
),
)),
))
}
}

impl SignablePythContract {
pub async fn from_config(chain_config: &EthereumConfig, private_key: &str) -> Result<Self> {
let provider = Provider::<Http>::try_from(&chain_config.geth_rpc_addr)?;
Self::from_config_and_provider(chain_config, private_key, provider).await
}
}

impl InstrumentedSignablePythContract {
pub async fn from_config(
chain_config: &EthereumConfig,
private_key: &str,
chain_id: ChainId,
metrics: Arc<RpcMetrics>,
) -> Result<Self> {
let provider = TracedClient::new(chain_id, &chain_config.geth_rpc_addr, metrics)?;
Self::from_config_and_provider(chain_config, private_key, provider).await
}
}

impl PythContract {
pub fn from_config(chain_config: &EthereumConfig) -> Result<PythContract> {
pub fn from_config(chain_config: &EthereumConfig) -> Result<Self> {
let provider = Provider::<Http>::try_from(&chain_config.geth_rpc_addr)?;

Ok(PythRandom::new(
Expand All @@ -262,8 +290,23 @@ impl PythContract {
}
}

impl InstrumentedPythContract {
pub fn from_config(
chain_config: &EthereumConfig,
chain_id: ChainId,
metrics: Arc<RpcMetrics>,
) -> Result<Self> {
let provider = TracedClient::new(chain_id, &chain_config.geth_rpc_addr, metrics)?;

Ok(PythRandom::new(
chain_config.contract_addr,
Arc::new(provider),
))
}
}

#[async_trait]
impl EntropyReader for PythContract {
impl<T: JsonRpcClient + 'static> EntropyReader for PythRandom<Provider<T>> {
async fn get_request(
&self,
provider_address: Address,
Expand Down Expand Up @@ -330,7 +373,7 @@ impl EntropyReader for PythContract {
user_random_number: [u8; 32],
provider_revelation: [u8; 32],
) -> Result<U256> {
let result: Result<U256, ContractError<Provider<Http>>> = self
let result = self
.reveal_with_callback(
provider,
sequence_number,
Expand Down
137 changes: 137 additions & 0 deletions apps/fortuna/src/chain/traced_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use {
crate::api::ChainId,
anyhow::Result,
axum::async_trait,
ethers::{
prelude::Http,
providers::{
HttpClientError,
JsonRpcClient,
Provider,
},
},
prometheus_client::{
encoding::EncodeLabelSet,
metrics::{
counter::Counter,
family::Family,
histogram::Histogram,
},
registry::Registry,
},
std::{
str::FromStr,
sync::Arc,
},
tokio::{
sync::RwLock,
time::Instant,
},
};

#[derive(Debug, Clone, PartialEq, Eq, Hash, EncodeLabelSet)]
pub struct RpcLabel {
chain_id: ChainId,
method: String,
}

#[derive(Debug)]
pub struct RpcMetrics {
count: Family<RpcLabel, Counter>,
latency: Family<RpcLabel, Histogram>,
errors_count: Family<RpcLabel, Counter>,
}

impl RpcMetrics {
pub async fn new(metrics_registry: Arc<RwLock<Registry>>) -> Self {
let count = Family::default();
let mut guard = metrics_registry.write().await;
let sub_registry = guard.sub_registry_with_prefix("rpc_requests");
sub_registry.register(
"count",
"The number of RPC requests made to the chain with the specified method.",
count.clone(),
);

let latency = Family::<RpcLabel, Histogram>::new_with_constructor(|| {
Histogram::new(
[
0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0,
]
.into_iter(),
)
});
sub_registry.register(
"latency",
"The latency of RPC requests to the chain with the specified method.",
latency.clone(),
);

let errors_count = Family::default();
sub_registry.register(
"errors_count",
"The number of RPC requests made to the chain that failed.",
errors_count.clone(),
);

Self {
count,
latency,
errors_count,
}
}
}

#[derive(Debug, Clone)]
pub struct TracedClient {
inner: Http,

chain_id: ChainId,
metrics: Arc<RpcMetrics>,
}

#[async_trait]
impl JsonRpcClient for TracedClient {
type Error = HttpClientError;

async fn request<
T: serde::Serialize + Send + Sync + std::fmt::Debug,
R: serde::de::DeserializeOwned + Send,
>(
&self,
method: &str,
params: T,
) -> Result<R, HttpClientError> {
let start = Instant::now();
let label = &RpcLabel {
chain_id: self.chain_id.clone(),
method: method.to_string(),
};
self.metrics.count.get_or_create(label).inc();
let res = match self.inner.request(method, params).await {
Ok(result) => Ok(result),
Err(e) => {
self.metrics.errors_count.get_or_create(label).inc();
Err(e)
}
};

let latency = start.elapsed().as_secs_f64();
self.metrics.latency.get_or_create(label).observe(latency);
res
}
}

impl TracedClient {
pub fn new(
chain_id: ChainId,
url: &str,
metrics: Arc<RpcMetrics>,
) -> Result<Provider<TracedClient>> {
Ok(Provider::new(TracedClient {
inner: Http::from_str(url)?,
chain_id,
metrics,
}))
}
}
Loading

0 comments on commit c264c31

Please sign in to comment.