Skip to content

Commit

Permalink
Improve
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Jul 30, 2024
1 parent af36538 commit 5f5228a
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 13 deletions.
31 changes: 30 additions & 1 deletion relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,16 @@ fn spool_envelopes_unspool_interval() -> u64 {
100
}

/// Default batch size for the stack.
fn spool_envelopes_stack_disk_batch_size() -> usize {
1000
}

/// Default maximum number of batches for the stack.
fn spool_envelopes_stack_max_batches() -> usize {
2
}

/// Persistent buffering configuration for incoming envelopes.
#[derive(Debug, Serialize, Deserialize)]
pub struct EnvelopeSpool {
Expand All @@ -868,7 +878,12 @@ pub struct EnvelopeSpool {
/// The interval in milliseconds to trigger unspool.
#[serde(default = "spool_envelopes_unspool_interval")]
unspool_interval: u64,
/// Version of the spooler
/// Number of elements of the envelope stack that are flushed to disk.
stack_disk_batch_size: usize,
/// Number of batches of size `stack_disk_batch_size` that need to be accumulated before
/// flushing one batch to disk.
stack_max_batches: usize,
/// Version of the spooler.
#[serde(default = "EnvelopeSpoolVersion::default")]
version: EnvelopeSpoolVersion,
}
Expand Down Expand Up @@ -896,6 +911,8 @@ impl Default for EnvelopeSpool {
max_disk_size: spool_envelopes_max_disk_size(),
max_memory_size: spool_envelopes_max_memory_size(),
unspool_interval: spool_envelopes_unspool_interval(), // 100ms
stack_disk_batch_size: spool_envelopes_stack_disk_batch_size(),
stack_max_batches: spool_envelopes_stack_max_batches(),
version: EnvelopeSpoolVersion::V2,
}
}
Expand Down Expand Up @@ -2095,6 +2112,18 @@ impl Config {
self.values.spool.envelopes.max_memory_size.as_bytes()
}

/// Number of batches of size `stack_disk_batch_size` that need to be accumulated before
/// flushing one batch to disk.
pub fn spool_envelopes_stack_disk_batch_size(&self) -> usize {
self.values.spool.envelopes.stack_disk_batch_size
}

/// Number of batches of size `stack_disk_batch_size` that need to be accumulated before
/// flushing one batch to disk.
pub fn spool_envelopes_stack_max_batches(&self) -> usize {
self.values.spool.envelopes.stack_max_batches
}

/// Returns `true` if version 2 of the spooling mechanism is used.
pub fn spool_v2(&self) -> bool {
matches!(
Expand Down
4 changes: 0 additions & 4 deletions relay-server/src/services/buffer/envelope_stack/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,9 @@ pub trait EnvelopeStack: Send + std::fmt::Debug {
fn push(&mut self, envelope: Box<Envelope>) -> impl Future<Output = Result<(), Self::Error>>;

/// Peeks the [`Envelope`] on top of the stack.
///
/// If the stack is empty, an error is returned.
fn peek(&mut self) -> impl Future<Output = Result<Option<&Envelope>, Self::Error>>;

/// Pops the [`Envelope`] on top of the stack.
///
/// If the stack is empty, an error is returned.
fn pop(&mut self) -> impl Future<Output = Result<Option<Box<Envelope>>, Self::Error>>;
}

Expand Down
1 change: 0 additions & 1 deletion relay-server/src/services/buffer/envelope_stack/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::services::buffer::sqlite_envelope_store::{
/// An error returned when doing an operation on [`SQLiteEnvelopeStack`].
#[derive(Debug, thiserror::Error)]
pub enum SqliteEnvelopeStackError {
/// The envelope store encountered an error.
#[error("an error occurred in the envelope store: {0}")]
EnvelopeStoreError(#[from] SqliteEnvelopeStoreError),
}
Expand Down
11 changes: 6 additions & 5 deletions relay-server/src/services/buffer/sqlite_envelope_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use relay_config::Config;
use crate::extractors::StartTime;
use crate::Envelope;

/// Struct that contains all the fields of an [`Envelope`] that are mapped to the database columns.
pub struct InsertEnvelope {
received_at: i64,
own_key: ProjectKey,
Expand Down Expand Up @@ -49,22 +50,22 @@ pub enum SqliteEnvelopeStoreError {
#[error("failed to create the spool file: {0}")]
FileSetupError(std::io::Error),

#[error("an error occurred while writing to disk: {0}")]
#[error("failed to write to disk: {0}")]
WriteError(sqlx::Error),

#[error("an error occurred while reading from disk: {0}")]
#[error("failed to read from disk: {0}")]
FetchError(sqlx::Error),

#[error("no file path for the spool was provided")]
NoFilePath,

#[error("error during the migration of the database: {0}")]
#[error("failed to migrate the database: {0}")]
MigrationError(MigrateError),

#[error("error while extracting the envelope from the database")]
#[error("failed to extract the envelope from the database")]
EnvelopeExtractionError,

#[error("error while extracting the project key from the database")]
#[error("failed to extract a project key from the database")]
ProjectKeyExtractionError(#[from] ParseProjectKeyError),

#[error("failed to get database file size: {0}")]
Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/services/buffer/stack_provider/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ impl SqliteStackProvider {
let envelope_store = SqliteEnvelopeStore::prepare(config).await?;
Ok(Self {
envelope_store,
disk_batch_size: 100, // TODO: put in config
max_batches: 2, // TODO: put in config
disk_batch_size: config.spool_envelopes_stack_disk_batch_size(),
max_batches: config.spool_envelopes_stack_max_batches(),
})
}
}
Expand Down

0 comments on commit 5f5228a

Please sign in to comment.