diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 88d9269398..9fc876086d 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -860,10 +860,21 @@ fn spool_envelopes_stack_max_batches() -> usize { 2 } +/// Default maximum time between receiving the envelope and processing it. fn spool_envelopes_max_envelope_delay_secs() -> u64 { 24 * 60 * 60 } +/// Default maximum number of stacks that can be evicted. +fn spool_envelopes_max_evictable_stacks() -> usize { + 100 +} + +/// Default maximum number of envelopes that can be evicted for each stack. +fn spool_envelopes_max_evictable_envelopes() -> usize { + 100 +} + /// Persistent buffering configuration for incoming envelopes. #[derive(Debug, Serialize, Deserialize)] pub struct EnvelopeSpool { @@ -903,6 +914,12 @@ pub struct EnvelopeSpool { /// they are dropped. Defaults to 24h. #[serde(default = "spool_envelopes_max_envelope_delay_secs")] max_envelope_delay_secs: u64, + /// Maximum number of stacks that can be evicted. + #[serde(default = "spool_envelopes_max_evictable_stacks")] + max_evictable_stacks: usize, + /// Maximum number of envelopes that can be evicted for each stack. + #[serde(default = "spool_envelopes_max_evictable_envelopes")] + max_evictable_envelopes: usize, /// Version of the spooler. #[serde(default)] version: EnvelopeSpoolVersion, @@ -938,6 +955,8 @@ impl Default for EnvelopeSpool { disk_batch_size: spool_envelopes_stack_disk_batch_size(), max_batches: spool_envelopes_stack_max_batches(), max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(), + max_evictable_stacks: spool_envelopes_max_evictable_stacks(), + max_evictable_envelopes: spool_envelopes_max_evictable_envelopes(), version: EnvelopeSpoolVersion::default(), } } @@ -2149,6 +2168,16 @@ impl Config { self.values.spool.envelopes.max_batches } + /// Maximum number of stacks that can be evicted. + pub fn spool_envelopes_stack_max_evictable_stacks(&self) -> usize { + self.values.spool.envelopes.max_evictable_stacks + } + + /// Maximum number of envelopes that can be evicted per stack. + pub fn spool_envelopes_stack_max_evictable_envelopes(&self) -> usize { + self.values.spool.envelopes.max_evictable_envelopes + } + /// Returns `true` if version 2 of the spooling mechanism is used. pub fn spool_v2(&self) -> bool { matches!( diff --git a/relay-server/benches/benches.rs b/relay-server/benches/benches.rs index ac6ff48257..8ef7fb9852 100644 --- a/relay-server/benches/benches.rs +++ b/relay-server/benches/benches.rs @@ -99,6 +99,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { envelope_store.clone(), disk_batch_size, 2, + 10, ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), ); @@ -135,6 +136,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { envelope_store.clone(), disk_batch_size, 2, + 10, ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), ); @@ -175,6 +177,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { envelope_store.clone(), disk_batch_size, 2, + 10, ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), ); diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index 99e50a09ba..c3d69e7bdd 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -1,5 +1,5 @@ use std::cmp::Ordering; -use std::collections::BTreeSet; +use std::collections::{BTreeSet, BinaryHeap}; use std::convert::Infallible; use std::time::Instant; @@ -8,11 +8,11 @@ use relay_config::Config; use crate::envelope::Envelope; use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError; -use crate::services::buffer::envelope_stack::{EnvelopeStack, StackProvider}; +use crate::services::buffer::envelope_stack::{EnvelopeStack, Evictable, StackProvider}; use crate::services::buffer::sqlite_envelope_store::SqliteEnvelopeStoreError; use crate::services::buffer::stack_provider::memory::MemoryStackProvider; use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider; -use crate::statsd::{RelayCounters, RelayGauges}; +use crate::statsd::{RelayCounters, RelayGauges, RelayTimers}; /// Polymorphic envelope buffering interface. /// @@ -38,7 +38,8 @@ impl PolymorphicEnvelopeBuffer { if config.spool_envelopes_path().is_some() { panic!("Disk backend not yet supported for spool V2"); } - Self::InMemory(EnvelopeBuffer::::new()) + + Self::InMemory(EnvelopeBuffer::::new(config)) } /// Adds an envelope to the buffer. @@ -71,7 +72,7 @@ impl PolymorphicEnvelopeBuffer { /// Marks a project as ready or not ready. /// - /// The buffer reprioritizes its envelopes based on this information. + /// The buffer re-prioritizes its envelopes based on this information. pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool { match self { Self::Sqlite(buffer) => buffer.mark_ready(project, is_ready), @@ -92,7 +93,7 @@ pub enum EnvelopeBufferError { /// An envelope buffer that holds an individual stack for each project/sampling project combination. /// -/// Envelope stacks are organized in a priority queue, and are reprioritized every time an envelope +/// Envelope stacks are organized in a priority queue, and are re-prioritized every time an envelope /// is pushed, popped, or when a project becomes ready. #[derive(Debug)] struct EnvelopeBuffer { @@ -105,15 +106,18 @@ struct EnvelopeBuffer { /// This indirection is needed because different stack implementations might need different /// initialization (e.g. a database connection). stack_provider: P, + /// The maximum number of stacks that can be evicted when low on memory. + max_evictable_stacks: usize, } impl EnvelopeBuffer { /// Creates an empty buffer. - pub fn new() -> Self { + pub fn new(config: &Config) -> Self { Self { stacks_by_project: Default::default(), priority_queue: Default::default(), stack_provider: MemoryStackProvider, + max_evictable_stacks: config.spool_envelopes_stack_max_evictable_stacks(), } } } @@ -126,15 +130,16 @@ impl EnvelopeBuffer { stacks_by_project: Default::default(), priority_queue: Default::default(), stack_provider: SqliteStackProvider::new(config).await?, + max_evictable_stacks: config.spool_envelopes_stack_max_evictable_stacks(), }) } } impl EnvelopeBuffer

where - EnvelopeBufferError: std::convert::From<::Error>, + EnvelopeBufferError: From<::Error>, { - /// Pushes an envelope to the appropriate envelope stack and reprioritizes the stack. + /// Pushes an envelope to the appropriate envelope stack and re-prioritizes the stack. /// /// If the envelope stack does not exist, a new stack is pushed to the priority queue. /// The priority of the stack is updated with the envelope's received_at time. @@ -145,10 +150,12 @@ where QueueItem { key: _, value: stack, + last_update, }, _, )) = self.priority_queue.get_mut(&stack_key) { + *last_update = Instant::now(); stack.push(envelope).await?; } else { self.push_stack(envelope); @@ -166,13 +173,13 @@ where QueueItem { key: stack_key, value: _, + last_update: _, }, _, )) = self.priority_queue.peek() else { return Ok(None); }; - let stack_key = *stack_key; self.priority_queue.change_priority_by(&stack_key, |prio| { @@ -183,6 +190,7 @@ where QueueItem { key: _, value: stack, + last_update: _, }, _, )) = self.priority_queue.get_mut(&stack_key) @@ -198,16 +206,25 @@ where /// The priority of the envelope's stack is updated with the next envelope's received_at /// time. If the stack is empty after popping, it is removed from the priority queue. pub async fn pop(&mut self) -> Result>, EnvelopeBufferError> { - let Some((QueueItem { key, value: stack }, _)) = self.priority_queue.peek_mut() else { + let Some(( + QueueItem { + key: stack_key, + value: stack, + last_update: _, + }, + _, + )) = self.priority_queue.peek_mut() + else { return Ok(None); }; - let stack_key = *key; + let stack_key = *stack_key; let envelope = stack.pop().await.unwrap().expect("found an empty stack"); let next_received_at = stack .peek() .await? .map(|next_envelope| next_envelope.meta().start_time()); + match next_received_at { None => { self.pop_stack(stack_key); @@ -218,14 +235,26 @@ where }); } } + Ok(Some(envelope)) } - /// Reprioritizes all stacks that involve the given project key by setting it to "ready". + /// Re-prioritizes all stacks that involve the given project key by setting it to "ready". pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool { let mut changed = false; if let Some(stack_keys) = self.stacks_by_project.get(project) { for stack_key in stack_keys { + if let Some(( + QueueItem { + key: _, + value: _, + last_update, + }, + _, + )) = self.priority_queue.get_mut(stack_key) + { + *last_update = Instant::now(); + }; self.priority_queue.change_priority_by(stack_key, |stack| { let mut found = false; for (subkey, readiness) in [ @@ -250,6 +279,69 @@ where changed } + /// Evicts the least recently used stacks. + #[allow(dead_code)] + pub async fn evict(&mut self) { + #[derive(Debug, Copy, Clone)] + struct LRUItem(StackKey, Readiness, Instant); + + impl PartialEq for LRUItem { + fn eq(&self, other: &Self) -> bool { + self.0 == other.0 + } + } + + impl PartialOrd for LRUItem { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + } + + impl Eq for LRUItem {} + + impl Ord for LRUItem { + fn cmp(&self, other: &Self) -> Ordering { + match (self.1.ready(), other.1.ready()) { + (true, false) => Ordering::Greater, + (false, true) => Ordering::Less, + _ => self.2.cmp(&other.2), + } + } + } + + let mut lru: BinaryHeap = BinaryHeap::new(); + relay_statsd::metric!(timer(RelayTimers::BufferEvictLRUConstruction), { + for (queue_item, priority) in self.priority_queue.iter() { + let lru_item = LRUItem(queue_item.key, priority.readiness, queue_item.last_update); + + // If we exceed the size, we want to pop the greatest element only if we have a smaller + // element, so that we end up with the smallest elements which are the ones with the + // lowest priority. + if lru.len() >= self.max_evictable_stacks { + let Some(top_lru_item) = lru.peek() else { + continue; + }; + + if lru_item < *top_lru_item { + lru.pop(); + } + } + + lru.push(lru_item); + } + }); + + // We go over each element and remove it from the stack. After removal, we will evict + // elements from each popped stack. + relay_statsd::metric!(timer(RelayTimers::BufferEvictStacksEviction), { + for lru_item in lru { + if let Some(mut stack) = self.pop_stack(lru_item.0) { + stack.evict().await; + } + } + }); + } + fn push_stack(&mut self, envelope: Box) { let received_at = envelope.meta().start_time(); let stack_key = StackKey::from_envelope(&envelope); @@ -257,6 +349,7 @@ where QueueItem { key: stack_key, value: self.stack_provider.create_stack(envelope), + last_update: Instant::now(), }, Priority::new(received_at), ); @@ -272,18 +365,28 @@ where ); } - fn pop_stack(&mut self, stack_key: StackKey) { + fn pop_stack(&mut self, stack_key: StackKey) -> Option { for project_key in stack_key.iter() { - self.stacks_by_project + let stack_keys = self + .stacks_by_project .get_mut(&project_key) - .expect("project_key is missing from lookup") - .remove(&stack_key); + .expect("project_key is missing from lookup"); + + // If there is only one stack key, we can directly remove the entry to save some memory. + if stack_keys.len() == 1 { + self.stacks_by_project.remove(&project_key); + } else { + stack_keys.remove(&stack_key); + }; } - self.priority_queue.remove(&stack_key); + + let stack = self.priority_queue.remove(&stack_key); relay_statsd::metric!( gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64 ); + + stack.map(|(q, _)| q.value) } } @@ -320,6 +423,7 @@ impl StackKey { struct QueueItem { key: K, value: V, + last_update: Instant, } impl std::borrow::Borrow for QueueItem { @@ -398,7 +502,7 @@ impl Ord for Priority { } } -#[derive(Debug)] +#[derive(Debug, Copy, Clone)] struct Readiness { own_project_ready: bool, sampling_project_ready: bool, @@ -420,6 +524,8 @@ impl Readiness { #[cfg(test)] mod tests { use std::str::FromStr; + use std::time::Duration; + use tokio::time::sleep; use uuid::Uuid; use relay_common::Dsn; @@ -432,13 +538,13 @@ mod tests { use super::*; fn new_envelope( - project_key: ProjectKey, + own_key: ProjectKey, sampling_key: Option, event_id: Option, ) -> Box { let mut envelope = Envelope::from_request( None, - RequestMeta::new(Dsn::from_str(&format!("http://{project_key}@localhost/1")).unwrap()), + RequestMeta::new(Dsn::from_str(&format!("http://{own_key}@localhost/1")).unwrap()), ); if let Some(sampling_key) = sampling_key { envelope.set_dsc(DynamicSamplingContext { @@ -461,9 +567,21 @@ mod tests { envelope } + fn mock_config() -> Config { + Config::from_json_value(serde_json::json!({ + "spool": { + "envelopes": { + "version": "experimental", + "max_evictable_stacks": 3 + } + } + })) + .unwrap() + } + #[tokio::test] async fn test_insert_pop() { - let mut buffer = EnvelopeBuffer::::new(); + let mut buffer = EnvelopeBuffer::::new(&mock_config()); let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); @@ -547,7 +665,7 @@ mod tests { #[tokio::test] async fn test_project_internal_order() { - let mut buffer = EnvelopeBuffer::::new(); + let mut buffer = EnvelopeBuffer::::new(&mock_config()); let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); @@ -574,7 +692,7 @@ mod tests { #[tokio::test] async fn test_sampling_projects() { - let mut buffer = EnvelopeBuffer::::new(); + let mut buffer = EnvelopeBuffer::::new(&mock_config()); let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap(); @@ -652,7 +770,7 @@ mod tests { assert_ne!(stack_key1, stack_key2); - let mut buffer = EnvelopeBuffer::::new(); + let mut buffer = EnvelopeBuffer::::new(&mock_config()); buffer .push(new_envelope(project_key1, Some(project_key2), None)) .await @@ -666,7 +784,7 @@ mod tests { #[tokio::test] async fn test_last_peek_internal_order() { - let mut buffer = EnvelopeBuffer::::new(); + let mut buffer = EnvelopeBuffer::::new(&mock_config()); let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let event_id_1 = EventId::new(); @@ -693,4 +811,39 @@ mod tests { event_id_1 ); } + + #[tokio::test] + async fn test_eviction() { + let mut buffer = EnvelopeBuffer::::new(&mock_config()); + + let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); + let project_key_2 = ProjectKey::parse("b56ae32be2584e0bbd7a4cbb95971fed").unwrap(); + let project_key_3 = ProjectKey::parse("e23ae32be2584e0bbd7a4cbb95971fed").unwrap(); + + let envelopes = [ + new_envelope(project_key_1, Some(project_key_2), Some(EventId::new())), + new_envelope(project_key_2, Some(project_key_1), Some(EventId::new())), + new_envelope(project_key_1, Some(project_key_3), Some(EventId::new())), + new_envelope(project_key_3, Some(project_key_1), Some(EventId::new())), + ]; + + for envelope in envelopes.clone() { + buffer.push(envelope).await.unwrap(); + // We sleep to make sure that the `last_update` of `QueueItem` is different. + sleep(Duration::from_millis(1)).await; + } + + buffer.mark_ready(&project_key_1, true); + buffer.mark_ready(&project_key_2, true); + + buffer.evict().await; + + assert_eq!(buffer.priority_queue.len(), 1); + // We expect that only the 2nd envelope is kept, since the last 2 have non-ready projects + // and the first one is the oldest of the ones that are ready. + assert_eq!( + buffer.peek().await.unwrap().unwrap().event_id(), + envelopes[1].event_id() + ); + } } diff --git a/relay-server/src/services/buffer/envelope_stack/memory.rs b/relay-server/src/services/buffer/envelope_stack/memory.rs index 5e8087010f..bb1766a07b 100644 --- a/relay-server/src/services/buffer/envelope_stack/memory.rs +++ b/relay-server/src/services/buffer/envelope_stack/memory.rs @@ -1,8 +1,7 @@ -use std::convert::Infallible; - use crate::Envelope; +use std::convert::Infallible; -use super::EnvelopeStack; +use super::{EnvelopeStack, Evictable}; #[derive(Debug)] pub struct MemoryEnvelopeStack(#[allow(clippy::vec_box)] Vec>); @@ -29,3 +28,9 @@ impl EnvelopeStack for MemoryEnvelopeStack { Ok(self.0.pop()) } } + +impl Evictable for MemoryEnvelopeStack { + async fn evict(&mut self) { + self.0.clear() + } +} diff --git a/relay-server/src/services/buffer/envelope_stack/mod.rs b/relay-server/src/services/buffer/envelope_stack/mod.rs index ee48016f09..6f390261fa 100644 --- a/relay-server/src/services/buffer/envelope_stack/mod.rs +++ b/relay-server/src/services/buffer/envelope_stack/mod.rs @@ -21,8 +21,14 @@ pub trait EnvelopeStack: Send + std::fmt::Debug { fn pop(&mut self) -> impl Future>, Self::Error>>; } +/// An object that can support eviction. +pub trait Evictable: Send + std::fmt::Debug { + /// Evicts data from an [`Evictable`]. + fn evict(&mut self) -> impl Future; +} + pub trait StackProvider: std::fmt::Debug { - type Stack: EnvelopeStack; + type Stack: EnvelopeStack + Evictable; fn create_stack(&self, envelope: Box) -> Self::Stack; } diff --git a/relay-server/src/services/buffer/envelope_stack/sqlite.rs b/relay-server/src/services/buffer/envelope_stack/sqlite.rs index 9ea8d16a66..7a072740e3 100644 --- a/relay-server/src/services/buffer/envelope_stack/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_stack/sqlite.rs @@ -5,9 +5,9 @@ use std::num::NonZeroUsize; use relay_base_schema::project::ProjectKey; use crate::envelope::Envelope; -use crate::services::buffer::envelope_stack::EnvelopeStack; +use crate::services::buffer::envelope_stack::{EnvelopeStack, Evictable}; use crate::services::buffer::sqlite_envelope_store::{ - SqliteEnvelopeStore, SqliteEnvelopeStoreError, + EnvelopesOrder, SqliteEnvelopeStore, SqliteEnvelopeStoreError, }; use crate::statsd::RelayCounters; @@ -31,6 +31,8 @@ pub struct SqliteEnvelopeStack { spool_threshold: NonZeroUsize, /// Size of a batch of envelopes that is written to disk. batch_size: NonZeroUsize, + /// Maximum number of envelopes that can be evicted. + max_evictable_envelopes: NonZeroUsize, /// The project key of the project to which all the envelopes belong. own_key: ProjectKey, /// The project key of the root project of the trace to which all the envelopes belong. @@ -51,6 +53,7 @@ impl SqliteEnvelopeStack { envelope_store: SqliteEnvelopeStore, disk_batch_size: usize, max_batches: usize, + max_evictable_envelopes: usize, own_key: ProjectKey, sampling_key: ProjectKey, ) -> Self { @@ -60,6 +63,8 @@ impl SqliteEnvelopeStack { .expect("the spool threshold must be > 0"), batch_size: NonZeroUsize::new(disk_batch_size) .expect("the disk batch size must be > 0"), + max_evictable_envelopes: NonZeroUsize::new(max_evictable_envelopes) + .expect("the max evictable envelopes must be > 0"), own_key, sampling_key, batches_buffer: VecDeque::with_capacity(max_batches), @@ -125,6 +130,7 @@ impl SqliteEnvelopeStack { self.own_key, self.sampling_key, self.batch_size.get() as i64, + EnvelopesOrder::MostRecent, ) .await .map_err(SqliteEnvelopeStackError::EnvelopeStoreError)?; @@ -228,6 +234,28 @@ impl EnvelopeStack for SqliteEnvelopeStack { } } +impl Evictable for SqliteEnvelopeStack { + async fn evict(&mut self) { + // We want to evict all elements in memory. + self.batches_buffer.clear(); + self.batches_buffer_size = 0; + + if self + .envelope_store + .delete_many( + self.own_key, + self.sampling_key, + self.max_evictable_envelopes.get() as i64, + EnvelopesOrder::Oldest, + ) + .await + .is_err() + { + relay_log::error!("failed to evict envelopes from disk"); + }; + } +} + #[cfg(test)] mod tests { use std::collections::BTreeMap; @@ -294,6 +322,7 @@ mod tests { envelope_store, 2, 2, + 10, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("c25ae32be2584e0bbd7a4cbb95971fe1").unwrap(), ); @@ -310,6 +339,7 @@ mod tests { envelope_store, 2, 2, + 10, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), ); @@ -362,6 +392,7 @@ mod tests { envelope_store, 2, 2, + 10, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), ); @@ -381,6 +412,7 @@ mod tests { envelope_store, 2, 2, + 10, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), ); @@ -398,6 +430,7 @@ mod tests { envelope_store, 5, 2, + 10, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), ); @@ -435,6 +468,7 @@ mod tests { envelope_store, 5, 2, + 10, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), ); @@ -495,4 +529,35 @@ mod tests { } assert_eq!(stack.batches_buffer_size, 0); } + + #[tokio::test] + async fn test_evict() { + let db = setup_db(true).await; + let envelope_store = SqliteEnvelopeStore::new(db); + let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(); + let mut stack = + SqliteEnvelopeStack::new(envelope_store.clone(), 5, 1, 2, own_key, sampling_key); + + let envelopes = mock_envelopes(15); + + for envelope in envelopes.clone() { + assert!(stack.push(envelope).await.is_ok()); + } + + stack.evict().await; + // We expect 0 in-memory data since we flushed the 5 in-memory envelopes. + assert!(stack.batches_buffer.is_empty()); + assert_eq!(stack.batches_buffer_size, 0); + // We expect 2 out of the 10 envelopes on disk to have been flushed, so if we load 15, we + // should get 8 back. + assert_eq!( + envelope_store + .delete_many(own_key, sampling_key, 15, EnvelopesOrder::MostRecent) + .await + .unwrap() + .len(), + 8 + ); + } } diff --git a/relay-server/src/services/buffer/sqlite_envelope_store.rs b/relay-server/src/services/buffer/sqlite_envelope_store.rs index 83616bf55a..45faec9b91 100644 --- a/relay-server/src/services/buffer/sqlite_envelope_store.rs +++ b/relay-server/src/services/buffer/sqlite_envelope_store.rs @@ -88,6 +88,14 @@ pub enum SqliteEnvelopeStoreError { FileSizeReadFailed(sqlx::Error), } +/// Enum representing the order in which [`Envelope`]s are fetched or deleted from the +/// database. +#[derive(Debug, Copy, Clone)] +pub enum EnvelopesOrder { + MostRecent, + Oldest, +} + /// Struct that offers access to a SQLite-based store of [`Envelope`]s. /// /// The goal of this struct is to hide away all the complexity of dealing with the database for @@ -221,10 +229,17 @@ impl SqliteEnvelopeStore { own_key: ProjectKey, sampling_key: ProjectKey, limit: i64, + envelopes_order: EnvelopesOrder, ) -> Result>, SqliteEnvelopeStoreError> { - let envelopes = build_delete_and_fetch_many_envelopes(own_key, sampling_key, limit) - .fetch(&self.db) - .peekable(); + let query = match envelopes_order { + EnvelopesOrder::MostRecent => { + build_delete_and_fetch_many_recent_envelopes(own_key, sampling_key, limit) + } + EnvelopesOrder::Oldest => { + build_delete_and_fetch_many_old_envelopes(own_key, sampling_key, limit) + } + }; + let envelopes = query.fetch(&self.db).peekable(); let mut envelopes = pin!(envelopes); if envelopes.as_mut().peek().await.is_none() { @@ -370,8 +385,8 @@ fn build_insert_many_envelopes<'a>( builder } -/// Builds a query that deletes many [`Envelope`] from the database. -pub fn build_delete_and_fetch_many_envelopes<'a>( +/// Builds a query that deletes many new [`Envelope`]s from the database. +pub fn build_delete_and_fetch_many_recent_envelopes<'a>( own_key: ProjectKey, project_key: ProjectKey, batch_size: i64, @@ -389,6 +404,25 @@ pub fn build_delete_and_fetch_many_envelopes<'a>( .bind(batch_size) } +/// Builds a query that deletes many old [`Envelope`]s from the database. +pub fn build_delete_and_fetch_many_old_envelopes<'a>( + own_key: ProjectKey, + project_key: ProjectKey, + batch_size: i64, +) -> Query<'a, Sqlite, SqliteArguments<'a>> { + sqlx::query( + "DELETE FROM + envelopes + WHERE id IN (SELECT id FROM envelopes WHERE own_key = ? AND sampling_key = ? + ORDER BY received_at ASC LIMIT ?) + RETURNING + received_at, own_key, sampling_key, envelope", + ) + .bind(own_key.to_string()) + .bind(project_key.to_string()) + .bind(batch_size) +} + /// Creates a query which fetches the number of used database pages multiplied by the page size. /// /// This info used to estimate the current allocated database size. @@ -480,7 +514,7 @@ mod tests { // We check that if we load more than the limit, we still get back at most 10. let extracted_envelopes = envelope_store - .delete_many(own_key, sampling_key, 15) + .delete_many(own_key, sampling_key, 15, EnvelopesOrder::MostRecent) .await .unwrap(); assert_eq!(envelopes.len(), 10); diff --git a/relay-server/src/services/buffer/stack_provider/sqlite.rs b/relay-server/src/services/buffer/stack_provider/sqlite.rs index 9716585606..6f8c4d4b5d 100644 --- a/relay-server/src/services/buffer/stack_provider/sqlite.rs +++ b/relay-server/src/services/buffer/stack_provider/sqlite.rs @@ -11,6 +11,7 @@ pub struct SqliteStackProvider { envelope_store: SqliteEnvelopeStore, disk_batch_size: usize, max_batches: usize, + max_evictable_envelopes: usize, } #[warn(dead_code)] @@ -22,6 +23,7 @@ impl SqliteStackProvider { envelope_store, disk_batch_size: config.spool_envelopes_stack_disk_batch_size(), max_batches: config.spool_envelopes_stack_max_batches(), + max_evictable_envelopes: config.spool_envelopes_stack_max_evictable_envelopes(), }) } } @@ -37,6 +39,7 @@ impl StackProvider for SqliteStackProvider { self.envelope_store.clone(), self.disk_batch_size, self.max_batches, + self.max_evictable_envelopes, own_key, sampling_key, ) diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 1113d528fc..88e7cc52b0 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -514,6 +514,10 @@ pub enum RelayTimers { /// - `message`: The type of message that was processed. #[cfg(feature = "processing")] StoreServiceDuration, + /// Timing in milliseconds for constructing the LRU list of stacks. + BufferEvictLRUConstruction, + /// Timing in milliseconds to evict all the stacks determined by the LRU algorithm. + BufferEvictStacksEviction, } impl TimerMetric for RelayTimers { @@ -555,6 +559,8 @@ impl TimerMetric for RelayTimers { RelayTimers::MetricRouterServiceDuration => "metrics.router.message.duration", #[cfg(feature = "processing")] RelayTimers::StoreServiceDuration => "store.message.duration", + RelayTimers::BufferEvictLRUConstruction => "buffer.evict.lru_construction", + RelayTimers::BufferEvictStacksEviction => "buffer.evict.stacks_eviction", } } }