Skip to content

Commit

Permalink
feat: log RPC calls (#4739)
Browse files Browse the repository at this point in the history
Co-authored-by: David Himmelstrup <david.himmelstrup@chainsafe.io>
  • Loading branch information
LesnyRumcajs and lemmih authored Sep 6, 2024
1 parent 4986efe commit 17703a5
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 8 deletions.
1 change: 1 addition & 0 deletions .config/forest.dic
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ MDNS
mempool
Merkle
MiB
middleware
milliGas
multiaddr/SM
multihash
Expand Down
1 change: 0 additions & 1 deletion src/cli_shared/logger/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ fn default_env_filter() -> EnvFilter {
"libp2p_bitswap=off",
"libp2p_gossipsub=error",
"libp2p_kad=error",
"rpc=error",
"storage_proofs_core=warn",
"tracing_loki=off",
"quinn_udp=error",
Expand Down
118 changes: 118 additions & 0 deletions src/rpc/log_layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2019-2024 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

//! Middleware layer for logging RPC calls.

use std::{
borrow::Cow,
hash::{DefaultHasher, Hash as _, Hasher},
};

use futures::future::BoxFuture;
use futures::FutureExt;
use jsonrpsee::MethodResponse;
use jsonrpsee::{server::middleware::rpc::RpcServiceT, types::Id};
use tower::Layer;

// State-less jsonrpcsee layer for logging information about RPC calls
#[derive(Clone, Default)]
pub(super) struct LogLayer {}

impl<S> Layer<S> for LogLayer {
type Service = Logging<S>;

fn layer(&self, service: S) -> Self::Service {
Logging { service }
}
}

#[derive(Clone)]
pub(super) struct Logging<S> {
service: S,
}

impl<'a, S> RpcServiceT<'a> for Logging<S>
where
S: RpcServiceT<'a> + Send + Sync + Clone + 'static,
{
type Future = BoxFuture<'a, MethodResponse>;

fn call(&self, req: jsonrpsee::types::Request<'a>) -> Self::Future {
let service = self.service.clone();

async move {
// Avoid performance overhead if INFO level is not enabled.
if !tracing::enabled!(tracing::Level::INFO) {
return service.call(req).await;
}

let start_time = std::time::Instant::now();
let method_name = req.method_name().to_owned();
let id = req.id();
let id = create_unique_id(id, start_time);

tracing::debug!(
"RPC#{id}: {method_name}. Params: {params}",
params = req.params().as_str().unwrap_or("[]")
);

let resp = service.call(req).await;

let elapsed = start_time.elapsed();
let result = resp.as_error_code().map_or(Cow::Borrowed("OK"), |code| {
Cow::Owned(format!("ERR({code})"))
});
tracing::info!("RPC#{id} {result}: {method_name}. Took {elapsed:?}");

resp
}
.boxed()
}
}

/// Creates a unique ID for the RPC call, so it can be easily tracked in logs.
fn create_unique_id(id: Id, start_time: std::time::Instant) -> String {
const ID_LEN: usize = 6;
let mut hasher = DefaultHasher::new();
id.hash(&mut hasher);
start_time.hash(&mut hasher);
let mut id = format!("{:x}", hasher.finish());
id.truncate(ID_LEN);
id
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use super::*;

#[test]
fn test_create_unique_id_same() {
let id = Id::Number(1);
let start_time = std::time::Instant::now();
let id1 = create_unique_id(id.clone(), start_time);
let id2 = create_unique_id(id, start_time);
assert_eq!(id1, id2);
}

#[test]
fn test_create_unique_id_different_ids() {
let id1 = Id::Number(1);
let id2 = Id::Number(2);
let start_time = std::time::Instant::now();
let id1 = create_unique_id(id1, start_time);
let id2 = create_unique_id(id2, start_time);
assert_ne!(id1, id2);
}

#[test]
fn test_create_unique_id_different_times() {
let id = Id::Number(1);
let start_time1 = std::time::Instant::now();
let start_time2 = std::time::Instant::now() + Duration::from_nanos(1);
let id1 = create_unique_id(id.clone(), start_time1);
let id2 = create_unique_id(id, start_time2);
assert_ne!(id1, id2);
}
}
12 changes: 6 additions & 6 deletions src/rpc/metrics_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use jsonrpsee::MethodResponse;
use tower::Layer;

// State-less jsonrpcsee layer for measuring RPC metrics
#[derive(Clone)]
pub struct MetricsLayer {}
#[derive(Clone, Default)]
pub(super) struct MetricsLayer {}

impl<S> Layer<S> for MetricsLayer {
type Service = RecordMetrics<S>;
Expand All @@ -21,7 +21,7 @@ impl<S> Layer<S> for MetricsLayer {
}

#[derive(Clone)]
pub struct RecordMetrics<S> {
pub(super) struct RecordMetrics<S> {
service: S,
}

Expand All @@ -40,18 +40,18 @@ where
async move {
// Cannot use HistogramTimerExt::start_timer here since it would lock the metric.
let start_time = std::time::Instant::now();
let req = service.call(req).await;
let resp = service.call(req).await;

metrics::RPC_METHOD_TIME
.get_or_create(&method)
// Observe the elapsed time in milliseconds
.observe(start_time.elapsed().as_secs_f64() * 1000.0);

if req.is_error() {
if resp.is_error() {
metrics::RPC_METHOD_FAILURE.get_or_create(&method).inc();
}

req
resp
}
.boxed()
}
Expand Down
5 changes: 4 additions & 1 deletion src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
mod auth_layer;
mod channel;
mod client;
mod log_layer;
mod metrics_layer;
mod request;

pub use client::Client;
pub use error::ServerError;
use eth::filter::EthEventHandler;
use futures::FutureExt as _;
use log_layer::LogLayer;
use reflect::Ctx;
pub use reflect::{ApiPath, ApiPaths, RpcMethod, RpcMethodExt};
pub use request::Request;
Expand Down Expand Up @@ -481,7 +483,8 @@ where
headers,
keystore: keystore.clone(),
})
.layer(MetricsLayer {});
.layer(LogLayer::default())
.layer(MetricsLayer::default());
let mut jsonrpsee_svc = svc_builder
.set_rpc_middleware(rpc_middleware)
.build(methods, stop_handle);
Expand Down

0 comments on commit 17703a5

Please sign in to comment.