Skip to content

Commit

Permalink
feat(spooler): Add additional metrics to the buffer (#4027)
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Sep 12, 2024
1 parent bb90d30 commit 2c2e6c4
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 13 deletions.
30 changes: 18 additions & 12 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,28 +75,34 @@ impl PolymorphicEnvelopeBuffer {

/// Adds an envelope to the buffer.
pub async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
match self {
Self::Sqlite(buffer) => buffer.push(envelope).await,
Self::InMemory(buffer) => buffer.push(envelope).await,
}?;
relay_statsd::metric!(timer(RelayTimers::BufferPush), {
match self {
Self::Sqlite(buffer) => buffer.push(envelope).await,
Self::InMemory(buffer) => buffer.push(envelope).await,
}?;
});
relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesWritten) += 1);
Ok(())
}

/// Returns a reference to the next-in-line envelope.
pub async fn peek(&mut self) -> Result<Peek, EnvelopeBufferError> {
match self {
Self::Sqlite(buffer) => buffer.peek().await,
Self::InMemory(buffer) => buffer.peek().await,
}
relay_statsd::metric!(timer(RelayTimers::BufferPeek), {
match self {
Self::Sqlite(buffer) => buffer.peek().await,
Self::InMemory(buffer) => buffer.peek().await,
}
})
}

/// Pops the next-in-line envelope.
pub async fn pop(&mut self) -> Result<Option<Box<Envelope>>, EnvelopeBufferError> {
let envelope = match self {
Self::Sqlite(buffer) => buffer.pop().await,
Self::InMemory(buffer) => buffer.pop().await,
}?;
let envelope = relay_statsd::metric!(timer(RelayTimers::BufferPop), {
match self {
Self::Sqlite(buffer) => buffer.pop().await,
Self::InMemory(buffer) => buffer.pop().await,
}?
});
relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesRead) += 1);
Ok(envelope)
}
Expand Down
10 changes: 9 additions & 1 deletion relay-server/src/services/buffer/envelope_stack/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::services::buffer::envelope_stack::EnvelopeStack;
use crate::services::buffer::envelope_store::sqlite::{
SqliteEnvelopeStore, SqliteEnvelopeStoreError,
};
use crate::statsd::RelayTimers;
use crate::statsd::{RelayCounters, RelayTimers};

/// An error returned when doing an operation on [`SqliteEnvelopeStack`].
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -91,6 +91,10 @@ impl SqliteEnvelopeStack {
};
self.batches_buffer_size -= envelopes.len();

relay_statsd::metric!(
counter(RelayCounters::BufferSpooledEnvelopes) += envelopes.len() as u64
);

// We convert envelopes into a format which simplifies insertion in the store. If an
// envelope can't be serialized, we will not insert it.
let envelopes = envelopes.iter().filter_map(|e| e.as_ref().try_into().ok());
Expand Down Expand Up @@ -139,6 +143,10 @@ impl SqliteEnvelopeStack {
return Ok(());
}

relay_statsd::metric!(
counter(RelayCounters::BufferUnspooledEnvelopes) += envelopes.len() as u64
);

// We push in the back of the buffer, since we still want to give priority to
// incoming envelopes that have a more recent timestamp.
self.batches_buffer_size += envelopes.len();
Expand Down
22 changes: 22 additions & 0 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,23 @@ impl EnvelopeBufferService {
&mut self,
buffer: &mut PolymorphicEnvelopeBuffer,
) -> Result<(), SendError> {
relay_statsd::metric!(
counter(RelayCounters::BufferReadyToPop) += 1,
status = "checking"
);

self.system_ready(buffer).await;
tokio::time::sleep(self.sleep).await;
if let Some(project_cache_ready) = self.project_cache_ready.as_mut() {
project_cache_ready.await?;
self.project_cache_ready = None;
}

relay_statsd::metric!(
counter(RelayCounters::BufferReadyToPop) += 1,
status = "checked"
);

Ok(())
}

Expand Down Expand Up @@ -180,10 +190,18 @@ impl EnvelopeBufferService {
match buffer.peek().await? {
Peek::Empty => {
relay_log::trace!("EnvelopeBufferService empty");
relay_statsd::metric!(
counter(RelayCounters::BufferTryPop) += 1,
peek_result = "empty"
);
self.sleep = Duration::MAX; // wait for reset by `handle_message`.
}
Peek::Ready(_) => {
relay_log::trace!("EnvelopeBufferService pop");
relay_statsd::metric!(
counter(RelayCounters::BufferTryPop) += 1,
peek_result = "ready"
);
let envelope = buffer
.pop()
.await?
Expand All @@ -195,6 +213,10 @@ impl EnvelopeBufferService {
}
Peek::NotReady(stack_key, envelope) => {
relay_log::trace!("EnvelopeBufferService request update");
relay_statsd::metric!(
counter(RelayCounters::BufferTryPop) += 1,
peek_result = "not_ready"
);
let project_key = envelope.meta().public_key();
self.project_cache.send(UpdateProject(project_key));
match envelope.sampling_key() {
Expand Down
21 changes: 21 additions & 0 deletions relay-server/src/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,12 @@ pub enum RelayTimers {
BufferSpool,
/// Timing in milliseconds for the time it takes for the buffer to unspool data from disk.
BufferUnspool,
/// Timing in milliseconds for the time it takes for the buffer to push.
BufferPush,
/// Timing in milliseconds for the time it takes for the buffer to peek.
BufferPeek,
/// Timing in milliseconds for the time it takes for the buffer to pop.
BufferPop,
}

impl TimerMetric for RelayTimers {
Expand Down Expand Up @@ -568,6 +574,9 @@ impl TimerMetric for RelayTimers {
RelayTimers::BufferInitialization => "buffer.initialization.duration",
RelayTimers::BufferSpool => "buffer.spool.duration",
RelayTimers::BufferUnspool => "buffer.unspool.duration",
RelayTimers::BufferPush => "buffer.push.duration",
RelayTimers::BufferPeek => "buffer.peek.duration",
RelayTimers::BufferPop => "buffer.pop.duration",
}
}
}
Expand Down Expand Up @@ -621,6 +630,14 @@ pub enum RelayCounters {
/// Number of times an envelope stack is popped from the priority queue of stacks in the
/// envelope buffer.
BufferEnvelopeStacksPopped,
/// Number of times an envelope from the buffer is trying to be popped.
BufferTryPop,
/// Number of times the readiness check of the buffer is polled.
BufferReadyToPop,
/// Number of envelopes spool to disk.
BufferSpooledEnvelopes,
/// Number of envelopes unspooled from disk.
BufferUnspooledEnvelopes,
///
/// Number of outcomes and reasons for rejected Envelopes.
///
Expand Down Expand Up @@ -839,6 +856,10 @@ impl CounterMetric for RelayCounters {
RelayCounters::BufferEnvelopesReturned => "buffer.envelopes_returned",
RelayCounters::BufferStateTransition => "buffer.state.transition",
RelayCounters::BufferEnvelopeStacksPopped => "buffer.envelope_stacks_popped",
RelayCounters::BufferTryPop => "buffer.try_pop",
RelayCounters::BufferReadyToPop => "buffer.ready_to_pop",
RelayCounters::BufferSpooledEnvelopes => "buffer.spooled_envelopes",
RelayCounters::BufferUnspooledEnvelopes => "buffer.unspooled_envelopes",
RelayCounters::Outcomes => "events.outcomes",
RelayCounters::ProjectStateGet => "project_state.get",
RelayCounters::ProjectStateRequest => "project_state.request",
Expand Down

0 comments on commit 2c2e6c4

Please sign in to comment.