diff --git a/relay-server/benches/benches.rs b/relay-server/benches/benches.rs index 045ad1bd93..00f747671c 100644 --- a/relay-server/benches/benches.rs +++ b/relay-server/benches/benches.rs @@ -101,6 +101,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { envelope_store.clone(), disk_batch_size, 2, + 10, ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), true, @@ -138,6 +139,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { envelope_store.clone(), disk_batch_size, 2, + 10, ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), true, @@ -179,6 +181,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { envelope_store.clone(), disk_batch_size, 2, + 10, ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), true, diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index d14284f828..afb4129ef1 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -124,6 +124,14 @@ impl PolymorphicEnvelopeBuffer { Self::InMemory(buffer) => buffer.has_capacity(), } } + + /// Evicts the least recently used envelope stacks from the buffer. + pub async fn evict(&mut self) { + match self { + Self::Sqlite(buffer) => buffer.evict().await, + Self::InMemory(buffer) => buffer.evict().await, + } + } } /// Error that occurs while interacting with the envelope buffer. @@ -330,8 +338,8 @@ where /// Returns `true` if at least one priority was changed. pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool { let mut changed = false; - if let Some(project_key_pair) = self.stacks_by_project.get(project) { - for project_key_pair in project_key_pair { + if let Some(project_key_pairs) = self.stacks_by_project.get(project) { + for project_key_pair in project_key_pairs { if let Some(( QueueItem { key: _, @@ -418,10 +426,15 @@ where } } - // We calculate how many envelope stacks we want to keep track. + // We calculate how many envelope stacks we want to evict. let max_lru_length = - ((self.priority_queue.len() as f32) * self.evictable_stacks_percentage) as usize; - let mut lru: BinaryHeap = BinaryHeap::new(); + ((self.priority_queue.len() as f32) * self.evictable_stacks_percentage).ceil() as usize; + relay_log::trace!( + "Evicting {} elements from the envelope buffer", + max_lru_length + ); + + let mut lru: BinaryHeap = BinaryHeap::with_capacity(max_lru_length); relay_statsd::metric!(timer(RelayTimers::BufferEvictLRUConstruction), { for (queue_item, priority) in self.priority_queue.iter() { let lru_item = LRUItem(queue_item.key, priority.readiness, queue_item.last_update); @@ -743,7 +756,8 @@ mod tests { Config::from_json_value(serde_json::json!({ "spool": { "envelopes": { - "path": path + "path": path, + "evictable_stacks_percentage": 0.7 } } })) diff --git a/relay-server/src/services/buffer/envelope_stack/sqlite.rs b/relay-server/src/services/buffer/envelope_stack/sqlite.rs index 0fd08a8ef7..79880255ea 100644 --- a/relay-server/src/services/buffer/envelope_stack/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_stack/sqlite.rs @@ -237,10 +237,12 @@ impl EnvelopeStack for SqliteEnvelopeStack { impl Evictable for SqliteEnvelopeStack { async fn evict(&mut self) { - // We want to evict all elements in memory. + // We want to evict all elements in memory, even though the capacity check for the sqlite + // implementation is triggered only based on disk usage. self.batches_buffer.clear(); self.batches_buffer_size = 0; + // We remove a fixed number of envelopes from disk to free up some space. if self .envelope_store .delete_many( @@ -262,7 +264,6 @@ mod tests { use std::time::{Duration, Instant}; use super::*; - use crate::services::buffer::envelope_store::sqlite::EnvelopesOrder; use crate::services::buffer::stack_provider::Evictable; use crate::services::buffer::testutils::utils::{mock_envelope, mock_envelopes, setup_db}; use relay_base_schema::project::ProjectKey; @@ -493,7 +494,7 @@ mod tests { #[tokio::test] async fn test_evict() { let db = setup_db(true).await; - let mut envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); + let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(); let mut stack = @@ -509,15 +510,8 @@ mod tests { // We expect 0 in-memory data since we flushed the 5 in-memory envelopes. assert!(stack.batches_buffer.is_empty()); assert_eq!(stack.batches_buffer_size, 0); - // We expect 2 out of the 10 envelopes on disk to have been flushed, so if we load 15, we - // should get 8 back. - assert_eq!( - envelope_store - .delete_many(own_key, sampling_key, 15, EnvelopesOrder::MostRecent) - .await - .unwrap() - .len(), - 8 - ); + // We expect 2 out of the 10 envelopes on disk to have been flushed, so we should have 8 + // on disk. + assert_eq!(envelope_store.total_count().await.unwrap(), 8); } } diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index cbe33a87f0..ff60db921d 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -205,6 +205,7 @@ impl EnvelopeBufferService { self.sleep = Duration::ZERO; } + /// Pushes an [`Envelope`] to the [`PolymorphicEnvelopeBuffer`]. async fn push(&mut self, buffer: &mut PolymorphicEnvelopeBuffer, envelope: Box) { if let Err(e) = buffer.push(envelope).await { relay_log::error!( @@ -214,6 +215,13 @@ impl EnvelopeBufferService { } } + /// Tries to evict elements from the [`PolymorphicEnvelopeBuffer`] if no capacity + async fn try_evict(&self, buffer: &mut PolymorphicEnvelopeBuffer) { + if !buffer.has_capacity() { + buffer.evict().await; + } + } + fn update_observable_state(&self, buffer: &mut PolymorphicEnvelopeBuffer) { self.has_capacity .store(buffer.has_capacity(), Ordering::Relaxed); @@ -241,6 +249,8 @@ impl Service for EnvelopeBufferService { }; buffer.initialize().await; + let mut ticker = tokio::time::interval(Duration::from_millis(100)); + relay_log::info!("EnvelopeBufferService start"); loop { relay_log::trace!("EnvelopeBufferService loop"); @@ -261,7 +271,9 @@ impl Service for EnvelopeBufferService { Some(message) = rx.recv() => { self.handle_message(&mut buffer, message).await; } - + _ = ticker.tick() => { + self.try_evict(&mut buffer).await; + } else => break, }