From 35f64330c9dfdc6dc6ff268805a570dad6c01be6 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Tue, 13 Aug 2024 14:45:45 +0200 Subject: [PATCH] feat(spooler): Add EnvelopeStore trait and capacity check --- relay-config/src/config.rs | 2 +- relay-server/src/endpoints/common.rs | 13 +-- relay-server/src/service.rs | 6 +- .../services/buffer/envelope_buffer/mod.rs | 62 +++++++----- .../src/services/buffer/envelope_stack/mod.rs | 6 -- .../services/buffer/envelope_stack/sqlite.rs | 3 +- .../src/services/buffer/envelope_store/mod.rs | 39 ++++++++ .../sqlite.rs} | 97 ++++++++++++++----- relay-server/src/services/buffer/mod.rs | 18 ++-- .../services/buffer/stack_provider/memory.rs | 14 --- .../src/services/buffer/stack_provider/mod.rs | 2 - .../services/buffer/stacks_manager/memory.rs | 31 ++++++ .../src/services/buffer/stacks_manager/mod.rs | 19 ++++ .../sqlite.rs | 20 ++-- 14 files changed, 238 insertions(+), 94 deletions(-) create mode 100644 relay-server/src/services/buffer/envelope_store/mod.rs rename relay-server/src/services/buffer/{sqlite_envelope_store.rs => envelope_store/sqlite.rs} (88%) delete mode 100644 relay-server/src/services/buffer/stack_provider/memory.rs delete mode 100644 relay-server/src/services/buffer/stack_provider/mod.rs create mode 100644 relay-server/src/services/buffer/stacks_manager/memory.rs create mode 100644 relay-server/src/services/buffer/stacks_manager/mod.rs rename relay-server/src/services/buffer/{stack_provider => stacks_manager}/sqlite.rs (65%) diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 88d9269398..b4233f08ef 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -879,7 +879,7 @@ pub struct EnvelopeSpool { min_connections: u32, /// The maximum size of the buffer to keep, in bytes. /// - /// If not set the befault is 524288000 bytes (500MB). + /// If not set the default is 524288000 bytes (500MB). #[serde(default = "spool_envelopes_max_disk_size")] max_disk_size: ByteSize, /// The maximum bytes to keep in the memory buffer before spooling envelopes to disk, in bytes. diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 20c8ccf211..16fdc5d416 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -307,12 +307,14 @@ fn queue_envelope( match state.envelope_buffer() { Some(buffer) => { + if !buffer.has_capacity() { + return Err(BadStoreRequest::QueueFailed); + } + // NOTE: This assumes that a `prefetch` has already been scheduled for both the // envelope's projects. See `handle_check_envelope`. relay_log::trace!("Pushing envelope to V2 buffer"); - // TODO: Sync-check whether the buffer has capacity. - // Otherwise return `QueueFailed`. buffer.defer_push(envelope); } None => { @@ -347,13 +349,6 @@ pub async fn handle_envelope( ) } - // TODO(jjbayer): Move this check to spool impl - if state.memory_checker().check_memory().is_exceeded() { - // NOTE: Long-term, we should not reject the envelope here, but spool it to disk instead. - // This will be fixed with the new spool implementation. - return Err(BadStoreRequest::QueueFailed); - }; - let mut managed_envelope = ManagedEnvelope::new( envelope, state.outcome_aggregator().clone(), diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 0770449bd3..26561c2d7d 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -254,7 +254,11 @@ impl ServiceState { upstream_relay.clone(), global_config.clone(), ); - let envelope_buffer = GuardedEnvelopeBuffer::from_config(&config).map(Arc::new); + let envelope_buffer = GuardedEnvelopeBuffer::from_config( + &config, + MemoryChecker::new(memory_stat.clone(), config.clone()), + ) + .map(Arc::new); ProjectCacheService::new( config.clone(), MemoryChecker::new(memory_stat.clone(), config.clone()), diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index ebe30bc76f..1a3263ca24 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -8,11 +8,13 @@ use relay_config::Config; use crate::envelope::Envelope; use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError; -use crate::services::buffer::envelope_stack::{EnvelopeStack, StackProvider}; -use crate::services::buffer::sqlite_envelope_store::SqliteEnvelopeStoreError; -use crate::services::buffer::stack_provider::memory::MemoryStackProvider; -use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider; +use crate::services::buffer::envelope_stack::EnvelopeStack; +use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError; +use crate::services::buffer::stacks_manager::memory::MemoryStacksManager; +use crate::services::buffer::stacks_manager::sqlite::SqliteStacksManager; +use crate::services::buffer::stacks_manager::{Capacity, StacksManager}; use crate::statsd::{RelayCounters, RelayGauges}; +use crate::utils::MemoryChecker; /// Polymorphic envelope buffering interface. /// @@ -25,20 +27,21 @@ use crate::statsd::{RelayCounters, RelayGauges}; #[allow(private_interfaces)] pub enum PolymorphicEnvelopeBuffer { /// An enveloper buffer that uses in-memory envelopes stacks. - InMemory(EnvelopeBuffer), + InMemory(EnvelopeBuffer), /// An enveloper buffer that uses sqlite envelopes stacks. #[allow(dead_code)] - Sqlite(EnvelopeBuffer), + Sqlite(EnvelopeBuffer), } impl PolymorphicEnvelopeBuffer { /// Creates either a memory-based or a disk-based envelope buffer, /// depending on the given configuration. - pub fn from_config(config: &Config) -> Self { + pub fn from_config(config: &Config, memory_checker: MemoryChecker) -> Self { if config.spool_envelopes_path().is_some() { panic!("Disk backend not yet supported for spool V2"); } - Self::InMemory(EnvelopeBuffer::::new()) + + Self::InMemory(EnvelopeBuffer::::new(memory_checker)) } /// Adds an envelope to the buffer. @@ -78,6 +81,14 @@ impl PolymorphicEnvelopeBuffer { Self::InMemory(buffer) => buffer.mark_ready(project, is_ready), } } + + /// Returns `true` whether the buffer has capacity to accept new [`Envelope`]s. + pub fn has_capacity(&self) -> bool { + match self { + Self::Sqlite(buffer) => buffer.has_capacity(), + Self::InMemory(buffer) => buffer.has_capacity(), + } + } } /// Error that occurs while interacting with the envelope buffer. @@ -86,6 +97,9 @@ pub enum EnvelopeBufferError { #[error("sqlite")] Sqlite(#[from] SqliteEnvelopeStackError), + #[error("failed to push envelope to the buffer")] + PushFailed, + #[error("impossible")] Impossible(#[from] Infallible), } @@ -95,7 +109,7 @@ pub enum EnvelopeBufferError { /// Envelope stacks are organized in a priority queue, and are reprioritized every time an envelope /// is pushed, popped, or when a project becomes ready. #[derive(Debug)] -struct EnvelopeBuffer { +struct EnvelopeBuffer { /// The central priority queue. priority_queue: priority_queue::PriorityQueue, Priority>, /// A lookup table to find all stacks involving a project. @@ -107,32 +121,32 @@ struct EnvelopeBuffer { stack_provider: P, } -impl EnvelopeBuffer { - /// Creates an empty buffer. - pub fn new() -> Self { +impl EnvelopeBuffer { + /// Creates an empty memory-based buffer. + pub fn new(memory_checker: MemoryChecker) -> Self { Self { stacks_by_project: Default::default(), priority_queue: Default::default(), - stack_provider: MemoryStackProvider, + stack_provider: MemoryStacksManager::new(memory_checker), } } } #[allow(dead_code)] -impl EnvelopeBuffer { - /// Creates an empty buffer. +impl EnvelopeBuffer { + /// Creates an empty sqlite-based buffer. pub async fn new(config: &Config) -> Result { Ok(Self { stacks_by_project: Default::default(), priority_queue: Default::default(), - stack_provider: SqliteStackProvider::new(config).await?, + stack_provider: SqliteStacksManager::new(config).await?, }) } } -impl EnvelopeBuffer

+impl EnvelopeBuffer

where - EnvelopeBufferError: std::convert::From<::Error>, + EnvelopeBufferError: From<::Error>, { /// Pushes an envelope to the appropriate envelope stack and reprioritizes the stack. /// @@ -255,6 +269,10 @@ where ); } + pub fn has_capacity(&self) -> bool { + matches!(self.stack_provider.capacity(), Capacity::FREE); + } + fn pop_stack(&mut self, stack_key: StackKey) { for project_key in stack_key.iter() { self.stacks_by_project @@ -425,7 +443,7 @@ mod tests { #[tokio::test] async fn insert_pop() { - let mut buffer = EnvelopeBuffer::::new(); + let mut buffer = EnvelopeBuffer::::new(); let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); @@ -500,7 +518,7 @@ mod tests { #[tokio::test] async fn project_internal_order() { - let mut buffer = EnvelopeBuffer::::new(); + let mut buffer = EnvelopeBuffer::::new(); let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); @@ -527,7 +545,7 @@ mod tests { #[tokio::test] async fn sampling_projects() { - let mut buffer = EnvelopeBuffer::::new(); + let mut buffer = EnvelopeBuffer::::new(); let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap(); @@ -605,7 +623,7 @@ mod tests { assert_ne!(stack_key1, stack_key2); - let mut buffer = EnvelopeBuffer::::new(); + let mut buffer = EnvelopeBuffer::::new(); buffer .push(new_envelope(project_key1, Some(project_key2))) .await diff --git a/relay-server/src/services/buffer/envelope_stack/mod.rs b/relay-server/src/services/buffer/envelope_stack/mod.rs index ee48016f09..8e48f7391f 100644 --- a/relay-server/src/services/buffer/envelope_stack/mod.rs +++ b/relay-server/src/services/buffer/envelope_stack/mod.rs @@ -20,9 +20,3 @@ pub trait EnvelopeStack: Send + std::fmt::Debug { /// Pops the [`Envelope`] on top of the stack. fn pop(&mut self) -> impl Future>, Self::Error>>; } - -pub trait StackProvider: std::fmt::Debug { - type Stack: EnvelopeStack; - - fn create_stack(&self, envelope: Box) -> Self::Stack; -} diff --git a/relay-server/src/services/buffer/envelope_stack/sqlite.rs b/relay-server/src/services/buffer/envelope_stack/sqlite.rs index 9ea8d16a66..3568728061 100644 --- a/relay-server/src/services/buffer/envelope_stack/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_stack/sqlite.rs @@ -6,9 +6,10 @@ use relay_base_schema::project::ProjectKey; use crate::envelope::Envelope; use crate::services::buffer::envelope_stack::EnvelopeStack; -use crate::services::buffer::sqlite_envelope_store::{ +use crate::services::buffer::envelope_store::sqlite::{ SqliteEnvelopeStore, SqliteEnvelopeStoreError, }; +use crate::services::buffer::envelope_store::EnvelopeStore; use crate::statsd::RelayCounters; /// An error returned when doing an operation on [`SqliteEnvelopeStack`]. diff --git a/relay-server/src/services/buffer/envelope_store/mod.rs b/relay-server/src/services/buffer/envelope_store/mod.rs new file mode 100644 index 0000000000..9b20435544 --- /dev/null +++ b/relay-server/src/services/buffer/envelope_store/mod.rs @@ -0,0 +1,39 @@ +use std::future::Future; + +use hashbrown::HashSet; +use relay_base_schema::project::ProjectKey; + +use crate::Envelope; + +pub mod sqlite; + +/// Trait that models a store of [`Envelope`]s. +pub trait EnvelopeStore { + type Envelope; + + type Error; + + /// Inserts one or more envelopes into the store. + fn insert_many( + &mut self, + envelopes: impl IntoIterator, + ) -> impl Future>; + + /// Deletes one or more envelopes that match `own_key` and `sampling_key` up to `limit` from + /// the store. + fn delete_many( + &mut self, + own_key: ProjectKey, + sampling_key: ProjectKey, + limit: i64, + ) -> impl Future>, Self::Error>>; + + /// Returns a set of project key pairs, representing all the unique combinations of + /// `own_key` and `project_key` that are found in the store. + fn project_key_pairs( + &self, + ) -> impl Future, Self::Error>>; + + /// Returns the usage of the store where the definition of usage depends on the implementation. + fn usage(&self) -> usize; +} diff --git a/relay-server/src/services/buffer/sqlite_envelope_store.rs b/relay-server/src/services/buffer/envelope_store/sqlite.rs similarity index 88% rename from relay-server/src/services/buffer/sqlite_envelope_store.rs rename to relay-server/src/services/buffer/envelope_store/sqlite.rs index 83616bf55a..7d69993894 100644 --- a/relay-server/src/services/buffer/sqlite_envelope_store.rs +++ b/relay-server/src/services/buffer/envelope_store/sqlite.rs @@ -1,7 +1,3 @@ -use std::error::Error; -use std::path::Path; -use std::pin::pin; - use futures::stream::StreamExt; use hashbrown::HashSet; use relay_base_schema::project::{ParseProjectKeyError, ProjectKey}; @@ -13,10 +9,16 @@ use sqlx::sqlite::{ SqliteRow, SqliteSynchronous, }; use sqlx::{Pool, QueryBuilder, Row, Sqlite}; +use std::error::Error; +use std::path::Path; +use std::pin::pin; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; use tokio::fs::DirBuilder; use crate::envelope::EnvelopeError; use crate::extractors::StartTime; +use crate::services::buffer::envelope_store::EnvelopeStore; use crate::Envelope; /// Struct that contains all the fields of an [`Envelope`] that are mapped to the database columns. @@ -95,12 +97,16 @@ pub enum SqliteEnvelopeStoreError { #[derive(Debug, Clone)] pub struct SqliteEnvelopeStore { db: Pool, + last_known_usage: Arc, } impl SqliteEnvelopeStore { /// Initializes the [`SqliteEnvelopeStore`] with a supplied [`Pool`]. pub fn new(db: Pool) -> Self { - Self { db } + Self { + db, + last_known_usage: Arc::new(AtomicUsize::new(0)), + } } /// Prepares the [`SqliteEnvelopeStore`] by running all the necessary migrations and preparing @@ -132,7 +138,7 @@ impl SqliteEnvelopeStore { .synchronous(SqliteSynchronous::Normal) // The freelist pages are moved to the end of the database file and the database file is truncated to remove the freelist pages at every // transaction commit. Note, however, that auto-vacuum only truncates the freelist pages from the file. - // Auto-vacuum does not defragment the database nor repack individual database pages the way that the VACUUM command does. + // Auto-vacuum does not de-fragment the database nor repack individual database pages the way that the VACUUM command does. // // This will help us to keep the file size under some control. .auto_vacuum(SqliteAutoVacuum::Full) @@ -148,7 +154,14 @@ impl SqliteEnvelopeStore { .await .map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?; - Ok(SqliteEnvelopeStore { db }) + // We estimate the usage of the database during preparation, so that we have a first + // estimate. + let usage = Self::estimate_usage(&db).await?; + + Ok(SqliteEnvelopeStore { + db, + last_known_usage: Arc::new(AtomicUsize::new(usage as usize)), + }) } /// Set up the database and return the current number of envelopes. @@ -194,11 +207,26 @@ impl SqliteEnvelopeStore { Ok(()) } + /// Estimates the size of the disk. + async fn estimate_usage(db: &Pool) -> Result { + build_estimate_size() + .fetch_one(db) + .await + .and_then(|r| r.try_get(0)) + .map_err(SqliteEnvelopeStoreError::FileSizeReadFailed) + } +} + +impl EnvelopeStore for SqliteEnvelopeStore { + type Envelope = InsertEnvelope; + + type Error = SqliteEnvelopeStoreError; + /// Inserts one or more envelopes into the database. - pub async fn insert_many( - &self, - envelopes: impl IntoIterator, - ) -> Result<(), SqliteEnvelopeStoreError> { + async fn insert_many( + &mut self, + envelopes: impl IntoIterator, + ) -> Result<(), Self::Error> { if let Err(err) = build_insert_many_envelopes(envelopes.into_iter()) .build() .execute(&self.db) @@ -216,12 +244,12 @@ impl SqliteEnvelopeStore { } /// Deletes and returns at most `limit` [`Envelope`]s from the database. - pub async fn delete_many( - &self, + async fn delete_many( + &mut self, own_key: ProjectKey, sampling_key: ProjectKey, limit: i64, - ) -> Result>, SqliteEnvelopeStoreError> { + ) -> Result>, Self::Error> { let envelopes = build_delete_and_fetch_many_envelopes(own_key, sampling_key, limit) .fetch(&self.db) .peekable(); @@ -279,9 +307,7 @@ impl SqliteEnvelopeStore { /// Returns a set of project key pairs, representing all the unique combinations of /// `own_key` and `project_key` that are found in the database. - pub async fn project_key_pairs( - &self, - ) -> Result, SqliteEnvelopeStoreError> { + async fn project_key_pairs(&self) -> Result, Self::Error> { let project_key_pairs = build_get_project_key_pairs() .fetch_all(&self.db) .await @@ -296,13 +322,32 @@ impl SqliteEnvelopeStore { Ok(project_key_pairs) } - /// Returns an approximate measure of the size of the database. - pub async fn used_size(&self) -> Result { - build_estimate_size() - .fetch_one(&self.db) - .await - .and_then(|r| r.try_get(0)) - .map_err(SqliteEnvelopeStoreError::FileSizeReadFailed) + /// Returns an approximate measure of the used size of the database. + fn usage(&self) -> usize { + let db = self.db.clone(); + let last_known_usage = self.last_known_usage.clone(); + tokio::spawn(async move { + let usage = Self::estimate_usage(&db).await; + let Ok(usage) = usage else { + relay_log::error!("failed to update the disk usage"); + return; + }; + + let current = last_known_usage.load(Ordering::Relaxed); + if last_known_usage + .compare_exchange_weak( + current, + usage as usize, + Ordering::Relaxed, + Ordering::Relaxed, + ) + .is_err() + { + relay_log::error!("failed to update the disk usage"); + } + }); + + self.last_known_usage.load(Ordering::Relaxed) } } @@ -464,7 +509,7 @@ mod tests { #[tokio::test] async fn test_insert_and_delete_envelopes() { let db = setup_db(true).await; - let envelope_store = SqliteEnvelopeStore::new(db); + let mut envelope_store = SqliteEnvelopeStore::new(db); let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(); @@ -492,7 +537,7 @@ mod tests { #[tokio::test] async fn test_insert_and_get_project_keys_pairs() { let db = setup_db(true).await; - let envelope_store = SqliteEnvelopeStore::new(db); + let mut envelope_store = SqliteEnvelopeStore::new(db); let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(); diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index d5a862136e..07e84b449a 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -8,18 +8,18 @@ use relay_config::Config; use tokio::sync::MutexGuard; use crate::envelope::Envelope; -use crate::utils::ManagedEnvelope; +use crate::utils::{ManagedEnvelope, MemoryChecker}; pub use envelope_buffer::EnvelopeBufferError; pub use envelope_buffer::PolymorphicEnvelopeBuffer; pub use envelope_stack::sqlite::SqliteEnvelopeStack; // pub for benchmarks pub use envelope_stack::EnvelopeStack; // pub for benchmarks -pub use sqlite_envelope_store::SqliteEnvelopeStore; // pub for benchmarks // pub for benchmarks +pub use envelope_store::sqlite::SqliteEnvelopeStore; // pub for benchmarks // pub for benchmarks mod envelope_buffer; mod envelope_stack; -mod sqlite_envelope_store; -mod stack_provider; +mod envelope_store; +mod stacks_manager; mod testutils; /// Async envelope buffering interface. @@ -51,11 +51,11 @@ impl GuardedEnvelopeBuffer { /// /// NOTE: until the V1 spooler implementation is removed, this function returns `None` /// if V2 spooling is not configured. - pub fn from_config(config: &Config) -> Option { + pub fn from_config(config: &Config, memory_checker: MemoryChecker) -> Option { if config.spool_v2() { Some(Self { inner: tokio::sync::Mutex::new(Inner { - backend: PolymorphicEnvelopeBuffer::from_config(config), + backend: PolymorphicEnvelopeBuffer::from_config(config, memory_checker), changed: true, }), notify: tokio::sync::Notify::new(), @@ -137,6 +137,12 @@ impl GuardedEnvelopeBuffer { Ok(()) } + /// Returns `true` if the buffer has capacity to accept more [`Envelope`]s. + pub fn has_capacity(&self) -> bool { + let guard = self.inner.blocking_lock(); + guard.backend.has_capacity() + } + fn notify(&self, guard: &mut MutexGuard) { guard.changed = true; self.notify.notify_waiters(); diff --git a/relay-server/src/services/buffer/stack_provider/memory.rs b/relay-server/src/services/buffer/stack_provider/memory.rs deleted file mode 100644 index b3fe5c3bb3..0000000000 --- a/relay-server/src/services/buffer/stack_provider/memory.rs +++ /dev/null @@ -1,14 +0,0 @@ -use crate::services::buffer::envelope_stack::memory::MemoryEnvelopeStack; -use crate::services::buffer::envelope_stack::StackProvider; -use crate::Envelope; - -#[derive(Debug)] -pub struct MemoryStackProvider; - -impl StackProvider for MemoryStackProvider { - type Stack = MemoryEnvelopeStack; - - fn create_stack(&self, envelope: Box) -> Self::Stack { - MemoryEnvelopeStack::new(envelope) - } -} diff --git a/relay-server/src/services/buffer/stack_provider/mod.rs b/relay-server/src/services/buffer/stack_provider/mod.rs deleted file mode 100644 index ae663f641d..0000000000 --- a/relay-server/src/services/buffer/stack_provider/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod memory; -pub mod sqlite; diff --git a/relay-server/src/services/buffer/stacks_manager/memory.rs b/relay-server/src/services/buffer/stacks_manager/memory.rs new file mode 100644 index 0000000000..4941fc1fc5 --- /dev/null +++ b/relay-server/src/services/buffer/stacks_manager/memory.rs @@ -0,0 +1,31 @@ +use crate::services::buffer::envelope_stack::memory::MemoryEnvelopeStack; +use crate::services::buffer::stacks_manager::{Capacity, StacksManager}; +use crate::utils::MemoryChecker; +use crate::Envelope; + +#[derive(Debug)] +pub struct MemoryStacksManager { + memory_checker: MemoryChecker, +} + +impl MemoryStacksManager { + pub fn new(memory_checker: MemoryChecker) -> Self { + Self { memory_checker } + } +} + +impl StacksManager for MemoryStacksManager { + type Stack = MemoryEnvelopeStack; + + fn create_stack(&self, envelope: Box) -> Self::Stack { + MemoryEnvelopeStack::new(envelope) + } + + fn capacity(&self) -> Capacity { + if self.memory_checker.check_memory().has_capacity() { + Capacity::FREE + } else { + Capacity::FULL + } + } +} diff --git a/relay-server/src/services/buffer/stacks_manager/mod.rs b/relay-server/src/services/buffer/stacks_manager/mod.rs new file mode 100644 index 0000000000..51f76b80a6 --- /dev/null +++ b/relay-server/src/services/buffer/stacks_manager/mod.rs @@ -0,0 +1,19 @@ +use crate::{Envelope, EnvelopeStack}; + +pub mod memory; +pub mod sqlite; + +/// Enum representing the current capacity of the [`StacksManager`] to accept new [`Envelope`]s. +pub enum Capacity { + FREE, + FULL, +} + +/// A provider of [`EnvelopeStack`] instances that is responsible for creating them. +pub trait StacksManager: std::fmt::Debug { + type Stack: EnvelopeStack; + + fn create_stack(&self, envelope: Box) -> Self::Stack; + + fn capacity(&self) -> Capacity; +} diff --git a/relay-server/src/services/buffer/stack_provider/sqlite.rs b/relay-server/src/services/buffer/stacks_manager/sqlite.rs similarity index 65% rename from relay-server/src/services/buffer/stack_provider/sqlite.rs rename to relay-server/src/services/buffer/stacks_manager/sqlite.rs index 9716585606..6bf5e1d480 100644 --- a/relay-server/src/services/buffer/stack_provider/sqlite.rs +++ b/relay-server/src/services/buffer/stacks_manager/sqlite.rs @@ -1,32 +1,35 @@ use relay_config::Config; -use crate::services::buffer::envelope_stack::StackProvider; -use crate::services::buffer::sqlite_envelope_store::{ +use crate::services::buffer::envelope_store::sqlite::{ SqliteEnvelopeStore, SqliteEnvelopeStoreError, }; +use crate::services::buffer::envelope_store::EnvelopeStore; +use crate::services::buffer::stacks_manager::{Capacity, StacksManager}; use crate::{Envelope, SqliteEnvelopeStack}; #[derive(Debug)] -pub struct SqliteStackProvider { +pub struct SqliteStacksManager { envelope_store: SqliteEnvelopeStore, disk_batch_size: usize, max_batches: usize, + max_disk_size: usize, } #[warn(dead_code)] -impl SqliteStackProvider { - /// Creates a new [`SqliteStackProvider`] from the provided path to the SQLite database file. +impl SqliteStacksManager { + /// Creates a new [`SqliteStacksManager`] from the provided path to the SQLite database file. pub async fn new(config: &Config) -> Result { let envelope_store = SqliteEnvelopeStore::prepare(config).await?; Ok(Self { envelope_store, disk_batch_size: config.spool_envelopes_stack_disk_batch_size(), max_batches: config.spool_envelopes_stack_max_batches(), + max_disk_size: config.spool_envelopes_max_disk_size(), }) } } -impl StackProvider for SqliteStackProvider { +impl StacksManager for SqliteStacksManager { type Stack = SqliteEnvelopeStack; fn create_stack(&self, envelope: Box) -> Self::Stack { @@ -41,4 +44,9 @@ impl StackProvider for SqliteStackProvider { sampling_key, ) } + + fn capacity(&self) -> Capacity { + // TODO: how to we make the check async or sync. + // self.envelope_store.usage() + } }