From 4bd1766b8c8f063e3d3fd5bc2c0d8e258e05f5f2 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Sat, 7 Sep 2024 11:36:40 +0200 Subject: [PATCH 1/6] feat(spooler): Implement eviction mechanism when buffer is full --- relay-config/src/config.rs | 29 +++ .../services/buffer/envelope_buffer/mod.rs | 190 ++++++++++++++++-- .../services/buffer/envelope_stack/memory.rs | 10 +- .../services/buffer/envelope_stack/sqlite.rs | 73 ++++++- .../services/buffer/envelope_store/sqlite.rs | 46 ++++- .../src/services/buffer/stack_provider/mod.rs | 8 +- .../services/buffer/stack_provider/sqlite.rs | 3 + relay-server/src/services/processor/event.rs | 3 +- relay-server/src/statsd.rs | 6 + 9 files changed, 332 insertions(+), 36 deletions(-) diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 73b486199d..c21435873e 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -903,6 +903,7 @@ fn spool_envelopes_stack_max_batches() -> usize { 2 } +/// Default maximum time between receiving the envelope and processing it. fn spool_envelopes_max_envelope_delay_secs() -> u64 { 24 * 60 * 60 } @@ -912,6 +913,16 @@ fn spool_disk_usage_refresh_frequency_ms() -> u64 { 100 } +/// Default percentage of envelope stacks that can be evicted in the buffer. +fn spool_envelopes_evictable_stacks_percentage() -> f32 { + 0.1 +} + +/// Default maximum number of envelopes that can be evicted for each stack. +fn spool_envelopes_max_evictable_envelopes() -> usize { + 100 +} + /// Persistent buffering configuration for incoming envelopes. #[derive(Debug, Serialize, Deserialize)] pub struct EnvelopeSpool { @@ -955,6 +966,12 @@ pub struct EnvelopeSpool { /// internal page stats. #[serde(default = "spool_disk_usage_refresh_frequency_ms")] disk_usage_refresh_frequency_ms: u64, + /// Percentage of envelope stacks that can be evicted in the buffer. + #[serde(default = "spool_envelopes_evictable_stacks_percentage")] + evictable_stacks_percentage: f32, + /// Maximum number of envelopes that can be evicted for each stack. + #[serde(default = "spool_envelopes_max_evictable_envelopes")] + max_evictable_envelopes: usize, /// Version of the spooler. #[serde(default)] version: EnvelopeSpoolVersion, @@ -991,6 +1008,8 @@ impl Default for EnvelopeSpool { max_batches: spool_envelopes_stack_max_batches(), max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(), disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(), + evictable_stacks_percentage: spool_envelopes_evictable_stacks_percentage(), + max_evictable_envelopes: spool_envelopes_max_evictable_envelopes(), version: EnvelopeSpoolVersion::default(), } } @@ -2194,6 +2213,16 @@ impl Config { self.values.spool.envelopes.max_batches } + /// Maximum number of stacks that can be evicted. + pub fn spool_envelopes_evictable_stacks_percentage(&self) -> f32 { + self.values.spool.envelopes.evictable_stacks_percentage + } + + /// Maximum number of envelopes that can be evicted per stack. + pub fn spool_envelopes_stack_max_evictable_envelopes(&self) -> usize { + self.values.spool.envelopes.max_evictable_envelopes + } + /// Returns `true` if version 2 of the spooling mechanism is used. pub fn spool_v2(&self) -> bool { matches!( diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index 433e46bf72..d14284f828 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -1,5 +1,5 @@ use std::cmp::Ordering; -use std::collections::BTreeSet; +use std::collections::{BTreeSet, BinaryHeap}; use std::convert::Infallible; use std::error::Error; use std::sync::atomic::AtomicI64; @@ -19,7 +19,7 @@ use crate::services::buffer::envelope_stack::EnvelopeStack; use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError; use crate::services::buffer::stack_provider::memory::MemoryStackProvider; use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider; -use crate::services::buffer::stack_provider::{StackCreationType, StackProvider}; +use crate::services::buffer::stack_provider::{Evictable, StackCreationType, StackProvider}; use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers}; use crate::utils::MemoryChecker; @@ -51,7 +51,7 @@ impl PolymorphicEnvelopeBuffer { let buffer = EnvelopeBuffer::::new(config).await?; Self::Sqlite(buffer) } else { - let buffer = EnvelopeBuffer::::new(memory_checker); + let buffer = EnvelopeBuffer::::new(config, memory_checker); Self::InMemory(buffer) }; @@ -169,17 +169,20 @@ struct EnvelopeBuffer { /// This boolean is just used for tagging the metric that tracks the total count of envelopes /// in the buffer. total_count_initialized: bool, + /// The % of envelope stacks that can be evicted. + evictable_stacks_percentage: f32, } impl EnvelopeBuffer { /// Creates an empty memory-based buffer. - pub fn new(memory_checker: MemoryChecker) -> Self { + pub fn new(config: &Config, memory_checker: MemoryChecker) -> Self { Self { stacks_by_project: Default::default(), priority_queue: Default::default(), stack_provider: MemoryStackProvider::new(memory_checker), total_count: Arc::new(AtomicI64::new(0)), total_count_initialized: false, + evictable_stacks_percentage: config.spool_envelopes_evictable_stacks_percentage(), } } } @@ -194,6 +197,7 @@ impl EnvelopeBuffer { stack_provider: SqliteStackProvider::new(config).await?, total_count: Arc::new(AtomicI64::new(0)), total_count_initialized: false, + evictable_stacks_percentage: config.spool_envelopes_evictable_stacks_percentage(), }) } } @@ -224,10 +228,12 @@ where QueueItem { key: _, value: stack, + last_update, }, _, )) = self.priority_queue.get_mut(&project_key_pair) { + *last_update = Instant::now(); stack.push(envelope).await?; } else { // Since we have initialization code that creates all the necessary stacks, we assume @@ -256,6 +262,7 @@ where QueueItem { key: stack_key, value: stack, + last_update: _, }, Priority { readiness, .. }, )) = self.priority_queue.peek_mut() @@ -277,7 +284,15 @@ where /// The priority of the envelope's stack is updated with the next envelope's received_at /// time. If the stack is empty after popping, it is removed from the priority queue. pub async fn pop(&mut self) -> Result>, EnvelopeBufferError> { - let Some((QueueItem { key, value: stack }, _)) = self.priority_queue.peek_mut() else { + let Some(( + QueueItem { + key, + value: stack, + last_update: _, + }, + _, + )) = self.priority_queue.peek_mut() + else { return Ok(None); }; let project_key_pair = *key; @@ -317,6 +332,17 @@ where let mut changed = false; if let Some(project_key_pair) = self.stacks_by_project.get(project) { for project_key_pair in project_key_pair { + if let Some(( + QueueItem { + key: _, + value: _, + last_update, + }, + _, + )) = self.priority_queue.get_mut(project_key_pair) + { + *last_update = Instant::now(); + }; self.priority_queue .change_priority_by(project_key_pair, |stack| { let mut found = false; @@ -358,6 +384,76 @@ where }); } + /// Returns `true` if the underlying storage has the capacity to store more envelopes. + pub fn has_capacity(&self) -> bool { + self.stack_provider.has_store_capacity() + } + + /// Evicts the least recently used stacks. + pub async fn evict(&mut self) { + #[derive(Debug, Copy, Clone)] + struct LRUItem(ProjectKeyPair, Readiness, Instant); + + impl PartialEq for LRUItem { + fn eq(&self, other: &Self) -> bool { + self.0 == other.0 + } + } + + impl PartialOrd for LRUItem { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + } + + impl Eq for LRUItem {} + + impl Ord for LRUItem { + fn cmp(&self, other: &Self) -> Ordering { + match (self.1.ready(), other.1.ready()) { + (true, false) => Ordering::Greater, + (false, true) => Ordering::Less, + _ => self.2.cmp(&other.2), + } + } + } + + // We calculate how many envelope stacks we want to keep track. + let max_lru_length = + ((self.priority_queue.len() as f32) * self.evictable_stacks_percentage) as usize; + let mut lru: BinaryHeap = BinaryHeap::new(); + relay_statsd::metric!(timer(RelayTimers::BufferEvictLRUConstruction), { + for (queue_item, priority) in self.priority_queue.iter() { + let lru_item = LRUItem(queue_item.key, priority.readiness, queue_item.last_update); + + // If we exceed the size, we want to pop the greatest element only if we have a smaller + // element, so that we end up with the smallest elements which are the ones with the + // lowest priority. + if lru.len() >= max_lru_length { + let Some(top_lru_item) = lru.peek() else { + continue; + }; + + if lru_item < *top_lru_item { + lru.pop(); + } + } + + lru.push(lru_item); + } + }); + + // We go over each element and remove it from the stack. After removal, we will evict + // elements from each popped stack. + relay_statsd::metric!(timer(RelayTimers::BufferEvictStacksEviction), { + for lru_item in lru { + if let Some(mut stack) = self.pop_stack(lru_item.0) { + stack.evict().await; + } + } + }); + } + /// Pushes a new [`EnvelopeStack`] with the given [`Envelope`] inserted. async fn push_stack( &mut self, @@ -380,6 +476,7 @@ where QueueItem { key: project_key_pair, value: stack, + last_update: Instant::now(), }, Priority::new(received_at), ); @@ -397,24 +494,30 @@ where Ok(()) } - /// Returns `true` if the underlying storage has the capacity to store more envelopes. - pub fn has_capacity(&self) -> bool { - self.stack_provider.has_store_capacity() - } - - /// Pops an [`EnvelopeStack`] with the supplied [`EnvelopeBufferError`]. - fn pop_stack(&mut self, project_key_pair: ProjectKeyPair) { + /// Pops an [`EnvelopeStack`] with the supplied [`EnvelopeBufferError`] and returns the popped + /// [`EnvelopeStack`]. + fn pop_stack(&mut self, project_key_pair: ProjectKeyPair) -> Option { for project_key in project_key_pair.iter() { - self.stacks_by_project + let stack_keys = self + .stacks_by_project .get_mut(&project_key) - .expect("project_key is missing from lookup") - .remove(&project_key_pair); + .expect("project_key is missing from lookup"); + + // If there is only one stack key, we can directly remove the entry to save some memory. + if stack_keys.len() == 1 { + self.stacks_by_project.remove(&project_key); + } else { + stack_keys.remove(&project_key_pair); + }; } - self.priority_queue.remove(&project_key_pair); + + let stack = self.priority_queue.remove(&project_key_pair); relay_statsd::metric!( gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64 ); + + stack.map(|(q, _)| q.value) } /// Creates all the [`EnvelopeStack`]s with no data given a set of [`ProjectKeyPair`]. @@ -479,6 +582,7 @@ pub enum Peek<'a> { struct QueueItem { key: K, value: V, + last_update: Instant, } impl std::borrow::Borrow for QueueItem { @@ -555,7 +659,7 @@ impl Ord for Priority { } } -#[derive(Debug)] +#[derive(Debug, Copy, Clone)] struct Readiness { own_project_ready: bool, sampling_project_ready: bool, @@ -653,7 +757,8 @@ mod tests { #[tokio::test] async fn test_insert_pop() { - let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); + let config = mock_config("none"); + let mut buffer = EnvelopeBuffer::::new(&config, mock_memory_checker()); let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); @@ -786,7 +891,8 @@ mod tests { #[tokio::test] async fn test_project_internal_order() { - let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); + let config = mock_config("none"); + let mut buffer = EnvelopeBuffer::::new(&config, mock_memory_checker()); let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); @@ -813,7 +919,8 @@ mod tests { #[tokio::test] async fn test_sampling_projects() { - let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); + let config = mock_config("none"); + let mut buffer = EnvelopeBuffer::::new(&config, mock_memory_checker()); let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap(); @@ -926,7 +1033,8 @@ mod tests { assert_ne!(project_key_pair1, project_key_pair2); - let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); + let config = mock_config("none"); + let mut buffer = EnvelopeBuffer::::new(&config, mock_memory_checker()); buffer .push(new_envelope(project_key1, Some(project_key2), None)) .await @@ -940,7 +1048,8 @@ mod tests { #[tokio::test] async fn test_last_peek_internal_order() { - let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); + let config = mock_config("none"); + let mut buffer = EnvelopeBuffer::::new(&config, mock_memory_checker()); let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let event_id_1 = EventId::new(); @@ -1021,4 +1130,41 @@ mod tests { // should be 2. assert_eq!(buffer.stacks_by_project.len(), 2); } + + #[tokio::test] + async fn test_eviction() { + let config = mock_config("none"); + let mut buffer = EnvelopeBuffer::::new(&config, mock_memory_checker()); + + let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); + let project_key_2 = ProjectKey::parse("b56ae32be2584e0bbd7a4cbb95971fed").unwrap(); + let project_key_3 = ProjectKey::parse("e23ae32be2584e0bbd7a4cbb95971fed").unwrap(); + + let envelopes = [ + new_envelope(project_key_1, Some(project_key_2), Some(EventId::new())), + new_envelope(project_key_2, Some(project_key_1), Some(EventId::new())), + new_envelope(project_key_1, Some(project_key_3), Some(EventId::new())), + new_envelope(project_key_3, Some(project_key_1), Some(EventId::new())), + ]; + + for envelope in envelopes.clone() { + buffer.push(envelope).await.unwrap(); + // We sleep to make sure that the `last_update` of `QueueItem` is different. + tokio::time::sleep(Duration::from_millis(1)).await; + } + + buffer.mark_ready(&project_key_1, true); + buffer.mark_ready(&project_key_2, true); + + buffer.evict().await; + + assert_eq!(buffer.priority_queue.len(), 1); + let peek = buffer.peek().await.unwrap(); + assert!(matches!(peek, Peek::Ready(_))); + if let Peek::Ready(envelope) = peek { + // We expect that only the 2nd envelope is kept, since the last 2 have non-ready projects + // and the first one is the oldest of the ones that are ready. + assert_eq!(envelope.event_id(), envelopes[1].event_id()); + }; + } } diff --git a/relay-server/src/services/buffer/envelope_stack/memory.rs b/relay-server/src/services/buffer/envelope_stack/memory.rs index d9723e601a..110da94ab1 100644 --- a/relay-server/src/services/buffer/envelope_stack/memory.rs +++ b/relay-server/src/services/buffer/envelope_stack/memory.rs @@ -1,8 +1,8 @@ use std::convert::Infallible; -use crate::Envelope; - use super::EnvelopeStack; +use crate::services::buffer::stack_provider::Evictable; +use crate::Envelope; #[derive(Debug)] pub struct MemoryEnvelopeStack(#[allow(clippy::vec_box)] Vec>); @@ -29,3 +29,9 @@ impl EnvelopeStack for MemoryEnvelopeStack { Ok(self.0.pop()) } } + +impl Evictable for MemoryEnvelopeStack { + async fn evict(&mut self) { + self.0.clear() + } +} diff --git a/relay-server/src/services/buffer/envelope_stack/sqlite.rs b/relay-server/src/services/buffer/envelope_stack/sqlite.rs index 13e1fb5a13..0fd08a8ef7 100644 --- a/relay-server/src/services/buffer/envelope_stack/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_stack/sqlite.rs @@ -7,8 +7,9 @@ use relay_base_schema::project::ProjectKey; use crate::envelope::Envelope; use crate::services::buffer::envelope_stack::EnvelopeStack; use crate::services::buffer::envelope_store::sqlite::{ - SqliteEnvelopeStore, SqliteEnvelopeStoreError, + EnvelopesOrder, SqliteEnvelopeStore, SqliteEnvelopeStoreError, }; +use crate::services::buffer::stack_provider::Evictable; use crate::statsd::RelayTimers; /// An error returned when doing an operation on [`SqliteEnvelopeStack`]. @@ -31,6 +32,8 @@ pub struct SqliteEnvelopeStack { spool_threshold: NonZeroUsize, /// Size of a batch of envelopes that is written to disk. batch_size: NonZeroUsize, + /// Maximum number of envelopes that can be evicted. + max_evictable_envelopes: NonZeroUsize, /// The project key of the project to which all the envelopes belong. own_key: ProjectKey, /// The project key of the root project of the trace to which all the envelopes belong. @@ -51,6 +54,7 @@ impl SqliteEnvelopeStack { envelope_store: SqliteEnvelopeStore, disk_batch_size: usize, max_batches: usize, + max_evictable_envelopes: usize, own_key: ProjectKey, sampling_key: ProjectKey, check_disk: bool, @@ -61,6 +65,8 @@ impl SqliteEnvelopeStack { .expect("the spool threshold must be > 0"), batch_size: NonZeroUsize::new(disk_batch_size) .expect("the disk batch size must be > 0"), + max_evictable_envelopes: NonZeroUsize::new(max_evictable_envelopes) + .expect("the max evictable envelopes must be > 0"), own_key, sampling_key, batches_buffer: VecDeque::with_capacity(max_batches), @@ -126,6 +132,7 @@ impl SqliteEnvelopeStack { self.own_key, self.sampling_key, self.batch_size.get() as i64, + EnvelopesOrder::MostRecent, ) .await .map_err(SqliteEnvelopeStackError::EnvelopeStoreError)? @@ -228,14 +235,37 @@ impl EnvelopeStack for SqliteEnvelopeStack { } } +impl Evictable for SqliteEnvelopeStack { + async fn evict(&mut self) { + // We want to evict all elements in memory. + self.batches_buffer.clear(); + self.batches_buffer_size = 0; + + if self + .envelope_store + .delete_many( + self.own_key, + self.sampling_key, + self.max_evictable_envelopes.get() as i64, + EnvelopesOrder::Oldest, + ) + .await + .is_err() + { + relay_log::error!("failed to evict envelopes from disk"); + }; + } +} + #[cfg(test)] mod tests { use std::time::{Duration, Instant}; - use relay_base_schema::project::ProjectKey; - use super::*; + use crate::services::buffer::envelope_store::sqlite::EnvelopesOrder; + use crate::services::buffer::stack_provider::Evictable; use crate::services::buffer::testutils::utils::{mock_envelope, mock_envelopes, setup_db}; + use relay_base_schema::project::ProjectKey; #[tokio::test] #[should_panic] @@ -246,6 +276,7 @@ mod tests { envelope_store, 2, 2, + 10, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("c25ae32be2584e0bbd7a4cbb95971fe1").unwrap(), true, @@ -263,6 +294,7 @@ mod tests { envelope_store, 2, 2, + 10, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), true, @@ -316,6 +348,7 @@ mod tests { envelope_store, 2, 2, + 10, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), true, @@ -336,6 +369,7 @@ mod tests { envelope_store, 2, 2, + 10, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), true, @@ -354,6 +388,7 @@ mod tests { envelope_store, 5, 2, + 10, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), true, @@ -392,6 +427,7 @@ mod tests { envelope_store, 5, 2, + 10, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), true, @@ -453,4 +489,35 @@ mod tests { } assert_eq!(stack.batches_buffer_size, 0); } + + #[tokio::test] + async fn test_evict() { + let db = setup_db(true).await; + let mut envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); + let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(); + let mut stack = + SqliteEnvelopeStack::new(envelope_store.clone(), 5, 1, 2, own_key, sampling_key, true); + + let envelopes = mock_envelopes(15); + + for envelope in envelopes.clone() { + assert!(stack.push(envelope).await.is_ok()); + } + + stack.evict().await; + // We expect 0 in-memory data since we flushed the 5 in-memory envelopes. + assert!(stack.batches_buffer.is_empty()); + assert_eq!(stack.batches_buffer_size, 0); + // We expect 2 out of the 10 envelopes on disk to have been flushed, so if we load 15, we + // should get 8 back. + assert_eq!( + envelope_store + .delete_many(own_key, sampling_key, 15, EnvelopesOrder::MostRecent) + .await + .unwrap() + .len(), + 8 + ); + } } diff --git a/relay-server/src/services/buffer/envelope_store/sqlite.rs b/relay-server/src/services/buffer/envelope_store/sqlite.rs index e2dafa6685..34ea524106 100644 --- a/relay-server/src/services/buffer/envelope_store/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_store/sqlite.rs @@ -180,6 +180,14 @@ impl DiskUsage { } } +/// Enum representing the order in which [`Envelope`]s are fetched or deleted from the +/// database. +#[derive(Debug, Copy, Clone)] +pub enum EnvelopesOrder { + MostRecent, + Oldest, +} + /// Struct that offers access to a SQLite-based store of [`Envelope`]s. /// /// The goal of this struct is to hide away all the complexity of dealing with the database for @@ -321,10 +329,17 @@ impl SqliteEnvelopeStore { own_key: ProjectKey, sampling_key: ProjectKey, limit: i64, + envelopes_order: EnvelopesOrder, ) -> Result>, SqliteEnvelopeStoreError> { - let envelopes = build_delete_and_fetch_many_envelopes(own_key, sampling_key, limit) - .fetch(&self.db) - .peekable(); + let query = match envelopes_order { + EnvelopesOrder::MostRecent => { + build_delete_and_fetch_many_recent_envelopes(own_key, sampling_key, limit) + } + EnvelopesOrder::Oldest => { + build_delete_and_fetch_many_old_envelopes(own_key, sampling_key, limit) + } + }; + let envelopes = query.fetch(&self.db).peekable(); let mut envelopes = pin!(envelopes); if envelopes.as_mut().peek().await.is_none() { @@ -475,8 +490,8 @@ fn build_insert_many_envelopes<'a>( builder } -/// Builds a query that deletes many [`Envelope`] from the database. -pub fn build_delete_and_fetch_many_envelopes<'a>( +/// Builds a query that deletes many new [`Envelope`] from the database. +pub fn build_delete_and_fetch_many_recent_envelopes<'a>( own_key: ProjectKey, project_key: ProjectKey, batch_size: i64, @@ -494,6 +509,25 @@ pub fn build_delete_and_fetch_many_envelopes<'a>( .bind(batch_size) } +/// Builds a query that deletes many old [`Envelope`]s from the database. +pub fn build_delete_and_fetch_many_old_envelopes<'a>( + own_key: ProjectKey, + project_key: ProjectKey, + batch_size: i64, +) -> Query<'a, Sqlite, SqliteArguments<'a>> { + sqlx::query( + "DELETE FROM + envelopes + WHERE id IN (SELECT id FROM envelopes WHERE own_key = ? AND sampling_key = ? + ORDER BY received_at ASC LIMIT ?) + RETURNING + received_at, own_key, sampling_key, envelope", + ) + .bind(own_key.to_string()) + .bind(project_key.to_string()) + .bind(batch_size) +} + /// Creates a query which fetches the number of used database pages multiplied by the page size. /// /// This info used to estimate the current allocated database size. @@ -547,7 +581,7 @@ mod tests { // We check that if we load more than the limit, we still get back at most 10. let extracted_envelopes = envelope_store - .delete_many(own_key, sampling_key, 15) + .delete_many(own_key, sampling_key, 15, EnvelopesOrder::MostRecent) .await .unwrap(); assert_eq!(envelopes.len(), 10); diff --git a/relay-server/src/services/buffer/stack_provider/mod.rs b/relay-server/src/services/buffer/stack_provider/mod.rs index 1b1e319509..970a3b1da3 100644 --- a/relay-server/src/services/buffer/stack_provider/mod.rs +++ b/relay-server/src/services/buffer/stack_provider/mod.rs @@ -39,7 +39,7 @@ pub enum StackCreationType { /// A provider of [`EnvelopeStack`] instances that is responsible for creating them. pub trait StackProvider: std::fmt::Debug { /// The implementation of [`EnvelopeStack`] that this manager creates. - type Stack: EnvelopeStack; + type Stack: EnvelopeStack + Evictable; /// Initializes the [`StackProvider`]. fn initialize(&self) -> impl Future; @@ -61,3 +61,9 @@ pub trait StackProvider: std::fmt::Debug { /// Returns the string representation of the stack type offered by this [`StackProvider`]. fn stack_type<'a>(&self) -> &'a str; } + +/// An object that can support eviction. +pub trait Evictable: Send + std::fmt::Debug { + /// Evicts data from an [`Evictable`]. + fn evict(&mut self) -> impl Future; +} diff --git a/relay-server/src/services/buffer/stack_provider/sqlite.rs b/relay-server/src/services/buffer/stack_provider/sqlite.rs index 5399e8e6db..2ebcff07d1 100644 --- a/relay-server/src/services/buffer/stack_provider/sqlite.rs +++ b/relay-server/src/services/buffer/stack_provider/sqlite.rs @@ -16,6 +16,7 @@ pub struct SqliteStackProvider { disk_batch_size: usize, max_batches: usize, max_disk_size: usize, + max_evictable_envelopes: usize, } #[warn(dead_code)] @@ -28,6 +29,7 @@ impl SqliteStackProvider { disk_batch_size: config.spool_envelopes_stack_disk_batch_size(), max_batches: config.spool_envelopes_stack_max_batches(), max_disk_size: config.spool_envelopes_max_disk_size(), + max_evictable_envelopes: config.spool_envelopes_stack_max_evictable_envelopes(), }) } @@ -62,6 +64,7 @@ impl StackProvider for SqliteStackProvider { self.envelope_store.clone(), self.disk_batch_size, self.max_batches, + self.max_evictable_envelopes, project_key_pair.own_key, project_key_pair.sampling_key, // We want to check the disk by default if we are creating the stack for the first time, diff --git a/relay-server/src/services/processor/event.rs b/relay-server/src/services/processor/event.rs index eee0c3ff79..54fec8d519 100644 --- a/relay-server/src/services/processor/event.rs +++ b/relay-server/src/services/processor/event.rs @@ -872,8 +872,7 @@ mod tests { let event = Annotated::new(Event { release: Annotated::new( - String::from("���7��#1G����7��#1G����7��#1G����7��#1G����7��#") - .into(), + String::from("���7��#1G����7��#1G����7��#1G����7��#1G����7��#").into(), ), ..Default::default() }); diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 3788854b93..b3ed227446 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -524,6 +524,10 @@ pub enum RelayTimers { BufferSpool, /// Timing in milliseconds for the time it takes for the buffer to unspool data from disk. BufferUnspool, + /// Timing in milliseconds for constructing the LRU list of stacks. + BufferEvictLRUConstruction, + /// Timing in milliseconds to evict all the stacks determined by the LRU algorithm. + BufferEvictStacksEviction, } impl TimerMetric for RelayTimers { @@ -568,6 +572,8 @@ impl TimerMetric for RelayTimers { RelayTimers::BufferInitialization => "buffer.initialization.duration", RelayTimers::BufferSpool => "buffer.spool.duration", RelayTimers::BufferUnspool => "buffer.unspool.duration", + RelayTimers::BufferEvictLRUConstruction => "buffer.evict.lru_construction.duration", + RelayTimers::BufferEvictStacksEviction => "buffer.evict.stacks_eviction.duration", } } } From 88a9fb56bb50197083a8cbb22a6c7388d6a9fff2 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Sat, 7 Sep 2024 14:35:53 +0200 Subject: [PATCH 2/6] Fix --- relay-server/benches/benches.rs | 3 +++ .../services/buffer/envelope_buffer/mod.rs | 26 ++++++++++++++----- .../services/buffer/envelope_stack/sqlite.rs | 20 +++++--------- relay-server/src/services/buffer/mod.rs | 14 +++++++++- 4 files changed, 43 insertions(+), 20 deletions(-) diff --git a/relay-server/benches/benches.rs b/relay-server/benches/benches.rs index 045ad1bd93..00f747671c 100644 --- a/relay-server/benches/benches.rs +++ b/relay-server/benches/benches.rs @@ -101,6 +101,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { envelope_store.clone(), disk_batch_size, 2, + 10, ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), true, @@ -138,6 +139,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { envelope_store.clone(), disk_batch_size, 2, + 10, ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), true, @@ -179,6 +181,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { envelope_store.clone(), disk_batch_size, 2, + 10, ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), true, diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index d14284f828..afb4129ef1 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -124,6 +124,14 @@ impl PolymorphicEnvelopeBuffer { Self::InMemory(buffer) => buffer.has_capacity(), } } + + /// Evicts the least recently used envelope stacks from the buffer. + pub async fn evict(&mut self) { + match self { + Self::Sqlite(buffer) => buffer.evict().await, + Self::InMemory(buffer) => buffer.evict().await, + } + } } /// Error that occurs while interacting with the envelope buffer. @@ -330,8 +338,8 @@ where /// Returns `true` if at least one priority was changed. pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool { let mut changed = false; - if let Some(project_key_pair) = self.stacks_by_project.get(project) { - for project_key_pair in project_key_pair { + if let Some(project_key_pairs) = self.stacks_by_project.get(project) { + for project_key_pair in project_key_pairs { if let Some(( QueueItem { key: _, @@ -418,10 +426,15 @@ where } } - // We calculate how many envelope stacks we want to keep track. + // We calculate how many envelope stacks we want to evict. let max_lru_length = - ((self.priority_queue.len() as f32) * self.evictable_stacks_percentage) as usize; - let mut lru: BinaryHeap = BinaryHeap::new(); + ((self.priority_queue.len() as f32) * self.evictable_stacks_percentage).ceil() as usize; + relay_log::trace!( + "Evicting {} elements from the envelope buffer", + max_lru_length + ); + + let mut lru: BinaryHeap = BinaryHeap::with_capacity(max_lru_length); relay_statsd::metric!(timer(RelayTimers::BufferEvictLRUConstruction), { for (queue_item, priority) in self.priority_queue.iter() { let lru_item = LRUItem(queue_item.key, priority.readiness, queue_item.last_update); @@ -743,7 +756,8 @@ mod tests { Config::from_json_value(serde_json::json!({ "spool": { "envelopes": { - "path": path + "path": path, + "evictable_stacks_percentage": 0.7 } } })) diff --git a/relay-server/src/services/buffer/envelope_stack/sqlite.rs b/relay-server/src/services/buffer/envelope_stack/sqlite.rs index 0fd08a8ef7..79880255ea 100644 --- a/relay-server/src/services/buffer/envelope_stack/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_stack/sqlite.rs @@ -237,10 +237,12 @@ impl EnvelopeStack for SqliteEnvelopeStack { impl Evictable for SqliteEnvelopeStack { async fn evict(&mut self) { - // We want to evict all elements in memory. + // We want to evict all elements in memory, even though the capacity check for the sqlite + // implementation is triggered only based on disk usage. self.batches_buffer.clear(); self.batches_buffer_size = 0; + // We remove a fixed number of envelopes from disk to free up some space. if self .envelope_store .delete_many( @@ -262,7 +264,6 @@ mod tests { use std::time::{Duration, Instant}; use super::*; - use crate::services::buffer::envelope_store::sqlite::EnvelopesOrder; use crate::services::buffer::stack_provider::Evictable; use crate::services::buffer::testutils::utils::{mock_envelope, mock_envelopes, setup_db}; use relay_base_schema::project::ProjectKey; @@ -493,7 +494,7 @@ mod tests { #[tokio::test] async fn test_evict() { let db = setup_db(true).await; - let mut envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); + let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100)); let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(); let mut stack = @@ -509,15 +510,8 @@ mod tests { // We expect 0 in-memory data since we flushed the 5 in-memory envelopes. assert!(stack.batches_buffer.is_empty()); assert_eq!(stack.batches_buffer_size, 0); - // We expect 2 out of the 10 envelopes on disk to have been flushed, so if we load 15, we - // should get 8 back. - assert_eq!( - envelope_store - .delete_many(own_key, sampling_key, 15, EnvelopesOrder::MostRecent) - .await - .unwrap() - .len(), - 8 - ); + // We expect 2 out of the 10 envelopes on disk to have been flushed, so we should have 8 + // on disk. + assert_eq!(envelope_store.total_count().await.unwrap(), 8); } } diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index cbe33a87f0..ff60db921d 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -205,6 +205,7 @@ impl EnvelopeBufferService { self.sleep = Duration::ZERO; } + /// Pushes an [`Envelope`] to the [`PolymorphicEnvelopeBuffer`]. async fn push(&mut self, buffer: &mut PolymorphicEnvelopeBuffer, envelope: Box) { if let Err(e) = buffer.push(envelope).await { relay_log::error!( @@ -214,6 +215,13 @@ impl EnvelopeBufferService { } } + /// Tries to evict elements from the [`PolymorphicEnvelopeBuffer`] if no capacity + async fn try_evict(&self, buffer: &mut PolymorphicEnvelopeBuffer) { + if !buffer.has_capacity() { + buffer.evict().await; + } + } + fn update_observable_state(&self, buffer: &mut PolymorphicEnvelopeBuffer) { self.has_capacity .store(buffer.has_capacity(), Ordering::Relaxed); @@ -241,6 +249,8 @@ impl Service for EnvelopeBufferService { }; buffer.initialize().await; + let mut ticker = tokio::time::interval(Duration::from_millis(100)); + relay_log::info!("EnvelopeBufferService start"); loop { relay_log::trace!("EnvelopeBufferService loop"); @@ -261,7 +271,9 @@ impl Service for EnvelopeBufferService { Some(message) = rx.recv() => { self.handle_message(&mut buffer, message).await; } - + _ = ticker.tick() => { + self.try_evict(&mut buffer).await; + } else => break, } From d7af6cbf7c469206fb8bf1227aa9182b92f3f01e Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Mon, 9 Sep 2024 10:53:23 +0200 Subject: [PATCH 3/6] Fix --- relay-server/src/services/processor/event.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/relay-server/src/services/processor/event.rs b/relay-server/src/services/processor/event.rs index 54fec8d519..eee0c3ff79 100644 --- a/relay-server/src/services/processor/event.rs +++ b/relay-server/src/services/processor/event.rs @@ -872,7 +872,8 @@ mod tests { let event = Annotated::new(Event { release: Annotated::new( - String::from("���7��#1G����7��#1G����7��#1G����7��#1G����7��#").into(), + String::from("���7��#1G����7��#1G����7��#1G����7��#1G����7��#") + .into(), ), ..Default::default() }); From 30a47ef9f2c941d6f181bee748ba2aaf9c86890c Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Mon, 9 Sep 2024 11:25:16 +0200 Subject: [PATCH 4/6] Add changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index bd35f79d48..0c542d03ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ - Remove the OTEL spans endpoint in favor of Envelopes. ([#3973](https://github.com/getsentry/relay/pull/3973)) - Remove the `generate-schema` tool. Relay no longer exposes JSON schema for the event protocol. Consult the Rust type documentation of the `relay-event-schema` crate instead. ([#3974](https://github.com/getsentry/relay/pull/3974)) - Allow creation of `SqliteEnvelopeBuffer` from config, and load existing stacks from db on startup. ([#3967](https://github.com/getsentry/relay/pull/3967)) +- Implement eviction policy when the `EnvelopeBuffer` doesn't have enough capacity. ([#4014](https://github.com/getsentry/relay/pull/4014)) ## 24.8.0 From 22003d0ca5201086eca396db7dd723c65e3811d2 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Mon, 9 Sep 2024 12:51:00 +0200 Subject: [PATCH 5/6] Fix --- .../src/services/buffer/envelope_buffer/mod.rs | 18 +++++------------- .../services/buffer/envelope_stack/sqlite.rs | 4 ++-- .../services/buffer/envelope_store/sqlite.rs | 10 +++++----- relay-server/src/services/buffer/mod.rs | 3 ++- 4 files changed, 14 insertions(+), 21 deletions(-) diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index afb4129ef1..b4dd8bb5db 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -234,9 +234,9 @@ where let project_key_pair = ProjectKeyPair::from_envelope(&envelope); if let Some(( QueueItem { - key: _, value: stack, last_update, + .. }, _, )) = self.priority_queue.get_mut(&project_key_pair) @@ -270,7 +270,7 @@ where QueueItem { key: stack_key, value: stack, - last_update: _, + .. }, Priority { readiness, .. }, )) = self.priority_queue.peek_mut() @@ -294,9 +294,7 @@ where pub async fn pop(&mut self) -> Result>, EnvelopeBufferError> { let Some(( QueueItem { - key, - value: stack, - last_update: _, + key, value: stack, .. }, _, )) = self.priority_queue.peek_mut() @@ -340,14 +338,8 @@ where let mut changed = false; if let Some(project_key_pairs) = self.stacks_by_project.get(project) { for project_key_pair in project_key_pairs { - if let Some(( - QueueItem { - key: _, - value: _, - last_update, - }, - _, - )) = self.priority_queue.get_mut(project_key_pair) + if let Some((QueueItem { last_update, .. }, _)) = + self.priority_queue.get_mut(project_key_pair) { *last_update = Instant::now(); }; diff --git a/relay-server/src/services/buffer/envelope_stack/sqlite.rs b/relay-server/src/services/buffer/envelope_stack/sqlite.rs index 79880255ea..c9e02c1ecd 100644 --- a/relay-server/src/services/buffer/envelope_stack/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_stack/sqlite.rs @@ -132,7 +132,7 @@ impl SqliteEnvelopeStack { self.own_key, self.sampling_key, self.batch_size.get() as i64, - EnvelopesOrder::MostRecent, + EnvelopesOrder::NewestFirst, ) .await .map_err(SqliteEnvelopeStackError::EnvelopeStoreError)? @@ -249,7 +249,7 @@ impl Evictable for SqliteEnvelopeStack { self.own_key, self.sampling_key, self.max_evictable_envelopes.get() as i64, - EnvelopesOrder::Oldest, + EnvelopesOrder::OldestFirst, ) .await .is_err() diff --git a/relay-server/src/services/buffer/envelope_store/sqlite.rs b/relay-server/src/services/buffer/envelope_store/sqlite.rs index 34ea524106..f03fe29465 100644 --- a/relay-server/src/services/buffer/envelope_store/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_store/sqlite.rs @@ -184,8 +184,8 @@ impl DiskUsage { /// database. #[derive(Debug, Copy, Clone)] pub enum EnvelopesOrder { - MostRecent, - Oldest, + NewestFirst, + OldestFirst, } /// Struct that offers access to a SQLite-based store of [`Envelope`]s. @@ -332,10 +332,10 @@ impl SqliteEnvelopeStore { envelopes_order: EnvelopesOrder, ) -> Result>, SqliteEnvelopeStoreError> { let query = match envelopes_order { - EnvelopesOrder::MostRecent => { + EnvelopesOrder::NewestFirst => { build_delete_and_fetch_many_recent_envelopes(own_key, sampling_key, limit) } - EnvelopesOrder::Oldest => { + EnvelopesOrder::OldestFirst => { build_delete_and_fetch_many_old_envelopes(own_key, sampling_key, limit) } }; @@ -581,7 +581,7 @@ mod tests { // We check that if we load more than the limit, we still get back at most 10. let extracted_envelopes = envelope_store - .delete_many(own_key, sampling_key, 15, EnvelopesOrder::MostRecent) + .delete_many(own_key, sampling_key, 15, EnvelopesOrder::NewestFirst) .await .unwrap(); assert_eq!(envelopes.len(), 10); diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index cba3c206f1..21fa7e665b 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -204,11 +204,12 @@ impl EnvelopeBufferService { buffer.mark_ready(&project_key, true); } }; + self.sleep = Duration::ZERO; } /// Pushes an [`Envelope`] to the [`PolymorphicEnvelopeBuffer`]. - async fn push(&mut self, buffer: &mut PolymorphicEnvelopeBuffer, envelope: Box) { + async fn push(&self, buffer: &mut PolymorphicEnvelopeBuffer, envelope: Box) { if let Err(e) = buffer.push(envelope).await { relay_log::error!( error = &e as &dyn std::error::Error, From 457c14c5582f6b8055203b180cb2250f6dc2e655 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Mon, 9 Sep 2024 13:01:15 +0200 Subject: [PATCH 6/6] Fix --- relay-server/src/services/buffer/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 21fa7e665b..396b8a2221 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -191,13 +191,13 @@ impl EnvelopeBufferService { // For better separation of concerns, this prefetch should be triggered from here // once buffer V1 has been removed. relay_log::trace!("EnvelopeBufferService push"); - self.push(buffer, envelope).await; + Self::push(buffer, envelope).await; } EnvelopeBuffer::NotReady(project_key, envelope) => { relay_log::trace!("EnvelopeBufferService project not ready"); buffer.mark_ready(&project_key, false); relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesReturned) += 1); - self.push(buffer, envelope).await; + Self::push(buffer, envelope).await; } EnvelopeBuffer::Ready(project_key) => { relay_log::trace!("EnvelopeBufferService project ready {}", &project_key); @@ -209,7 +209,7 @@ impl EnvelopeBufferService { } /// Pushes an [`Envelope`] to the [`PolymorphicEnvelopeBuffer`]. - async fn push(&self, buffer: &mut PolymorphicEnvelopeBuffer, envelope: Box) { + async fn push(buffer: &mut PolymorphicEnvelopeBuffer, envelope: Box) { if let Err(e) = buffer.push(envelope).await { relay_log::error!( error = &e as &dyn std::error::Error, @@ -219,7 +219,7 @@ impl EnvelopeBufferService { } /// Tries to evict elements from the [`PolymorphicEnvelopeBuffer`] if no capacity - async fn try_evict(&self, buffer: &mut PolymorphicEnvelopeBuffer) { + async fn try_evict(buffer: &mut PolymorphicEnvelopeBuffer) { if !buffer.has_capacity() { buffer.evict().await; } @@ -275,7 +275,7 @@ impl Service for EnvelopeBufferService { self.handle_message(&mut buffer, message).await; } _ = ticker.tick() => { - self.try_evict(&mut buffer).await; + Self::try_evict(&mut buffer).await; } else => break, }