From 5f5228aba085b715ff45ca63e2839aafa36588f9 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Tue, 30 Jul 2024 11:42:31 +0200 Subject: [PATCH] Improve --- relay-config/src/config.rs | 31 ++++++++++++++++++- .../src/services/buffer/envelope_stack/mod.rs | 4 --- .../services/buffer/envelope_stack/sqlite.rs | 1 - .../services/buffer/sqlite_envelope_store.rs | 11 ++++--- .../services/buffer/stack_provider/sqlite.rs | 4 +-- 5 files changed, 38 insertions(+), 13 deletions(-) diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index cda759a33c..81f3e754dc 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -842,6 +842,16 @@ fn spool_envelopes_unspool_interval() -> u64 { 100 } +/// Default batch size for the stack. +fn spool_envelopes_stack_disk_batch_size() -> usize { + 1000 +} + +/// Default maximum number of batches for the stack. +fn spool_envelopes_stack_max_batches() -> usize { + 2 +} + /// Persistent buffering configuration for incoming envelopes. #[derive(Debug, Serialize, Deserialize)] pub struct EnvelopeSpool { @@ -868,7 +878,12 @@ pub struct EnvelopeSpool { /// The interval in milliseconds to trigger unspool. #[serde(default = "spool_envelopes_unspool_interval")] unspool_interval: u64, - /// Version of the spooler + /// Number of elements of the envelope stack that are flushed to disk. + stack_disk_batch_size: usize, + /// Number of batches of size `stack_disk_batch_size` that need to be accumulated before + /// flushing one batch to disk. + stack_max_batches: usize, + /// Version of the spooler. #[serde(default = "EnvelopeSpoolVersion::default")] version: EnvelopeSpoolVersion, } @@ -896,6 +911,8 @@ impl Default for EnvelopeSpool { max_disk_size: spool_envelopes_max_disk_size(), max_memory_size: spool_envelopes_max_memory_size(), unspool_interval: spool_envelopes_unspool_interval(), // 100ms + stack_disk_batch_size: spool_envelopes_stack_disk_batch_size(), + stack_max_batches: spool_envelopes_stack_max_batches(), version: EnvelopeSpoolVersion::V2, } } @@ -2095,6 +2112,18 @@ impl Config { self.values.spool.envelopes.max_memory_size.as_bytes() } + /// Number of batches of size `stack_disk_batch_size` that need to be accumulated before + /// flushing one batch to disk. + pub fn spool_envelopes_stack_disk_batch_size(&self) -> usize { + self.values.spool.envelopes.stack_disk_batch_size + } + + /// Number of batches of size `stack_disk_batch_size` that need to be accumulated before + /// flushing one batch to disk. + pub fn spool_envelopes_stack_max_batches(&self) -> usize { + self.values.spool.envelopes.stack_max_batches + } + /// Returns `true` if version 2 of the spooling mechanism is used. pub fn spool_v2(&self) -> bool { matches!( diff --git a/relay-server/src/services/buffer/envelope_stack/mod.rs b/relay-server/src/services/buffer/envelope_stack/mod.rs index 1db4d3c82a..ee48016f09 100644 --- a/relay-server/src/services/buffer/envelope_stack/mod.rs +++ b/relay-server/src/services/buffer/envelope_stack/mod.rs @@ -15,13 +15,9 @@ pub trait EnvelopeStack: Send + std::fmt::Debug { fn push(&mut self, envelope: Box) -> impl Future>; /// Peeks the [`Envelope`] on top of the stack. - /// - /// If the stack is empty, an error is returned. fn peek(&mut self) -> impl Future, Self::Error>>; /// Pops the [`Envelope`] on top of the stack. - /// - /// If the stack is empty, an error is returned. fn pop(&mut self) -> impl Future>, Self::Error>>; } diff --git a/relay-server/src/services/buffer/envelope_stack/sqlite.rs b/relay-server/src/services/buffer/envelope_stack/sqlite.rs index 4fd4cb99d9..1b57a0e580 100644 --- a/relay-server/src/services/buffer/envelope_stack/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_stack/sqlite.rs @@ -13,7 +13,6 @@ use crate::services::buffer::sqlite_envelope_store::{ /// An error returned when doing an operation on [`SQLiteEnvelopeStack`]. #[derive(Debug, thiserror::Error)] pub enum SqliteEnvelopeStackError { - /// The envelope store encountered an error. #[error("an error occurred in the envelope store: {0}")] EnvelopeStoreError(#[from] SqliteEnvelopeStoreError), } diff --git a/relay-server/src/services/buffer/sqlite_envelope_store.rs b/relay-server/src/services/buffer/sqlite_envelope_store.rs index e25df0adb7..af1b48d036 100644 --- a/relay-server/src/services/buffer/sqlite_envelope_store.rs +++ b/relay-server/src/services/buffer/sqlite_envelope_store.rs @@ -19,6 +19,7 @@ use relay_config::Config; use crate::extractors::StartTime; use crate::Envelope; +/// Struct that contains all the fields of an [`Envelope`] that are mapped to the database columns. pub struct InsertEnvelope { received_at: i64, own_key: ProjectKey, @@ -49,22 +50,22 @@ pub enum SqliteEnvelopeStoreError { #[error("failed to create the spool file: {0}")] FileSetupError(std::io::Error), - #[error("an error occurred while writing to disk: {0}")] + #[error("failed to write to disk: {0}")] WriteError(sqlx::Error), - #[error("an error occurred while reading from disk: {0}")] + #[error("failed to read from disk: {0}")] FetchError(sqlx::Error), #[error("no file path for the spool was provided")] NoFilePath, - #[error("error during the migration of the database: {0}")] + #[error("failed to migrate the database: {0}")] MigrationError(MigrateError), - #[error("error while extracting the envelope from the database")] + #[error("failed to extract the envelope from the database")] EnvelopeExtractionError, - #[error("error while extracting the project key from the database")] + #[error("failed to extract a project key from the database")] ProjectKeyExtractionError(#[from] ParseProjectKeyError), #[error("failed to get database file size: {0}")] diff --git a/relay-server/src/services/buffer/stack_provider/sqlite.rs b/relay-server/src/services/buffer/stack_provider/sqlite.rs index 9ba7d4ddee..895f9f279d 100644 --- a/relay-server/src/services/buffer/stack_provider/sqlite.rs +++ b/relay-server/src/services/buffer/stack_provider/sqlite.rs @@ -19,8 +19,8 @@ impl SqliteStackProvider { let envelope_store = SqliteEnvelopeStore::prepare(config).await?; Ok(Self { envelope_store, - disk_batch_size: 100, // TODO: put in config - max_batches: 2, // TODO: put in config + disk_batch_size: config.spool_envelopes_stack_disk_batch_size(), + max_batches: config.spool_envelopes_stack_max_batches(), }) } }