Skip to content

Commit

Permalink
feat(server): measure body reading duration and send metrics (#4190)
Browse files Browse the repository at this point in the history
Wraps a body with a measurable body so that we can track how long
reading a body takes
  • Loading branch information
Litarnus authored Oct 31, 2024
1 parent c162ed1 commit 8d0c5b7
Show file tree
Hide file tree
Showing 7 changed files with 298 additions and 6 deletions.
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ hostname = "0.4.0"
human-size = "0.4.1"
http = "1.1.0"
hyper-util = { version = "0.1.7", features = ["tokio"] }
hyper = "1.5.0"
indexmap = "2.2.5"
insta = { version = "1.31.0", features = ["json", "redactions", "ron"] }
ipnetwork = "0.20.0"
Expand Down
1 change: 1 addition & 0 deletions relay-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ url = { workspace = true, features = ["serde"] }
uuid = { workspace = true, features = ["v5"] }
zstd = { workspace = true }
semver = { workspace = true }
hyper = { workspace = true }

[dev-dependencies]
criterion = { workspace = true }
Expand Down
282 changes: 282 additions & 0 deletions relay-server/src/middlewares/body_timing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
use crate::statsd::RelayTimers;
use axum::body::{Body, HttpBody};
use axum::http::Request;
use hyper::body::Frame;
use relay_statsd::metric;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::time::Instant;
use tower::{Layer, Service};

/// Middleware layer that wraps the request [`Body`] to measure body reading duration.
///
/// Each metric includes the following tags:
/// - `route`: The request's URI path
/// - `size`: Size category based on bytes read
/// - `status`: Reading outcome, one of:
/// - `"completed"`: Body was fully read
/// - `"failed"`: Reading encountered an error
/// - `"dropped"`: Body was dropped before completion
///
/// The size reflects actual bytes read, which may differ from the `Content-Length` header.
/// For interrupted reads (failed/dropped), only successfully read bytes are counted.
#[derive(Clone, Debug, Default)]
pub struct BodyTimingLayer;

impl<S> Layer<S> for BodyTimingLayer {
type Service = BodyTiming<S>;

fn layer(&self, inner: S) -> Self::Service {
BodyTiming::new(inner)
}
}

/// Tower service provided by [`BodyTimingLayer`].
#[derive(Debug, Copy, Clone)]
pub struct BodyTiming<S> {
inner: S,
}

impl<S> BodyTiming<S> {
pub fn new(inner: S) -> Self {
Self { inner }
}
}

impl<S> Service<Request<Body>> for BodyTiming<S>
where
S: Service<Request<Body>>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, req: Request<Body>) -> Self::Future {
let (parts, body) = req.into_parts();
let timed_body = Body::new(TimedBody::new(body, parts.uri.to_string()));
let request = Request::from_parts(parts, timed_body);
self.inner.call(request)
}
}

#[derive(Debug)]
struct TimedBody {
inner: Body,
reading_started_at: Option<Instant>,
size: usize,
route: String,
}

impl TimedBody {
fn new(inner: Body, route: String) -> Self {
Self {
inner,
reading_started_at: None,
size: 0,
route,
}
}

fn finish_timing(&mut self, status: &str) {
if let Some(started_at) = self.reading_started_at.take() {
self.emit_metric(started_at.elapsed(), status);
}
}

fn emit_metric(&self, duration: Duration, status: &str) {
metric!(
timer(RelayTimers::BodyReading) = duration,
route = &self.route,
size = size_category(self.size),
status = status
)
}
}

impl hyper::body::Body for TimedBody {
type Data = <Body as HttpBody>::Data;
type Error = <Body as HttpBody>::Error;

fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
if self.reading_started_at.is_none() {
self.reading_started_at = Some(Instant::now());
}
let pinned = Pin::new(&mut self.inner);
let poll_result = pinned.poll_frame(cx);

if let Poll::Ready(Some(Ok(frame))) = &poll_result {
if let Some(data) = frame.data_ref() {
self.size += data.len();
}
}

match &poll_result {
Poll::Ready(None) => self.finish_timing("completed"),
Poll::Ready(Some(Err(_))) => self.finish_timing("failed"),
_ => {}
}

poll_result
}
}

impl Drop for TimedBody {
fn drop(&mut self) {
self.finish_timing("dropped");
}
}

fn size_category(size: usize) -> &'static str {
match size {
0..1_000 => "<1KB",
1_000..10_000 => "<10KB",
10_000..100_000 => "<100KB",
100_000..1_000_000 => "<1MB",
1_000_000..10_000_000 => "<10MB",
10_000_000..100_000_000 => "<100MB",
100_000_000.. => ">=100MB",
}
}

#[cfg(test)]
mod tests {
use super::*;
use axum::body::HttpBody;
use futures::task::noop_waker_ref;
use relay_statsd::with_capturing_test_client;

struct ErrorBody;

impl hyper::body::Body for ErrorBody {
type Data = bytes::Bytes;
type Error = <Body as HttpBody>::Error;

fn poll_frame(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
Poll::Ready(Some(Err(axum::Error::new("error"))))
}
}

#[test]
fn test_empty_body() {
let captures = with_capturing_test_client(|| {
let waker = noop_waker_ref();
let mut cx = Context::from_waker(waker);

let empty_body = Body::from(vec![]);
let mut timed_body = TimedBody::new(empty_body, "example/route".to_string());
let pinned = Pin::new(&mut timed_body);

let _ = pinned.poll_frame(&mut cx);
});
assert_eq!(
captures,
["body.reading.duration:0|ms|#route:example/route,size:<1KB,status:completed"]
);
}

#[test]
fn test_body() {
let captures = with_capturing_test_client(|| {
let waker = noop_waker_ref();
let mut cx = Context::from_waker(waker);

let body = Body::new("cool test".to_string());
let mut timed_body = TimedBody::new(body, "example/route".to_string());
let mut pinned = Pin::new(&mut timed_body);

let _ = pinned.as_mut().poll_frame(&mut cx);
let _ = pinned.as_mut().poll_frame(&mut cx);
});
assert_eq!(
captures,
["body.reading.duration:0|ms|#route:example/route,size:<1KB,status:completed"]
);
}

#[test]
fn test_dropped_while_reading() {
let captures = with_capturing_test_client(|| {
let waker = noop_waker_ref();
let mut cx = Context::from_waker(waker);

let body = Body::new("just calling this once".to_string());
let mut timed_body = TimedBody::new(body, "example/route".to_string());
let pinned = Pin::new(&mut timed_body);

let _ = pinned.poll_frame(&mut cx);
});
assert_eq!(
captures,
["body.reading.duration:0|ms|#route:example/route,size:<1KB,status:dropped"]
)
}

#[test]
fn test_dropped_before_reading() {
let captures = with_capturing_test_client(|| {
let body = Body::new("dropped".to_string());
let _ = TimedBody::new(body, "example/route".to_string());
});
assert_eq!(captures.len(), 0);
}

#[test]
fn test_failed_body() {
let captures = with_capturing_test_client(|| {
let waker = noop_waker_ref();
let mut cx = Context::from_waker(waker);

let body = Body::new(ErrorBody {});
let mut timed_body = TimedBody::new(body, "example/route".to_string());

let pinned = Pin::new(&mut timed_body);
let _ = pinned.poll_frame(&mut cx);
});
assert_eq!(
captures,
["body.reading.duration:0|ms|#route:example/route,size:<1KB,status:failed"]
)
}

#[test]
fn test_large_body() {
let captures = with_capturing_test_client(|| {
let waker = noop_waker_ref();
let mut cx = Context::from_waker(waker);

let data = (0..2000).map(|i| i as u8).collect::<Vec<u8>>();

let body = Body::from(data);
let mut timed_body = TimedBody::new(body, "example/route".to_string());

let mut pinned = Pin::new(&mut timed_body);
while let Poll::Ready(Some(Ok(_))) = pinned.as_mut().poll_frame(&mut cx) {}
});
assert_eq!(
captures,
["body.reading.duration:0|ms|#route:example/route,size:<10KB,status:completed"]
)
}

#[test]
fn test_size_category() {
assert_eq!(size_category(10), "<1KB");
assert_eq!(size_category(9_000), "<10KB");
assert_eq!(size_category(10_000), "<100KB");
assert_eq!(size_category(99_999), "<100KB");
assert_eq!(size_category(1_000_000), "<10MB");
assert_eq!(size_category(50_000_000), "<100MB");
assert_eq!(size_category(100_000_000), ">=100MB")
}
}
3 changes: 3 additions & 0 deletions relay-server/src/middlewares/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ mod normalize_path;
mod sentry_tower;
mod trace;

mod body_timing;

pub use self::body_timing::*;
pub use self::cors::*;
pub use self::decompression::*;
pub use self::handle_panic::*;
Expand Down
5 changes: 3 additions & 2 deletions relay-server/src/services/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use tower_http::set_header::SetResponseHeaderLayer;

use crate::constants;
use crate::middlewares::{
self, CatchPanicLayer, NewSentryLayer, NormalizePath, RequestDecompressionLayer,
SentryHttpLayer,
self, BodyTimingLayer, CatchPanicLayer, NewSentryLayer, NormalizePath,
RequestDecompressionLayer, SentryHttpLayer,
};
use crate::service::ServiceState;
use crate::statsd::RelayCounters;
Expand Down Expand Up @@ -63,6 +63,7 @@ fn make_app(service: ServiceState) -> App {
// - Requests go from top to bottom
// - Responses go from bottom to top
let middleware = ServiceBuilder::new()
.layer(BodyTimingLayer)
.layer(axum::middleware::from_fn(middlewares::metrics))
.layer(CatchPanicLayer::custom(middlewares::handle_panic))
.layer(SetResponseHeaderLayer::overriding(
Expand Down
3 changes: 3 additions & 0 deletions relay-server/src/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,8 @@ pub enum RelayTimers {
BufferDrain,
/// Timing in milliseconds for the time it takes for the envelopes to be serialized.
BufferEnvelopesSerialization,
/// Timing in milliseconds to the time it takes to read an HTTP body.
BodyReading,
}

impl TimerMetric for RelayTimers {
Expand Down Expand Up @@ -595,6 +597,7 @@ impl TimerMetric for RelayTimers {
RelayTimers::BufferPop => "buffer.pop.duration",
RelayTimers::BufferDrain => "buffer.drain.duration",
RelayTimers::BufferEnvelopesSerialization => "buffer.envelopes_serialization",
RelayTimers::BodyReading => "body.reading.duration",
}
}
}
Expand Down

0 comments on commit 8d0c5b7

Please sign in to comment.