From 62f188827ae4b559d1bf6056202699624dcca055 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 14 Aug 2024 17:13:38 +0200 Subject: [PATCH 1/9] feat(spooler): Implement eviction policy --- .../services/buffer/envelope_buffer/mod.rs | 153 +++++++++++++++--- .../services/buffer/envelope_stack/sqlite.rs | 32 +++- .../services/buffer/sqlite_envelope_store.rs | 46 +++++- 3 files changed, 204 insertions(+), 27 deletions(-) diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index 99e50a09ba..a73e04fd12 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -1,5 +1,5 @@ use std::cmp::Ordering; -use std::collections::BTreeSet; +use std::collections::{BTreeSet, BinaryHeap}; use std::convert::Infallible; use std::time::Instant; @@ -38,7 +38,8 @@ impl PolymorphicEnvelopeBuffer { if config.spool_envelopes_path().is_some() { panic!("Disk backend not yet supported for spool V2"); } - Self::InMemory(EnvelopeBuffer::::new()) + // TODO: use configuration. + Self::InMemory(EnvelopeBuffer::::new(100)) } /// Adds an envelope to the buffer. @@ -105,15 +106,18 @@ struct EnvelopeBuffer { /// This indirection is needed because different stack implementations might need different /// initialization (e.g. a database connection). stack_provider: P, + /// The maximum number of stacks that can be evicted when low on memory. + max_evictable_stacks: usize, } impl EnvelopeBuffer { /// Creates an empty buffer. - pub fn new() -> Self { + pub fn new(max_evictable_stacks: usize) -> Self { Self { stacks_by_project: Default::default(), priority_queue: Default::default(), stack_provider: MemoryStackProvider, + max_evictable_stacks, } } } @@ -126,15 +130,17 @@ impl EnvelopeBuffer { stacks_by_project: Default::default(), priority_queue: Default::default(), stack_provider: SqliteStackProvider::new(config).await?, + // TODO: add configuration. + max_evictable_stacks: 10, }) } } impl EnvelopeBuffer

where - EnvelopeBufferError: std::convert::From<::Error>, + EnvelopeBufferError: From<::Error>, { - /// Pushes an envelope to the appropriate envelope stack and reprioritizes the stack. + /// Pushes an envelope to the appropriate envelope stack and re-prioritizes the stack. /// /// If the envelope stack does not exist, a new stack is pushed to the priority queue. /// The priority of the stack is updated with the envelope's received_at time. @@ -145,10 +151,12 @@ where QueueItem { key: _, value: stack, + last_update, }, _, )) = self.priority_queue.get_mut(&stack_key) { + *last_update = Instant::now(); stack.push(envelope).await?; } else { self.push_stack(envelope); @@ -166,13 +174,13 @@ where QueueItem { key: stack_key, value: _, + last_update: _, }, _, )) = self.priority_queue.peek() else { return Ok(None); }; - let stack_key = *stack_key; self.priority_queue.change_priority_by(&stack_key, |prio| { @@ -183,6 +191,7 @@ where QueueItem { key: _, value: stack, + last_update: _, }, _, )) = self.priority_queue.get_mut(&stack_key) @@ -198,16 +207,25 @@ where /// The priority of the envelope's stack is updated with the next envelope's received_at /// time. If the stack is empty after popping, it is removed from the priority queue. pub async fn pop(&mut self) -> Result>, EnvelopeBufferError> { - let Some((QueueItem { key, value: stack }, _)) = self.priority_queue.peek_mut() else { + let Some(( + QueueItem { + key: stack_key, + value: stack, + last_update: _, + }, + _, + )) = self.priority_queue.peek_mut() + else { return Ok(None); }; - let stack_key = *key; + let stack_key = *stack_key; let envelope = stack.pop().await.unwrap().expect("found an empty stack"); let next_received_at = stack .peek() .await? .map(|next_envelope| next_envelope.meta().start_time()); + match next_received_at { None => { self.pop_stack(stack_key); @@ -218,14 +236,26 @@ where }); } } + Ok(Some(envelope)) } - /// Reprioritizes all stacks that involve the given project key by setting it to "ready". + /// Re-prioritizes all stacks that involve the given project key by setting it to "ready". pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool { let mut changed = false; if let Some(stack_keys) = self.stacks_by_project.get(project) { for stack_key in stack_keys { + if let Some(( + QueueItem { + key: _, + value: _, + last_update, + }, + _, + )) = self.priority_queue.get_mut(stack_key) + { + *last_update = Instant::now(); + }; self.priority_queue.change_priority_by(stack_key, |stack| { let mut found = false; for (subkey, readiness) in [ @@ -250,6 +280,56 @@ where changed } + /// Evicts the least recently used stacks. + pub fn evict(&mut self) { + struct LRUItem(StackKey, Readiness, Instant); + + impl PartialEq for LRUItem { + fn eq(&self, other: &Self) -> bool { + self.0 == other.0 + } + } + + impl PartialOrd for LRUItem { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + } + + impl Eq for LRUItem {} + + impl Ord for LRUItem { + fn cmp(&self, other: &Self) -> Ordering { + match (self.1.ready(), other.1.ready()) { + (true, false) => Ordering::Greater, + (false, true) => Ordering::Less, + _ => self.2.cmp(&other.2), + } + } + } + + let mut lru: BinaryHeap = BinaryHeap::new(); + for (queue_item, priority) in self.priority_queue.iter() { + // If we exceed the size, we want to pop the greatest element, so that we end up with + // the smallest elements which are the ones with the lowest priority. + if lru.len() >= self.max_evictable_stacks { + lru.pop(); + } + + lru.push(LRUItem( + queue_item.key, + priority.readiness, + queue_item.last_update, + )); + } + + // We go over each element and remove it from the stack. The removal will call the `drop` + // method of the `EnvelopeStack` which will have a specific behavior. + for lru_item in lru { + self.pop_stack(lru_item.0); + } + } + fn push_stack(&mut self, envelope: Box) { let received_at = envelope.meta().start_time(); let stack_key = StackKey::from_envelope(&envelope); @@ -257,6 +337,7 @@ where QueueItem { key: stack_key, value: self.stack_provider.create_stack(envelope), + last_update: Instant::now(), }, Priority::new(received_at), ); @@ -274,10 +355,17 @@ where fn pop_stack(&mut self, stack_key: StackKey) { for project_key in stack_key.iter() { - self.stacks_by_project + let stack_keys = self + .stacks_by_project .get_mut(&project_key) - .expect("project_key is missing from lookup") - .remove(&stack_key); + .expect("project_key is missing from lookup"); + + // If there is only one stack key, we can directly remove the entry to save some memory. + if stack_keys.len() == 1 { + self.stacks_by_project.remove(&project_key); + } else { + stack_keys.remove(&stack_key); + }; } self.priority_queue.remove(&stack_key); @@ -320,6 +408,7 @@ impl StackKey { struct QueueItem { key: K, value: V, + last_update: Instant, } impl std::borrow::Borrow for QueueItem { @@ -398,7 +487,7 @@ impl Ord for Priority { } } -#[derive(Debug)] +#[derive(Debug, Copy, Clone)] struct Readiness { own_project_ready: bool, sampling_project_ready: bool, @@ -432,13 +521,13 @@ mod tests { use super::*; fn new_envelope( - project_key: ProjectKey, + own_key: ProjectKey, sampling_key: Option, event_id: Option, ) -> Box { let mut envelope = Envelope::from_request( None, - RequestMeta::new(Dsn::from_str(&format!("http://{project_key}@localhost/1")).unwrap()), + RequestMeta::new(Dsn::from_str(&format!("http://{own_key}@localhost/1")).unwrap()), ); if let Some(sampling_key) = sampling_key { envelope.set_dsc(DynamicSamplingContext { @@ -463,7 +552,7 @@ mod tests { #[tokio::test] async fn test_insert_pop() { - let mut buffer = EnvelopeBuffer::::new(); + let mut buffer = EnvelopeBuffer::::new(10); let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); @@ -547,7 +636,7 @@ mod tests { #[tokio::test] async fn test_project_internal_order() { - let mut buffer = EnvelopeBuffer::::new(); + let mut buffer = EnvelopeBuffer::::new(10); let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); @@ -574,7 +663,7 @@ mod tests { #[tokio::test] async fn test_sampling_projects() { - let mut buffer = EnvelopeBuffer::::new(); + let mut buffer = EnvelopeBuffer::::new(10); let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap(); @@ -652,7 +741,7 @@ mod tests { assert_ne!(stack_key1, stack_key2); - let mut buffer = EnvelopeBuffer::::new(); + let mut buffer = EnvelopeBuffer::::new(10); buffer .push(new_envelope(project_key1, Some(project_key2), None)) .await @@ -666,7 +755,7 @@ mod tests { #[tokio::test] async fn test_last_peek_internal_order() { - let mut buffer = EnvelopeBuffer::::new(); + let mut buffer = EnvelopeBuffer::::new(10); let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let event_id_1 = EventId::new(); @@ -693,4 +782,28 @@ mod tests { event_id_1 ); } + + #[tokio::test] + async fn test_eviction() { + let mut buffer = EnvelopeBuffer::::new(3); + + let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); + let project_key_2 = ProjectKey::parse("b56ae32be2584e0bbd7a4cbb95971fed").unwrap(); + let project_key_3 = ProjectKey::parse("e23ae32be2584e0bbd7a4cbb95971fed").unwrap(); + + let envelopes = [ + new_envelope(project_key_1, Some(project_key_2), None), + new_envelope(project_key_2, Some(project_key_1), None), + new_envelope(project_key_1, Some(project_key_3), None), + new_envelope(project_key_3, Some(project_key_1), None), + ]; + + for envelope in envelopes { + buffer.push(envelope).await.unwrap(); + } + + buffer.evict(); + + assert_eq!(buffer.priority_queue.len(), 1); + } } diff --git a/relay-server/src/services/buffer/envelope_stack/sqlite.rs b/relay-server/src/services/buffer/envelope_stack/sqlite.rs index 9ea8d16a66..2888fe1a5a 100644 --- a/relay-server/src/services/buffer/envelope_stack/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_stack/sqlite.rs @@ -7,7 +7,7 @@ use relay_base_schema::project::ProjectKey; use crate::envelope::Envelope; use crate::services::buffer::envelope_stack::EnvelopeStack; use crate::services::buffer::sqlite_envelope_store::{ - SqliteEnvelopeStore, SqliteEnvelopeStoreError, + EnvelopesOrder, SqliteEnvelopeStore, SqliteEnvelopeStoreError, }; use crate::statsd::RelayCounters; @@ -31,6 +31,8 @@ pub struct SqliteEnvelopeStack { spool_threshold: NonZeroUsize, /// Size of a batch of envelopes that is written to disk. batch_size: NonZeroUsize, + /// Maximum number of envelopes that can be evicted. + max_evictable_envelopes: NonZeroUsize, /// The project key of the project to which all the envelopes belong. own_key: ProjectKey, /// The project key of the root project of the trace to which all the envelopes belong. @@ -60,6 +62,9 @@ impl SqliteEnvelopeStack { .expect("the spool threshold must be > 0"), batch_size: NonZeroUsize::new(disk_batch_size) .expect("the disk batch size must be > 0"), + // TODO: add configurable parameter. + max_evictable_envelopes: NonZeroUsize::new(100) + .expect("the max evictable envelopes must be > 0"), own_key, sampling_key, batches_buffer: VecDeque::with_capacity(max_batches), @@ -125,6 +130,7 @@ impl SqliteEnvelopeStack { self.own_key, self.sampling_key, self.batch_size.get() as i64, + EnvelopesOrder::MostRecent, ) .await .map_err(SqliteEnvelopeStackError::EnvelopeStoreError)?; @@ -228,6 +234,30 @@ impl EnvelopeStack for SqliteEnvelopeStack { } } +impl Drop for SqliteEnvelopeStack { + fn drop(&mut self) { + let own_key = self.own_key; + let sampling_key = self.sampling_key; + let max_evictable_envelopes = self.max_evictable_envelopes; + let envelope_store = self.envelope_store.clone(); + + tokio::spawn(async move { + if envelope_store + .delete_many( + own_key, + sampling_key, + max_evictable_envelopes.get() as i64, + EnvelopesOrder::Oldest, + ) + .await + .is_err() + { + relay_log::error!("failed to evict envelopes from disk"); + }; + }); + } +} + #[cfg(test)] mod tests { use std::collections::BTreeMap; diff --git a/relay-server/src/services/buffer/sqlite_envelope_store.rs b/relay-server/src/services/buffer/sqlite_envelope_store.rs index 83616bf55a..45faec9b91 100644 --- a/relay-server/src/services/buffer/sqlite_envelope_store.rs +++ b/relay-server/src/services/buffer/sqlite_envelope_store.rs @@ -88,6 +88,14 @@ pub enum SqliteEnvelopeStoreError { FileSizeReadFailed(sqlx::Error), } +/// Enum representing the order in which [`Envelope`]s are fetched or deleted from the +/// database. +#[derive(Debug, Copy, Clone)] +pub enum EnvelopesOrder { + MostRecent, + Oldest, +} + /// Struct that offers access to a SQLite-based store of [`Envelope`]s. /// /// The goal of this struct is to hide away all the complexity of dealing with the database for @@ -221,10 +229,17 @@ impl SqliteEnvelopeStore { own_key: ProjectKey, sampling_key: ProjectKey, limit: i64, + envelopes_order: EnvelopesOrder, ) -> Result>, SqliteEnvelopeStoreError> { - let envelopes = build_delete_and_fetch_many_envelopes(own_key, sampling_key, limit) - .fetch(&self.db) - .peekable(); + let query = match envelopes_order { + EnvelopesOrder::MostRecent => { + build_delete_and_fetch_many_recent_envelopes(own_key, sampling_key, limit) + } + EnvelopesOrder::Oldest => { + build_delete_and_fetch_many_old_envelopes(own_key, sampling_key, limit) + } + }; + let envelopes = query.fetch(&self.db).peekable(); let mut envelopes = pin!(envelopes); if envelopes.as_mut().peek().await.is_none() { @@ -370,8 +385,8 @@ fn build_insert_many_envelopes<'a>( builder } -/// Builds a query that deletes many [`Envelope`] from the database. -pub fn build_delete_and_fetch_many_envelopes<'a>( +/// Builds a query that deletes many new [`Envelope`]s from the database. +pub fn build_delete_and_fetch_many_recent_envelopes<'a>( own_key: ProjectKey, project_key: ProjectKey, batch_size: i64, @@ -389,6 +404,25 @@ pub fn build_delete_and_fetch_many_envelopes<'a>( .bind(batch_size) } +/// Builds a query that deletes many old [`Envelope`]s from the database. +pub fn build_delete_and_fetch_many_old_envelopes<'a>( + own_key: ProjectKey, + project_key: ProjectKey, + batch_size: i64, +) -> Query<'a, Sqlite, SqliteArguments<'a>> { + sqlx::query( + "DELETE FROM + envelopes + WHERE id IN (SELECT id FROM envelopes WHERE own_key = ? AND sampling_key = ? + ORDER BY received_at ASC LIMIT ?) + RETURNING + received_at, own_key, sampling_key, envelope", + ) + .bind(own_key.to_string()) + .bind(project_key.to_string()) + .bind(batch_size) +} + /// Creates a query which fetches the number of used database pages multiplied by the page size. /// /// This info used to estimate the current allocated database size. @@ -480,7 +514,7 @@ mod tests { // We check that if we load more than the limit, we still get back at most 10. let extracted_envelopes = envelope_store - .delete_many(own_key, sampling_key, 15) + .delete_many(own_key, sampling_key, 15, EnvelopesOrder::MostRecent) .await .unwrap(); assert_eq!(envelopes.len(), 10); From d52ef708bcf3809b2ba29e0eb18bc1590250b3b6 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 14 Aug 2024 17:38:20 +0200 Subject: [PATCH 2/9] Improve --- .../services/buffer/envelope_buffer/mod.rs | 31 ++++++++++++++----- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index a73e04fd12..61343b7c82 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -282,6 +282,7 @@ where /// Evicts the least recently used stacks. pub fn evict(&mut self) { + #[derive(Debug, Copy, Clone)] struct LRUItem(StackKey, Readiness, Instant); impl PartialEq for LRUItem { @@ -303,24 +304,28 @@ where match (self.1.ready(), other.1.ready()) { (true, false) => Ordering::Greater, (false, true) => Ordering::Less, - _ => self.2.cmp(&other.2), + _ => self.2.cmp(&other.2).reverse(), } } } let mut lru: BinaryHeap = BinaryHeap::new(); for (queue_item, priority) in self.priority_queue.iter() { + let lru_item = LRUItem(queue_item.key, priority.readiness, queue_item.last_update); + // If we exceed the size, we want to pop the greatest element, so that we end up with // the smallest elements which are the ones with the lowest priority. if lru.len() >= self.max_evictable_stacks { - lru.pop(); + let Some(top_lru_item) = lru.peek() else { + continue; + }; + + if lru_item < *top_lru_item { + lru.pop(); + } } - lru.push(LRUItem( - queue_item.key, - priority.readiness, - queue_item.last_update, - )); + lru.push(lru_item); } // We go over each element and remove it from the stack. The removal will call the `drop` @@ -509,6 +514,8 @@ impl Readiness { #[cfg(test)] mod tests { use std::str::FromStr; + use std::time::Duration; + use tokio::time::sleep; use uuid::Uuid; use relay_common::Dsn; @@ -798,12 +805,20 @@ mod tests { new_envelope(project_key_3, Some(project_key_1), None), ]; - for envelope in envelopes { + for envelope in envelopes.clone() { buffer.push(envelope).await.unwrap(); + sleep(Duration::from_millis(1)).await; } + buffer.mark_ready(&project_key_1, true); + buffer.mark_ready(&project_key_2, true); + buffer.evict(); assert_eq!(buffer.priority_queue.len(), 1); + assert_eq!( + buffer.peek().await.unwrap().unwrap().event_id(), + envelopes[0].event_id() + ); } } From 3a9d822b99ecab3d84b0bd94994a4b1939170ac4 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 14 Aug 2024 17:50:21 +0200 Subject: [PATCH 3/9] Improve --- .../src/services/buffer/envelope_buffer/mod.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index 61343b7c82..c92e8da751 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -304,7 +304,7 @@ where match (self.1.ready(), other.1.ready()) { (true, false) => Ordering::Greater, (false, true) => Ordering::Less, - _ => self.2.cmp(&other.2).reverse(), + _ => self.2.cmp(&other.2), } } } @@ -799,10 +799,10 @@ mod tests { let project_key_3 = ProjectKey::parse("e23ae32be2584e0bbd7a4cbb95971fed").unwrap(); let envelopes = [ - new_envelope(project_key_1, Some(project_key_2), None), - new_envelope(project_key_2, Some(project_key_1), None), - new_envelope(project_key_1, Some(project_key_3), None), - new_envelope(project_key_3, Some(project_key_1), None), + new_envelope(project_key_1, Some(project_key_2), Some(EventId::new())), + new_envelope(project_key_2, Some(project_key_1), Some(EventId::new())), + new_envelope(project_key_1, Some(project_key_3), Some(EventId::new())), + new_envelope(project_key_3, Some(project_key_1), Some(EventId::new())), ]; for envelope in envelopes.clone() { @@ -816,9 +816,11 @@ mod tests { buffer.evict(); assert_eq!(buffer.priority_queue.len(), 1); + // We expect that only the 2nd envelope is kept, since the last 2 have non-ready projects + // and the first one is the oldest of the ones that are ready. assert_eq!( buffer.peek().await.unwrap().unwrap().event_id(), - envelopes[0].event_id() + envelopes[1].event_id() ); } } From 5dde52e85674bcc1c35a2b5f1b58c542160fe1da Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Fri, 16 Aug 2024 09:47:34 +0200 Subject: [PATCH 4/9] Improve --- .../services/buffer/envelope_buffer/mod.rs | 27 +++++++----- .../services/buffer/envelope_stack/memory.rs | 11 +++-- .../src/services/buffer/envelope_stack/mod.rs | 8 +++- .../services/buffer/envelope_stack/sqlite.rs | 42 +++++++++---------- .../services/buffer/sqlite_envelope_store.rs | 5 +++ 5 files changed, 57 insertions(+), 36 deletions(-) diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index c92e8da751..522bbd7acb 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -8,7 +8,7 @@ use relay_config::Config; use crate::envelope::Envelope; use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError; -use crate::services::buffer::envelope_stack::{EnvelopeStack, StackProvider}; +use crate::services::buffer::envelope_stack::{EnvelopeStack, Evictable, StackProvider}; use crate::services::buffer::sqlite_envelope_store::SqliteEnvelopeStoreError; use crate::services::buffer::stack_provider::memory::MemoryStackProvider; use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider; @@ -281,7 +281,8 @@ where } /// Evicts the least recently used stacks. - pub fn evict(&mut self) { + #[allow(dead_code)] + pub async fn evict(&mut self) { #[derive(Debug, Copy, Clone)] struct LRUItem(StackKey, Readiness, Instant); @@ -313,8 +314,9 @@ where for (queue_item, priority) in self.priority_queue.iter() { let lru_item = LRUItem(queue_item.key, priority.readiness, queue_item.last_update); - // If we exceed the size, we want to pop the greatest element, so that we end up with - // the smallest elements which are the ones with the lowest priority. + // If we exceed the size, we want to pop the greatest element only if we have a smaller + // element, so that we end up with the smallest elements which are the ones with the + // lowest priority. if lru.len() >= self.max_evictable_stacks { let Some(top_lru_item) = lru.peek() else { continue; @@ -328,10 +330,12 @@ where lru.push(lru_item); } - // We go over each element and remove it from the stack. The removal will call the `drop` - // method of the `EnvelopeStack` which will have a specific behavior. + // We go over each element and remove it from the stack. After removal, we will evict + // elements from each popped stack. for lru_item in lru { - self.pop_stack(lru_item.0); + if let Some(mut stack) = self.pop_stack(lru_item.0) { + stack.evict().await; + } } } @@ -358,7 +362,7 @@ where ); } - fn pop_stack(&mut self, stack_key: StackKey) { + fn pop_stack(&mut self, stack_key: StackKey) -> Option { for project_key in stack_key.iter() { let stack_keys = self .stacks_by_project @@ -372,11 +376,14 @@ where stack_keys.remove(&stack_key); }; } - self.priority_queue.remove(&stack_key); + + let stack = self.priority_queue.remove(&stack_key); relay_statsd::metric!( gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64 ); + + stack.map(|(q, _)| q.value) } } @@ -813,7 +820,7 @@ mod tests { buffer.mark_ready(&project_key_1, true); buffer.mark_ready(&project_key_2, true); - buffer.evict(); + buffer.evict().await; assert_eq!(buffer.priority_queue.len(), 1); // We expect that only the 2nd envelope is kept, since the last 2 have non-ready projects diff --git a/relay-server/src/services/buffer/envelope_stack/memory.rs b/relay-server/src/services/buffer/envelope_stack/memory.rs index 5e8087010f..bb1766a07b 100644 --- a/relay-server/src/services/buffer/envelope_stack/memory.rs +++ b/relay-server/src/services/buffer/envelope_stack/memory.rs @@ -1,8 +1,7 @@ -use std::convert::Infallible; - use crate::Envelope; +use std::convert::Infallible; -use super::EnvelopeStack; +use super::{EnvelopeStack, Evictable}; #[derive(Debug)] pub struct MemoryEnvelopeStack(#[allow(clippy::vec_box)] Vec>); @@ -29,3 +28,9 @@ impl EnvelopeStack for MemoryEnvelopeStack { Ok(self.0.pop()) } } + +impl Evictable for MemoryEnvelopeStack { + async fn evict(&mut self) { + self.0.clear() + } +} diff --git a/relay-server/src/services/buffer/envelope_stack/mod.rs b/relay-server/src/services/buffer/envelope_stack/mod.rs index ee48016f09..6f390261fa 100644 --- a/relay-server/src/services/buffer/envelope_stack/mod.rs +++ b/relay-server/src/services/buffer/envelope_stack/mod.rs @@ -21,8 +21,14 @@ pub trait EnvelopeStack: Send + std::fmt::Debug { fn pop(&mut self) -> impl Future>, Self::Error>>; } +/// An object that can support eviction. +pub trait Evictable: Send + std::fmt::Debug { + /// Evicts data from an [`Evictable`]. + fn evict(&mut self) -> impl Future; +} + pub trait StackProvider: std::fmt::Debug { - type Stack: EnvelopeStack; + type Stack: EnvelopeStack + Evictable; fn create_stack(&self, envelope: Box) -> Self::Stack; } diff --git a/relay-server/src/services/buffer/envelope_stack/sqlite.rs b/relay-server/src/services/buffer/envelope_stack/sqlite.rs index 2888fe1a5a..39729ebbeb 100644 --- a/relay-server/src/services/buffer/envelope_stack/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_stack/sqlite.rs @@ -5,7 +5,7 @@ use std::num::NonZeroUsize; use relay_base_schema::project::ProjectKey; use crate::envelope::Envelope; -use crate::services::buffer::envelope_stack::EnvelopeStack; +use crate::services::buffer::envelope_stack::{EnvelopeStack, Evictable}; use crate::services::buffer::sqlite_envelope_store::{ EnvelopesOrder, SqliteEnvelopeStore, SqliteEnvelopeStoreError, }; @@ -234,27 +234,25 @@ impl EnvelopeStack for SqliteEnvelopeStack { } } -impl Drop for SqliteEnvelopeStack { - fn drop(&mut self) { - let own_key = self.own_key; - let sampling_key = self.sampling_key; - let max_evictable_envelopes = self.max_evictable_envelopes; - let envelope_store = self.envelope_store.clone(); - - tokio::spawn(async move { - if envelope_store - .delete_many( - own_key, - sampling_key, - max_evictable_envelopes.get() as i64, - EnvelopesOrder::Oldest, - ) - .await - .is_err() - { - relay_log::error!("failed to evict envelopes from disk"); - }; - }); +impl Evictable for SqliteEnvelopeStack { + async fn evict(&mut self) { + // We want to evict all elements in memory. + self.batches_buffer.clear(); + self.batches_buffer_size = 0; + + if self + .envelope_store + .delete_many( + self.own_key, + self.sampling_key, + self.max_evictable_envelopes.get() as i64, + EnvelopesOrder::Oldest, + ) + .await + .is_err() + { + relay_log::error!("failed to evict envelopes from disk"); + }; } } diff --git a/relay-server/src/services/buffer/sqlite_envelope_store.rs b/relay-server/src/services/buffer/sqlite_envelope_store.rs index 45faec9b91..bbe9dc964b 100644 --- a/relay-server/src/services/buffer/sqlite_envelope_store.rs +++ b/relay-server/src/services/buffer/sqlite_envelope_store.rs @@ -547,4 +547,9 @@ mod tests { (own_key, sampling_key) ); } + + #[tokio::test] + async fn test_drop_envelope_stack() { + // TODO: test that dropping the envelope stack cleans up the db. + } } From 5d36a428eb04cd4f789b69d56dbb60d8d1b1cf5d Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Fri, 16 Aug 2024 10:34:36 +0200 Subject: [PATCH 5/9] Improve --- relay-config/src/config.rs | 15 +++++++ relay-server/benches/benches.rs | 3 ++ .../services/buffer/envelope_buffer/mod.rs | 1 + .../services/buffer/envelope_stack/sqlite.rs | 41 ++++++++++++++++++- .../services/buffer/sqlite_envelope_store.rs | 5 --- .../services/buffer/stack_provider/sqlite.rs | 3 ++ 6 files changed, 61 insertions(+), 7 deletions(-) diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 88d9269398..503bd68e34 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -860,10 +860,16 @@ fn spool_envelopes_stack_max_batches() -> usize { 2 } +/// Default maximum time between receiving the envelope and processing it. fn spool_envelopes_max_envelope_delay_secs() -> u64 { 24 * 60 * 60 } +/// Default maximum number of envelopes that can be evicted for each stack. +fn spool_envelopes_max_evictable_envelopes() -> usize { + 100 +} + /// Persistent buffering configuration for incoming envelopes. #[derive(Debug, Serialize, Deserialize)] pub struct EnvelopeSpool { @@ -903,6 +909,9 @@ pub struct EnvelopeSpool { /// they are dropped. Defaults to 24h. #[serde(default = "spool_envelopes_max_envelope_delay_secs")] max_envelope_delay_secs: u64, + /// Maximum number of envelopes that can be evicted for each stack. + #[serde(default = "spool_envelopes_max_evictable_envelopes")] + max_evictable_envelopes: usize, /// Version of the spooler. #[serde(default)] version: EnvelopeSpoolVersion, @@ -938,6 +947,7 @@ impl Default for EnvelopeSpool { disk_batch_size: spool_envelopes_stack_disk_batch_size(), max_batches: spool_envelopes_stack_max_batches(), max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(), + max_evictable_envelopes: spool_envelopes_max_evictable_envelopes(), version: EnvelopeSpoolVersion::default(), } } @@ -2149,6 +2159,11 @@ impl Config { self.values.spool.envelopes.max_batches } + /// Maximum number of envelopes that can be evicted per stack. + pub fn spool_envelopes_stack_max_evictable_envelopes(&self) -> usize { + self.values.spool.envelopes.max_evictable_envelopes + } + /// Returns `true` if version 2 of the spooling mechanism is used. pub fn spool_v2(&self) -> bool { matches!( diff --git a/relay-server/benches/benches.rs b/relay-server/benches/benches.rs index ac6ff48257..8ef7fb9852 100644 --- a/relay-server/benches/benches.rs +++ b/relay-server/benches/benches.rs @@ -99,6 +99,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { envelope_store.clone(), disk_batch_size, 2, + 10, ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), ); @@ -135,6 +136,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { envelope_store.clone(), disk_batch_size, 2, + 10, ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), ); @@ -175,6 +177,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { envelope_store.clone(), disk_batch_size, 2, + 10, ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), ); diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index 522bbd7acb..88a47d56dc 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -814,6 +814,7 @@ mod tests { for envelope in envelopes.clone() { buffer.push(envelope).await.unwrap(); + // We sleep to make sure that the `last_update` of `QueueItem` is different. sleep(Duration::from_millis(1)).await; } diff --git a/relay-server/src/services/buffer/envelope_stack/sqlite.rs b/relay-server/src/services/buffer/envelope_stack/sqlite.rs index 39729ebbeb..7a072740e3 100644 --- a/relay-server/src/services/buffer/envelope_stack/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_stack/sqlite.rs @@ -53,6 +53,7 @@ impl SqliteEnvelopeStack { envelope_store: SqliteEnvelopeStore, disk_batch_size: usize, max_batches: usize, + max_evictable_envelopes: usize, own_key: ProjectKey, sampling_key: ProjectKey, ) -> Self { @@ -62,8 +63,7 @@ impl SqliteEnvelopeStack { .expect("the spool threshold must be > 0"), batch_size: NonZeroUsize::new(disk_batch_size) .expect("the disk batch size must be > 0"), - // TODO: add configurable parameter. - max_evictable_envelopes: NonZeroUsize::new(100) + max_evictable_envelopes: NonZeroUsize::new(max_evictable_envelopes) .expect("the max evictable envelopes must be > 0"), own_key, sampling_key, @@ -322,6 +322,7 @@ mod tests { envelope_store, 2, 2, + 10, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("c25ae32be2584e0bbd7a4cbb95971fe1").unwrap(), ); @@ -338,6 +339,7 @@ mod tests { envelope_store, 2, 2, + 10, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), ); @@ -390,6 +392,7 @@ mod tests { envelope_store, 2, 2, + 10, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), ); @@ -409,6 +412,7 @@ mod tests { envelope_store, 2, 2, + 10, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), ); @@ -426,6 +430,7 @@ mod tests { envelope_store, 5, 2, + 10, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), ); @@ -463,6 +468,7 @@ mod tests { envelope_store, 5, 2, + 10, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), ); @@ -523,4 +529,35 @@ mod tests { } assert_eq!(stack.batches_buffer_size, 0); } + + #[tokio::test] + async fn test_evict() { + let db = setup_db(true).await; + let envelope_store = SqliteEnvelopeStore::new(db); + let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(); + let mut stack = + SqliteEnvelopeStack::new(envelope_store.clone(), 5, 1, 2, own_key, sampling_key); + + let envelopes = mock_envelopes(15); + + for envelope in envelopes.clone() { + assert!(stack.push(envelope).await.is_ok()); + } + + stack.evict().await; + // We expect 0 in-memory data since we flushed the 5 in-memory envelopes. + assert!(stack.batches_buffer.is_empty()); + assert_eq!(stack.batches_buffer_size, 0); + // We expect 2 out of the 10 envelopes on disk to have been flushed, so if we load 15, we + // should get 8 back. + assert_eq!( + envelope_store + .delete_many(own_key, sampling_key, 15, EnvelopesOrder::MostRecent) + .await + .unwrap() + .len(), + 8 + ); + } } diff --git a/relay-server/src/services/buffer/sqlite_envelope_store.rs b/relay-server/src/services/buffer/sqlite_envelope_store.rs index bbe9dc964b..45faec9b91 100644 --- a/relay-server/src/services/buffer/sqlite_envelope_store.rs +++ b/relay-server/src/services/buffer/sqlite_envelope_store.rs @@ -547,9 +547,4 @@ mod tests { (own_key, sampling_key) ); } - - #[tokio::test] - async fn test_drop_envelope_stack() { - // TODO: test that dropping the envelope stack cleans up the db. - } } diff --git a/relay-server/src/services/buffer/stack_provider/sqlite.rs b/relay-server/src/services/buffer/stack_provider/sqlite.rs index 9716585606..6f8c4d4b5d 100644 --- a/relay-server/src/services/buffer/stack_provider/sqlite.rs +++ b/relay-server/src/services/buffer/stack_provider/sqlite.rs @@ -11,6 +11,7 @@ pub struct SqliteStackProvider { envelope_store: SqliteEnvelopeStore, disk_batch_size: usize, max_batches: usize, + max_evictable_envelopes: usize, } #[warn(dead_code)] @@ -22,6 +23,7 @@ impl SqliteStackProvider { envelope_store, disk_batch_size: config.spool_envelopes_stack_disk_batch_size(), max_batches: config.spool_envelopes_stack_max_batches(), + max_evictable_envelopes: config.spool_envelopes_stack_max_evictable_envelopes(), }) } } @@ -37,6 +39,7 @@ impl StackProvider for SqliteStackProvider { self.envelope_store.clone(), self.disk_batch_size, self.max_batches, + self.max_evictable_envelopes, own_key, sampling_key, ) From d436974b508787d8e92490f6634d3dc08af0b82b Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Fri, 16 Aug 2024 10:45:40 +0200 Subject: [PATCH 6/9] Improve --- relay-config/src/config.rs | 14 ++++++++++++++ .../src/services/buffer/envelope_buffer/mod.rs | 3 +-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 503bd68e34..9fc876086d 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -865,6 +865,11 @@ fn spool_envelopes_max_envelope_delay_secs() -> u64 { 24 * 60 * 60 } +/// Default maximum number of stacks that can be evicted. +fn spool_envelopes_max_evictable_stacks() -> usize { + 100 +} + /// Default maximum number of envelopes that can be evicted for each stack. fn spool_envelopes_max_evictable_envelopes() -> usize { 100 @@ -909,6 +914,9 @@ pub struct EnvelopeSpool { /// they are dropped. Defaults to 24h. #[serde(default = "spool_envelopes_max_envelope_delay_secs")] max_envelope_delay_secs: u64, + /// Maximum number of stacks that can be evicted. + #[serde(default = "spool_envelopes_max_evictable_stacks")] + max_evictable_stacks: usize, /// Maximum number of envelopes that can be evicted for each stack. #[serde(default = "spool_envelopes_max_evictable_envelopes")] max_evictable_envelopes: usize, @@ -947,6 +955,7 @@ impl Default for EnvelopeSpool { disk_batch_size: spool_envelopes_stack_disk_batch_size(), max_batches: spool_envelopes_stack_max_batches(), max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(), + max_evictable_stacks: spool_envelopes_max_evictable_stacks(), max_evictable_envelopes: spool_envelopes_max_evictable_envelopes(), version: EnvelopeSpoolVersion::default(), } @@ -2159,6 +2168,11 @@ impl Config { self.values.spool.envelopes.max_batches } + /// Maximum number of stacks that can be evicted. + pub fn spool_envelopes_stack_max_evictable_stacks(&self) -> usize { + self.values.spool.envelopes.max_evictable_stacks + } + /// Maximum number of envelopes that can be evicted per stack. pub fn spool_envelopes_stack_max_evictable_envelopes(&self) -> usize { self.values.spool.envelopes.max_evictable_envelopes diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index 88a47d56dc..a3faf92063 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -130,8 +130,7 @@ impl EnvelopeBuffer { stacks_by_project: Default::default(), priority_queue: Default::default(), stack_provider: SqliteStackProvider::new(config).await?, - // TODO: add configuration. - max_evictable_stacks: 10, + max_evictable_stacks: config.spool_envelopes_stack_max_evictable_stacks(), }) } } From 4d1c06ffa2add8e38126dab2fb9d5ac863c0fc96 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Fri, 16 Aug 2024 10:48:49 +0200 Subject: [PATCH 7/9] Improve --- .../src/services/buffer/envelope_buffer/mod.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index a3faf92063..d0513a7609 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -38,8 +38,8 @@ impl PolymorphicEnvelopeBuffer { if config.spool_envelopes_path().is_some() { panic!("Disk backend not yet supported for spool V2"); } - // TODO: use configuration. - Self::InMemory(EnvelopeBuffer::::new(100)) + + Self::InMemory(EnvelopeBuffer::::new(config)) } /// Adds an envelope to the buffer. @@ -72,7 +72,7 @@ impl PolymorphicEnvelopeBuffer { /// Marks a project as ready or not ready. /// - /// The buffer reprioritizes its envelopes based on this information. + /// The buffer re-prioritizes its envelopes based on this information. pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool { match self { Self::Sqlite(buffer) => buffer.mark_ready(project, is_ready), @@ -93,7 +93,7 @@ pub enum EnvelopeBufferError { /// An envelope buffer that holds an individual stack for each project/sampling project combination. /// -/// Envelope stacks are organized in a priority queue, and are reprioritized every time an envelope +/// Envelope stacks are organized in a priority queue, and are re-prioritized every time an envelope /// is pushed, popped, or when a project becomes ready. #[derive(Debug)] struct EnvelopeBuffer { @@ -112,12 +112,12 @@ struct EnvelopeBuffer { impl EnvelopeBuffer { /// Creates an empty buffer. - pub fn new(max_evictable_stacks: usize) -> Self { + pub fn new(config: &Config) -> Self { Self { stacks_by_project: Default::default(), priority_queue: Default::default(), stack_provider: MemoryStackProvider, - max_evictable_stacks, + max_evictable_stacks: config.spool_envelopes_stack_max_evictable_stacks(), } } } From bc3b2fd210864bf164c629bae6e1a583c1390d70 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Fri, 16 Aug 2024 11:00:21 +0200 Subject: [PATCH 8/9] Improve --- .../services/buffer/envelope_buffer/mod.rs | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index d0513a7609..4bbeb880c0 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -563,9 +563,21 @@ mod tests { envelope } + fn mock_config() -> Config { + Config::from_json_value(serde_json::json!({ + "spool": { + "envelopes": { + "version": "experimental", + "max_evictable_stacks": 3 + } + } + })) + .unwrap() + } + #[tokio::test] async fn test_insert_pop() { - let mut buffer = EnvelopeBuffer::::new(10); + let mut buffer = EnvelopeBuffer::::new(&mock_config()); let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); @@ -649,7 +661,7 @@ mod tests { #[tokio::test] async fn test_project_internal_order() { - let mut buffer = EnvelopeBuffer::::new(10); + let mut buffer = EnvelopeBuffer::::new(&mock_config()); let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); @@ -676,7 +688,7 @@ mod tests { #[tokio::test] async fn test_sampling_projects() { - let mut buffer = EnvelopeBuffer::::new(10); + let mut buffer = EnvelopeBuffer::::new(&mock_config()); let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap(); @@ -754,7 +766,7 @@ mod tests { assert_ne!(stack_key1, stack_key2); - let mut buffer = EnvelopeBuffer::::new(10); + let mut buffer = EnvelopeBuffer::::new(&mock_config()); buffer .push(new_envelope(project_key1, Some(project_key2), None)) .await @@ -768,7 +780,7 @@ mod tests { #[tokio::test] async fn test_last_peek_internal_order() { - let mut buffer = EnvelopeBuffer::::new(10); + let mut buffer = EnvelopeBuffer::::new(&mock_config()); let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let event_id_1 = EventId::new(); @@ -798,7 +810,7 @@ mod tests { #[tokio::test] async fn test_eviction() { - let mut buffer = EnvelopeBuffer::::new(3); + let mut buffer = EnvelopeBuffer::::new(&mock_config()); let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let project_key_2 = ProjectKey::parse("b56ae32be2584e0bbd7a4cbb95971fed").unwrap(); From 68a10b897892197e5fbb8915d9cd58b660f4ee00 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Mon, 26 Aug 2024 14:52:45 +0200 Subject: [PATCH 9/9] Add metric --- .../services/buffer/envelope_buffer/mod.rs | 46 ++++++++++--------- relay-server/src/statsd.rs | 6 +++ 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index 4bbeb880c0..c3d69e7bdd 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -12,7 +12,7 @@ use crate::services::buffer::envelope_stack::{EnvelopeStack, Evictable, StackPro use crate::services::buffer::sqlite_envelope_store::SqliteEnvelopeStoreError; use crate::services::buffer::stack_provider::memory::MemoryStackProvider; use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider; -use crate::statsd::{RelayCounters, RelayGauges}; +use crate::statsd::{RelayCounters, RelayGauges, RelayTimers}; /// Polymorphic envelope buffering interface. /// @@ -310,32 +310,36 @@ where } let mut lru: BinaryHeap = BinaryHeap::new(); - for (queue_item, priority) in self.priority_queue.iter() { - let lru_item = LRUItem(queue_item.key, priority.readiness, queue_item.last_update); - - // If we exceed the size, we want to pop the greatest element only if we have a smaller - // element, so that we end up with the smallest elements which are the ones with the - // lowest priority. - if lru.len() >= self.max_evictable_stacks { - let Some(top_lru_item) = lru.peek() else { - continue; - }; - - if lru_item < *top_lru_item { - lru.pop(); + relay_statsd::metric!(timer(RelayTimers::BufferEvictLRUConstruction), { + for (queue_item, priority) in self.priority_queue.iter() { + let lru_item = LRUItem(queue_item.key, priority.readiness, queue_item.last_update); + + // If we exceed the size, we want to pop the greatest element only if we have a smaller + // element, so that we end up with the smallest elements which are the ones with the + // lowest priority. + if lru.len() >= self.max_evictable_stacks { + let Some(top_lru_item) = lru.peek() else { + continue; + }; + + if lru_item < *top_lru_item { + lru.pop(); + } } - } - lru.push(lru_item); - } + lru.push(lru_item); + } + }); // We go over each element and remove it from the stack. After removal, we will evict // elements from each popped stack. - for lru_item in lru { - if let Some(mut stack) = self.pop_stack(lru_item.0) { - stack.evict().await; + relay_statsd::metric!(timer(RelayTimers::BufferEvictStacksEviction), { + for lru_item in lru { + if let Some(mut stack) = self.pop_stack(lru_item.0) { + stack.evict().await; + } } - } + }); } fn push_stack(&mut self, envelope: Box) { diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 1113d528fc..88e7cc52b0 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -514,6 +514,10 @@ pub enum RelayTimers { /// - `message`: The type of message that was processed. #[cfg(feature = "processing")] StoreServiceDuration, + /// Timing in milliseconds for constructing the LRU list of stacks. + BufferEvictLRUConstruction, + /// Timing in milliseconds to evict all the stacks determined by the LRU algorithm. + BufferEvictStacksEviction, } impl TimerMetric for RelayTimers { @@ -555,6 +559,8 @@ impl TimerMetric for RelayTimers { RelayTimers::MetricRouterServiceDuration => "metrics.router.message.duration", #[cfg(feature = "processing")] RelayTimers::StoreServiceDuration => "store.message.duration", + RelayTimers::BufferEvictLRUConstruction => "buffer.evict.lru_construction", + RelayTimers::BufferEvictStacksEviction => "buffer.evict.stacks_eviction", } } }