From 76db84dc537a970f9eaffca5f3eb29c49553b44a Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Wed, 31 Jul 2024 15:48:28 +0200 Subject: [PATCH] instr(buffer): Metric for number of stacks (#3878) Add a gauge metric for the total number of stacks in the priority queue. --- .../src/services/buffer/envelope_buffer/mod.rs | 18 +++++++++++++++++- relay-server/src/statsd.rs | 5 +++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index 158f1b3bda..969d9a3565 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -13,7 +13,7 @@ use crate::services::buffer::envelope_stack::{EnvelopeStack, StackProvider}; use crate::services::buffer::sqlite_envelope_store::SqliteEnvelopeStoreError; use crate::services::buffer::stack_provider::memory::MemoryStackProvider; use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider; -use crate::statsd::RelayCounters; +use crate::statsd::{RelayCounters, RelayGauges}; /// Polymorphic envelope buffering interface. /// @@ -135,6 +135,10 @@ impl EnvelopeBuffer

where EnvelopeBufferError: std::convert::From<::Error>, { + /// Pushes an envelope to the appropriate envelope stack and reprioritizes the stack. + /// + /// If the envelope stack does not exist, a new stack is pushed to the priority queue. + /// The priority of the stack is updated with the envelope's received_at time. pub async fn push(&mut self, envelope: Box) -> Result<(), EnvelopeBufferError> { let received_at = envelope.meta().start_time(); let stack_key = StackKey::from_envelope(&envelope); @@ -157,6 +161,7 @@ where Ok(()) } + /// Returns a reference to the next-in-line envelope, if one exists. pub async fn peek(&mut self) -> Result, EnvelopeBufferError> { let Some(( QueueItem { @@ -172,6 +177,10 @@ where Ok(stack.peek().await?) } + /// Returns the next-in-line envelope, if one exists. + /// + /// The priority of the envelope's stack is updated with the next envelope's received_at + /// time. If the stack is empty after popping, it is removed from the priority queue. pub async fn pop(&mut self) -> Result>, EnvelopeBufferError> { let Some((QueueItem { key, value: stack }, _)) = self.priority_queue.peek_mut() else { return Ok(None); @@ -196,6 +205,7 @@ where Ok(Some(envelope)) } + /// Reprioritizes all stacks that involve the given project key by marking it as ready or not ready, according to `is_ready`. 
pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool { let mut changed = false; if let Some(stack_keys) = self.stacks_by_project.get(project) { @@ -238,6 +248,9 @@ where .or_default() .insert(stack_key); } + relay_statsd::metric!( + gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64 + ); } fn pop_stack(&mut self, stack_key: StackKey) { @@ -248,6 +261,9 @@ where .remove(&stack_key); } self.priority_queue.remove(&stack_key); + relay_statsd::metric!( + gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64 + ); } } diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 93b97c4be0..489d01d84d 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -30,6 +30,10 @@ pub enum RelayGauges { /// /// This corresponds to the number of corresponding tokio tasks currently scheduled or running. BufferPushInFlight, + /// The number of individual stacks in the priority queue. + /// + /// Per combination of `(own_key, sampling_key)`, a new stack is created. + BufferStackCount, /// The currently used memory by the entire system. /// /// Relay uses the same value for its memory health check. @@ -55,6 +59,7 @@ impl GaugeMetric for RelayGauges { RelayGauges::BufferEnvelopesDiskCount => "buffer.envelopes_disk_count", RelayGauges::BufferPeriodicUnspool => "buffer.unspool.periodic", RelayGauges::BufferPushInFlight => "buffer.push_inflight", + RelayGauges::BufferStackCount => "buffer.stack_count", RelayGauges::SystemMemoryUsed => "health.system_memory.used", RelayGauges::SystemMemoryTotal => "health.system_memory.total", #[cfg(feature = "processing")]