Skip to content

Commit

Permalink
instr(buffer): Metric for number of stacks (#3878)
Browse files Browse the repository at this point in the history
Add a gauge metric for the total number of stacks in the priority queue.
  • Loading branch information
jjbayer authored Jul 31, 2024
1 parent d59d2b1 commit 76db84d
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 1 deletion.
18 changes: 17 additions & 1 deletion relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::services::buffer::envelope_stack::{EnvelopeStack, StackProvider};
use crate::services::buffer::sqlite_envelope_store::SqliteEnvelopeStoreError;
use crate::services::buffer::stack_provider::memory::MemoryStackProvider;
use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider;
use crate::statsd::RelayCounters;
use crate::statsd::{RelayCounters, RelayGauges};

/// Polymorphic envelope buffering interface.
///
Expand Down Expand Up @@ -135,6 +135,10 @@ impl<P: StackProvider> EnvelopeBuffer<P>
where
EnvelopeBufferError: std::convert::From<<P::Stack as EnvelopeStack>::Error>,
{
/// Pushes an envelope to the appropriate envelope stack and reprioritizes the stack.
///
/// If the envelope stack does not exist, a new stack is pushed to the priority queue.
/// The priority of the stack is updated with the envelope's received_at time.
pub async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
let received_at = envelope.meta().start_time();
let stack_key = StackKey::from_envelope(&envelope);
Expand All @@ -157,6 +161,7 @@ where
Ok(())
}

/// Returns a reference to the next-in-line envelope, if one exists.
pub async fn peek(&mut self) -> Result<Option<&Envelope>, EnvelopeBufferError> {
let Some((
QueueItem {
Expand All @@ -172,6 +177,10 @@ where
Ok(stack.peek().await?)
}

/// Returns the next-in-line envelope, if one exists.
///
/// The priority of the envelope's stack is updated with the next envelope's received_at
/// time. If the stack is empty after popping, it is removed from the priority queue.
pub async fn pop(&mut self) -> Result<Option<Box<Envelope>>, EnvelopeBufferError> {
let Some((QueueItem { key, value: stack }, _)) = self.priority_queue.peek_mut() else {
return Ok(None);
Expand All @@ -196,6 +205,7 @@ where
Ok(Some(envelope))
}

/// Reprioritizes all stacks that involve the given project key by setting it to "ready".
pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool {
let mut changed = false;
if let Some(stack_keys) = self.stacks_by_project.get(project) {
Expand Down Expand Up @@ -238,6 +248,9 @@ where
.or_default()
.insert(stack_key);
}
relay_statsd::metric!(
gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64
);
}

fn pop_stack(&mut self, stack_key: StackKey) {
Expand All @@ -248,6 +261,9 @@ where
.remove(&stack_key);
}
self.priority_queue.remove(&stack_key);
relay_statsd::metric!(
gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64
);
}
}

Expand Down
5 changes: 5 additions & 0 deletions relay-server/src/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ pub enum RelayGauges {
///
/// This corresponds to the number of corresponding tokio tasks currently scheduled or running.
BufferPushInFlight,
/// The number of individual stacks in the priority queue.
///
/// Per combination of `(own_key, sampling_key)`, a new stack is created.
BufferStackCount,
/// The currently used memory by the entire system.
///
/// Relay uses the same value for its memory health check.
Expand All @@ -55,6 +59,7 @@ impl GaugeMetric for RelayGauges {
RelayGauges::BufferEnvelopesDiskCount => "buffer.envelopes_disk_count",
RelayGauges::BufferPeriodicUnspool => "buffer.unspool.periodic",
RelayGauges::BufferPushInFlight => "buffer.push_inflight",
RelayGauges::BufferStackCount => "buffer.stack_count",
RelayGauges::SystemMemoryUsed => "health.system_memory.used",
RelayGauges::SystemMemoryTotal => "health.system_memory.total",
#[cfg(feature = "processing")]
Expand Down

0 comments on commit 76db84d

Please sign in to comment.