diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index 6205fcb42e..e4fdd7d205 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -75,28 +75,34 @@ impl PolymorphicEnvelopeBuffer { /// Adds an envelope to the buffer. pub async fn push(&mut self, envelope: Box) -> Result<(), EnvelopeBufferError> { - match self { - Self::Sqlite(buffer) => buffer.push(envelope).await, - Self::InMemory(buffer) => buffer.push(envelope).await, - }?; + relay_statsd::metric!(timer(RelayTimers::BufferPush), { + match self { + Self::Sqlite(buffer) => buffer.push(envelope).await, + Self::InMemory(buffer) => buffer.push(envelope).await, + }?; + }); relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesWritten) += 1); Ok(()) } /// Returns a reference to the next-in-line envelope. pub async fn peek(&mut self) -> Result { - match self { - Self::Sqlite(buffer) => buffer.peek().await, - Self::InMemory(buffer) => buffer.peek().await, - } + relay_statsd::metric!(timer(RelayTimers::BufferPeek), { + match self { + Self::Sqlite(buffer) => buffer.peek().await, + Self::InMemory(buffer) => buffer.peek().await, + } + }) } /// Pops the next-in-line envelope. pub async fn pop(&mut self) -> Result>, EnvelopeBufferError> { - let envelope = match self { - Self::Sqlite(buffer) => buffer.pop().await, - Self::InMemory(buffer) => buffer.pop().await, - }?; + let envelope = relay_statsd::metric!(timer(RelayTimers::BufferPop), { + match self { + Self::Sqlite(buffer) => buffer.pop().await, + Self::InMemory(buffer) => buffer.pop().await, + }? + }); relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesRead) += 1); Ok(envelope) } diff --git a/relay-server/src/services/buffer/envelope_stack/sqlite.rs b/relay-server/src/services/buffer/envelope_stack/sqlite.rs index 13e1fb5a13..55336e9cc1 100644 --- a/relay-server/src/services/buffer/envelope_stack/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_stack/sqlite.rs @@ -9,7 +9,7 @@ use crate::services::buffer::envelope_stack::EnvelopeStack; use crate::services::buffer::envelope_store::sqlite::{ SqliteEnvelopeStore, SqliteEnvelopeStoreError, }; -use crate::statsd::RelayTimers; +use crate::statsd::{RelayCounters, RelayTimers}; /// An error returned when doing an operation on [`SqliteEnvelopeStack`]. #[derive(Debug, thiserror::Error)] @@ -91,6 +91,10 @@ impl SqliteEnvelopeStack { }; self.batches_buffer_size -= envelopes.len(); + relay_statsd::metric!( + counter(RelayCounters::BufferSpooledEnvelopes) += envelopes.len() as u64 + ); + // We convert envelopes into a format which simplifies insertion in the store. If an // envelope can't be serialized, we will not insert it. let envelopes = envelopes.iter().filter_map(|e| e.as_ref().try_into().ok()); @@ -139,6 +143,10 @@ impl SqliteEnvelopeStack { return Ok(()); } + relay_statsd::metric!( + counter(RelayCounters::BufferUnspooledEnvelopes) += envelopes.len() as u64 + ); + // We push in the back of the buffer, since we still want to give priority to // incoming envelopes that have a more recent timestamp. self.batches_buffer_size += envelopes.len(); diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 1b625e1b21..a533f400f3 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -141,6 +141,11 @@ impl EnvelopeBufferService { &mut self, buffer: &mut PolymorphicEnvelopeBuffer, ) -> Result<(), SendError> { + relay_statsd::metric!( + counter(RelayCounters::BufferReadyToPop) += 1, + status = "checking" + ); + self.system_ready(buffer).await; tokio::time::sleep(self.sleep).await; if let Some(project_cache_ready) = self.project_cache_ready.as_mut() { @@ -148,6 +153,11 @@ impl EnvelopeBufferService { self.project_cache_ready = None; } + relay_statsd::metric!( + counter(RelayCounters::BufferReadyToPop) += 1, + status = "checked" + ); + Ok(()) } @@ -180,10 +190,18 @@ impl EnvelopeBufferService { match buffer.peek().await? { Peek::Empty => { relay_log::trace!("EnvelopeBufferService empty"); + relay_statsd::metric!( + counter(RelayCounters::BufferTryPop) += 1, + peek_result = "empty" + ); self.sleep = Duration::MAX; // wait for reset by `handle_message`. } Peek::Ready(_) => { relay_log::trace!("EnvelopeBufferService pop"); + relay_statsd::metric!( + counter(RelayCounters::BufferTryPop) += 1, + peek_result = "ready" + ); let envelope = buffer .pop() .await? @@ -195,6 +213,10 @@ impl EnvelopeBufferService { } Peek::NotReady(stack_key, envelope) => { relay_log::trace!("EnvelopeBufferService request update"); + relay_statsd::metric!( + counter(RelayCounters::BufferTryPop) += 1, + peek_result = "not_ready" + ); let project_key = envelope.meta().public_key(); self.project_cache.send(UpdateProject(project_key)); match envelope.sampling_key() { diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 3788854b93..4fddb01fb2 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -524,6 +524,12 @@ pub enum RelayTimers { BufferSpool, /// Timing in milliseconds for the time it takes for the buffer to unspool data from disk. BufferUnspool, + /// Timing in milliseconds for the time it takes for the buffer to push. + BufferPush, + /// Timing in milliseconds for the time it takes for the buffer to peek. + BufferPeek, + /// Timing in milliseconds for the time it takes for the buffer to pop. + BufferPop, } impl TimerMetric for RelayTimers { @@ -568,6 +574,9 @@ impl TimerMetric for RelayTimers { RelayTimers::BufferInitialization => "buffer.initialization.duration", RelayTimers::BufferSpool => "buffer.spool.duration", RelayTimers::BufferUnspool => "buffer.unspool.duration", + RelayTimers::BufferPush => "buffer.push.duration", + RelayTimers::BufferPeek => "buffer.peek.duration", + RelayTimers::BufferPop => "buffer.pop.duration", } } } @@ -621,6 +630,14 @@ pub enum RelayCounters { /// Number of times an envelope stack is popped from the priority queue of stacks in the /// envelope buffer. BufferEnvelopeStacksPopped, + /// Number of times an envelope from the buffer is trying to be popped. + BufferTryPop, + /// Number of times the readiness check of the buffer is polled. + BufferReadyToPop, + /// Number of envelopes spool to disk. + BufferSpooledEnvelopes, + /// Number of envelopes unspooled from disk. + BufferUnspooledEnvelopes, /// /// Number of outcomes and reasons for rejected Envelopes. /// @@ -839,6 +856,10 @@ impl CounterMetric for RelayCounters { RelayCounters::BufferEnvelopesReturned => "buffer.envelopes_returned", RelayCounters::BufferStateTransition => "buffer.state.transition", RelayCounters::BufferEnvelopeStacksPopped => "buffer.envelope_stacks_popped", + RelayCounters::BufferTryPop => "buffer.try_pop", + RelayCounters::BufferReadyToPop => "buffer.ready_to_pop", + RelayCounters::BufferSpooledEnvelopes => "buffer.spooled_envelopes", + RelayCounters::BufferUnspooledEnvelopes => "buffer.unspooled_envelopes", RelayCounters::Outcomes => "events.outcomes", RelayCounters::ProjectStateGet => "project_state.get", RelayCounters::ProjectStateRequest => "project_state.request",