From 8d0c5b712debeca8aa8058063b4f4eb16e193d5d Mon Sep 17 00:00:00 2001 From: Martin Linzmayer Date: Thu, 31 Oct 2024 15:49:00 +0100 Subject: [PATCH] feat(server): measure body reading duration and send metrics (#4190) Wraps a body with a measurable body so that we can track how long reading a body takes --- Cargo.lock | 9 +- Cargo.toml | 1 + relay-server/Cargo.toml | 1 + relay-server/src/middlewares/body_timing.rs | 282 ++++++++++++++++++++ relay-server/src/middlewares/mod.rs | 3 + relay-server/src/services/server.rs | 5 +- relay-server/src/statsd.rs | 3 + 7 files changed, 298 insertions(+), 6 deletions(-) create mode 100644 relay-server/src/middlewares/body_timing.rs diff --git a/Cargo.lock b/Cargo.lock index f844e78d09..32a57117ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -412,7 +412,7 @@ dependencies = [ "bitflags 2.4.1", "cexpr", "clang-sys", - "itertools 0.10.5", + "itertools 0.13.0", "log", "prettyplease", "proc-macro2", @@ -1766,9 +1766,9 @@ checksum = "62eef4964b4e1c2d66981a5646d893768fd15d96957aae5e0e85c632503e9724" [[package]] name = "hyper" -version = "1.4.1" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" +checksum = "bbbff0a806a4728c99295b254c8838933b5b082d75e3cb70c8dab21fdfbcfa9a" dependencies = [ "bytes", "futures-channel", @@ -2999,7 +2999,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acf0c195eebb4af52c752bec4f52f645da98b6e92077a04110c7f349477ae5ac" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.13.0", "proc-macro2", "quote", "syn 2.0.77", @@ -3750,6 +3750,7 @@ dependencies = [ "futures", "hashbrown", "http", + "hyper", "hyper-util", "insta", "itertools 0.13.0", diff --git a/Cargo.toml b/Cargo.toml index 2865dccce0..d028039a02 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -102,6 +102,7 @@ hostname = "0.4.0" human-size = "0.4.1" http = "1.1.0" hyper-util = { version = "0.1.7", features = ["tokio"] } +hyper = "1.5.0" indexmap = "2.2.5" insta = { version = "1.31.0", features = ["json", "redactions", "ron"] } ipnetwork = "0.20.0" diff --git a/relay-server/Cargo.toml b/relay-server/Cargo.toml index dbec10dc95..d78359eb09 100644 --- a/relay-server/Cargo.toml +++ b/relay-server/Cargo.toml @@ -126,6 +126,7 @@ url = { workspace = true, features = ["serde"] } uuid = { workspace = true, features = ["v5"] } zstd = { workspace = true } semver = { workspace = true } +hyper = { workspace = true } [dev-dependencies] criterion = { workspace = true } diff --git a/relay-server/src/middlewares/body_timing.rs b/relay-server/src/middlewares/body_timing.rs new file mode 100644 index 0000000000..47714528ed --- /dev/null +++ b/relay-server/src/middlewares/body_timing.rs @@ -0,0 +1,282 @@ +use crate::statsd::RelayTimers; +use axum::body::{Body, HttpBody}; +use axum::http::Request; +use hyper::body::Frame; +use relay_statsd::metric; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; +use tokio::time::Instant; +use tower::{Layer, Service}; + +/// Middleware layer that wraps the request [`Body`] to measure body reading duration. +/// +/// Each metric includes the following tags: +/// - `route`: The request's URI path +/// - `size`: Size category based on bytes read +/// - `status`: Reading outcome, one of: +/// - `"completed"`: Body was fully read +/// - `"failed"`: Reading encountered an error +/// - `"dropped"`: Body was dropped before completion +/// +/// The size reflects actual bytes read, which may differ from the `Content-Length` header. +/// For interrupted reads (failed/dropped), only successfully read bytes are counted. +#[derive(Clone, Debug, Default)] +pub struct BodyTimingLayer; + +impl Layer for BodyTimingLayer { + type Service = BodyTiming; + + fn layer(&self, inner: S) -> Self::Service { + BodyTiming::new(inner) + } +} + +/// Tower service provided by [`BodyTimingLayer`]. +#[derive(Debug, Copy, Clone)] +pub struct BodyTiming { + inner: S, +} + +impl BodyTiming { + pub fn new(inner: S) -> Self { + Self { inner } + } +} + +impl Service> for BodyTiming +where + S: Service>, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + let (parts, body) = req.into_parts(); + let timed_body = Body::new(TimedBody::new(body, parts.uri.to_string())); + let request = Request::from_parts(parts, timed_body); + self.inner.call(request) + } +} + +#[derive(Debug)] +struct TimedBody { + inner: Body, + reading_started_at: Option, + size: usize, + route: String, +} + +impl TimedBody { + fn new(inner: Body, route: String) -> Self { + Self { + inner, + reading_started_at: None, + size: 0, + route, + } + } + + fn finish_timing(&mut self, status: &str) { + if let Some(started_at) = self.reading_started_at.take() { + self.emit_metric(started_at.elapsed(), status); + } + } + + fn emit_metric(&self, duration: Duration, status: &str) { + metric!( + timer(RelayTimers::BodyReading) = duration, + route = &self.route, + size = size_category(self.size), + status = status + ) + } +} + +impl hyper::body::Body for TimedBody { + type Data = ::Data; + type Error = ::Error; + + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + if self.reading_started_at.is_none() { + self.reading_started_at = Some(Instant::now()); + } + let pinned = Pin::new(&mut self.inner); + let poll_result = pinned.poll_frame(cx); + + if let Poll::Ready(Some(Ok(frame))) = &poll_result { + if let Some(data) = frame.data_ref() { + self.size += data.len(); + } + } + + match &poll_result { + Poll::Ready(None) => self.finish_timing("completed"), + Poll::Ready(Some(Err(_))) => self.finish_timing("failed"), + _ => {} + } + + poll_result + } +} + +impl Drop for TimedBody { + fn drop(&mut self) { + self.finish_timing("dropped"); + } +} + +fn size_category(size: usize) -> &'static str { + match size { + 0..1_000 => "<1KB", + 1_000..10_000 => "<10KB", + 10_000..100_000 => "<100KB", + 100_000..1_000_000 => "<1MB", + 1_000_000..10_000_000 => "<10MB", + 10_000_000..100_000_000 => "<100MB", + 100_000_000.. => ">=100MB", + } +} + +#[cfg(test)] +mod tests { + use super::*; + use axum::body::HttpBody; + use futures::task::noop_waker_ref; + use relay_statsd::with_capturing_test_client; + + struct ErrorBody; + + impl hyper::body::Body for ErrorBody { + type Data = bytes::Bytes; + type Error = ::Error; + + fn poll_frame( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + Poll::Ready(Some(Err(axum::Error::new("error")))) + } + } + + #[test] + fn test_empty_body() { + let captures = with_capturing_test_client(|| { + let waker = noop_waker_ref(); + let mut cx = Context::from_waker(waker); + + let empty_body = Body::from(vec![]); + let mut timed_body = TimedBody::new(empty_body, "example/route".to_string()); + let pinned = Pin::new(&mut timed_body); + + let _ = pinned.poll_frame(&mut cx); + }); + assert_eq!( + captures, + ["body.reading.duration:0|ms|#route:example/route,size:<1KB,status:completed"] + ); + } + + #[test] + fn test_body() { + let captures = with_capturing_test_client(|| { + let waker = noop_waker_ref(); + let mut cx = Context::from_waker(waker); + + let body = Body::new("cool test".to_string()); + let mut timed_body = TimedBody::new(body, "example/route".to_string()); + let mut pinned = Pin::new(&mut timed_body); + + let _ = pinned.as_mut().poll_frame(&mut cx); + let _ = pinned.as_mut().poll_frame(&mut cx); + }); + assert_eq!( + captures, + ["body.reading.duration:0|ms|#route:example/route,size:<1KB,status:completed"] + ); + } + + #[test] + fn test_dropped_while_reading() { + let captures = with_capturing_test_client(|| { + let waker = noop_waker_ref(); + let mut cx = Context::from_waker(waker); + + let body = Body::new("just calling this once".to_string()); + let mut timed_body = TimedBody::new(body, "example/route".to_string()); + let pinned = Pin::new(&mut timed_body); + + let _ = pinned.poll_frame(&mut cx); + }); + assert_eq!( + captures, + ["body.reading.duration:0|ms|#route:example/route,size:<1KB,status:dropped"] + ) + } + + #[test] + fn test_dropped_before_reading() { + let captures = with_capturing_test_client(|| { + let body = Body::new("dropped".to_string()); + let _ = TimedBody::new(body, "example/route".to_string()); + }); + assert_eq!(captures.len(), 0); + } + + #[test] + fn test_failed_body() { + let captures = with_capturing_test_client(|| { + let waker = noop_waker_ref(); + let mut cx = Context::from_waker(waker); + + let body = Body::new(ErrorBody {}); + let mut timed_body = TimedBody::new(body, "example/route".to_string()); + + let pinned = Pin::new(&mut timed_body); + let _ = pinned.poll_frame(&mut cx); + }); + assert_eq!( + captures, + ["body.reading.duration:0|ms|#route:example/route,size:<1KB,status:failed"] + ) + } + + #[test] + fn test_large_body() { + let captures = with_capturing_test_client(|| { + let waker = noop_waker_ref(); + let mut cx = Context::from_waker(waker); + + let data = (0..2000).map(|i| i as u8).collect::>(); + + let body = Body::from(data); + let mut timed_body = TimedBody::new(body, "example/route".to_string()); + + let mut pinned = Pin::new(&mut timed_body); + while let Poll::Ready(Some(Ok(_))) = pinned.as_mut().poll_frame(&mut cx) {} + }); + assert_eq!( + captures, + ["body.reading.duration:0|ms|#route:example/route,size:<10KB,status:completed"] + ) + } + + #[test] + fn test_size_category() { + assert_eq!(size_category(10), "<1KB"); + assert_eq!(size_category(9_000), "<10KB"); + assert_eq!(size_category(10_000), "<100KB"); + assert_eq!(size_category(99_999), "<100KB"); + assert_eq!(size_category(1_000_000), "<10MB"); + assert_eq!(size_category(50_000_000), "<100MB"); + assert_eq!(size_category(100_000_000), ">=100MB") + } +} diff --git a/relay-server/src/middlewares/mod.rs b/relay-server/src/middlewares/mod.rs index bb50b83766..a903d689e2 100644 --- a/relay-server/src/middlewares/mod.rs +++ b/relay-server/src/middlewares/mod.rs @@ -15,6 +15,9 @@ mod normalize_path; mod sentry_tower; mod trace; +mod body_timing; + +pub use self::body_timing::*; pub use self::cors::*; pub use self::decompression::*; pub use self::handle_panic::*; diff --git a/relay-server/src/services/server.rs b/relay-server/src/services/server.rs index d52fa76148..20ed693079 100644 --- a/relay-server/src/services/server.rs +++ b/relay-server/src/services/server.rs @@ -20,8 +20,8 @@ use tower_http::set_header::SetResponseHeaderLayer; use crate::constants; use crate::middlewares::{ - self, CatchPanicLayer, NewSentryLayer, NormalizePath, RequestDecompressionLayer, - SentryHttpLayer, + self, BodyTimingLayer, CatchPanicLayer, NewSentryLayer, NormalizePath, + RequestDecompressionLayer, SentryHttpLayer, }; use crate::service::ServiceState; use crate::statsd::RelayCounters; @@ -63,6 +63,7 @@ fn make_app(service: ServiceState) -> App { // - Requests go from top to bottom // - Responses go from bottom to top let middleware = ServiceBuilder::new() + .layer(BodyTimingLayer) .layer(axum::middleware::from_fn(middlewares::metrics)) .layer(CatchPanicLayer::custom(middlewares::handle_panic)) .layer(SetResponseHeaderLayer::overriding( diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index e7f3a498db..8683d3c32c 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -546,6 +546,8 @@ pub enum RelayTimers { BufferDrain, /// Timing in milliseconds for the time it takes for the envelopes to be serialized. BufferEnvelopesSerialization, + /// Timing in milliseconds to the time it takes to read an HTTP body. + BodyReading, } impl TimerMetric for RelayTimers { @@ -595,6 +597,7 @@ impl TimerMetric for RelayTimers { RelayTimers::BufferPop => "buffer.pop.duration", RelayTimers::BufferDrain => "buffer.drain.duration", RelayTimers::BufferEnvelopesSerialization => "buffer.envelopes_serialization", + RelayTimers::BodyReading => "body.reading.duration", } } }