Skip to content

Commit

Permalink
Add metric
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Aug 26, 2024
1 parent bc3b2fd commit 68a10b8
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 21 deletions.
46 changes: 25 additions & 21 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::services::buffer::envelope_stack::{EnvelopeStack, Evictable, StackPro
use crate::services::buffer::sqlite_envelope_store::SqliteEnvelopeStoreError;
use crate::services::buffer::stack_provider::memory::MemoryStackProvider;
use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider;
use crate::statsd::{RelayCounters, RelayGauges};
use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};

/// Polymorphic envelope buffering interface.
///
Expand Down Expand Up @@ -310,32 +310,36 @@ where
}

let mut lru: BinaryHeap<LRUItem> = BinaryHeap::new();
for (queue_item, priority) in self.priority_queue.iter() {
let lru_item = LRUItem(queue_item.key, priority.readiness, queue_item.last_update);

// If we exceed the size, we want to pop the greatest element only if we have a smaller
// element, so that we end up with the smallest elements which are the ones with the
// lowest priority.
if lru.len() >= self.max_evictable_stacks {
let Some(top_lru_item) = lru.peek() else {
continue;
};

if lru_item < *top_lru_item {
lru.pop();
relay_statsd::metric!(timer(RelayTimers::BufferEvictLRUConstruction), {
for (queue_item, priority) in self.priority_queue.iter() {
let lru_item = LRUItem(queue_item.key, priority.readiness, queue_item.last_update);

// If we exceed the size, we want to pop the greatest element only if we have a smaller
// element, so that we end up with the smallest elements which are the ones with the
// lowest priority.
if lru.len() >= self.max_evictable_stacks {
let Some(top_lru_item) = lru.peek() else {
continue;
};

if lru_item < *top_lru_item {
lru.pop();
}
}
}

lru.push(lru_item);
}
lru.push(lru_item);
}
});

// We go over each element and remove it from the stack. After removal, we will evict
// elements from each popped stack.
for lru_item in lru {
if let Some(mut stack) = self.pop_stack(lru_item.0) {
stack.evict().await;
relay_statsd::metric!(timer(RelayTimers::BufferEvictStacksEviction), {
for lru_item in lru {
if let Some(mut stack) = self.pop_stack(lru_item.0) {
stack.evict().await;
}
}
}
});
}

fn push_stack(&mut self, envelope: Box<Envelope>) {
Expand Down
6 changes: 6 additions & 0 deletions relay-server/src/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,10 @@ pub enum RelayTimers {
/// - `message`: The type of message that was processed.
#[cfg(feature = "processing")]
StoreServiceDuration,
/// Timing in milliseconds for constructing the LRU list of stacks.
BufferEvictLRUConstruction,
/// Timing in milliseconds to evict all the stacks determined by the LRU algorithm.
BufferEvictStacksEviction,
}

impl TimerMetric for RelayTimers {
Expand Down Expand Up @@ -555,6 +559,8 @@ impl TimerMetric for RelayTimers {
RelayTimers::MetricRouterServiceDuration => "metrics.router.message.duration",
#[cfg(feature = "processing")]
RelayTimers::StoreServiceDuration => "store.message.duration",
RelayTimers::BufferEvictLRUConstruction => "buffer.evict.lru_construction",
RelayTimers::BufferEvictStacksEviction => "buffer.evict.stacks_eviction",
}
}
}
Expand Down

0 comments on commit 68a10b8

Please sign in to comment.