Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Jul 25, 2024
1 parent 8cd1b6f commit 1058237
Showing 1 changed file with 35 additions and 21 deletions.
56 changes: 35 additions & 21 deletions relay-server/src/services/spooler/envelope_stack/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub enum SQLiteEnvelopeStackError {
pub struct SQLiteEnvelopeStack {
db: Pool<Sqlite>,
spool_threshold: NonZeroUsize,
disk_batch_size: NonZeroUsize,
batch_size: NonZeroUsize,
own_key: ProjectKey,
sampling_key: ProjectKey,
#[allow(clippy::vec_box)]
Expand All @@ -44,19 +44,20 @@ impl SQLiteEnvelopeStack {
#[allow(dead_code)]
pub fn new(
db: Pool<Sqlite>,
spool_threshold: usize,
disk_batch_size: usize,
max_batches: usize,
own_key: ProjectKey,
sampling_key: ProjectKey,
) -> Self {
Self {
db,
spool_threshold: NonZeroUsize::new(spool_threshold)
spool_threshold: NonZeroUsize::new(disk_batch_size * max_batches)
.expect("the spool threshold must be > 0"),
disk_batch_size: NonZeroUsize::new(spool_threshold.div_ceil(2))
batch_size: NonZeroUsize::new(disk_batch_size)
.expect("the disk batch size must be > 0"),
own_key,
sampling_key,
batches_buffer: VecDeque::with_capacity(spool_threshold),
batches_buffer: VecDeque::with_capacity(max_batches),
batches_buffer_size: 0,
}
}
Expand All @@ -66,11 +67,12 @@ impl SQLiteEnvelopeStack {
#[allow(dead_code)]
pub async fn prepare(
db: Pool<Sqlite>,
spool_threshold: usize,
disk_batch_size: usize,
max_batches: usize,
own_key: ProjectKey,
sampling_key: ProjectKey,
) -> Result<(), SQLiteEnvelopeStackError> {
let mut stack = Self::new(db, spool_threshold, own_key, sampling_key);
let mut stack = Self::new(db, disk_batch_size, max_batches, own_key, sampling_key);
stack.unspool_from_disk().await?;

Ok(())
Expand Down Expand Up @@ -136,7 +138,7 @@ impl SQLiteEnvelopeStack {
let envelopes = build_delete_and_fetch_many_envelopes(
self.own_key,
self.sampling_key,
self.disk_batch_size.get() as i64,
self.batch_size.get() as i64,
)
.fetch(&self.db)
.peekable();
Expand All @@ -149,7 +151,7 @@ impl SQLiteEnvelopeStack {
// We use a sorted vector to order envelopes that are deleted from the database.
// Unfortunately we have to do this because SQLite `DELETE` with `RETURNING` doesn't
// return deleted rows in a specific order.
let mut extracted_envelopes = Vec::with_capacity(self.disk_batch_size.get());
let mut extracted_envelopes = Vec::with_capacity(self.batch_size.get());
let mut db_error = None;
while let Some(envelope) = envelopes.as_mut().next().await {
let envelope = match envelope {
Expand Down Expand Up @@ -229,11 +231,13 @@ impl EnvelopeStack for SQLiteEnvelopeStack {

// We need to check if the topmost batch has space, if not we have to create a new batch and
// push it in front.
if self.batches_buffer.front().map_or(true, |last_batch| {
last_batch.len() >= self.disk_batch_size.get()
}) {
if self
.batches_buffer
.front()
.map_or(true, |last_batch| last_batch.len() >= self.batch_size.get())
{
self.batches_buffer
.push_front(Vec::with_capacity(self.disk_batch_size.get()));
.push_front(Vec::with_capacity(self.batch_size.get()));
}

if let Some(batch) = self.batches_buffer.front_mut() {
Expand Down Expand Up @@ -423,14 +427,15 @@ mod tests {
let db = setup_db(false).await;
let mut stack = SQLiteEnvelopeStack::new(
db,
3,
2,
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
);

let envelopes = mock_envelopes(3);
let envelopes = mock_envelopes(4);

// We push the 3 envelopes without errors because they are below the threshold.
// We push the 4 envelopes without errors because they are below the threshold.
for envelope in envelopes.clone() {
assert!(stack.push(envelope).await.is_ok());
}
Expand All @@ -447,17 +452,22 @@ mod tests {
// we will end up with 2.
let envelope = mock_envelope(Instant::now());
assert!(stack.push(envelope.clone()).await.is_ok());
assert_eq!(stack.batches_buffer_size, 2);
assert_eq!(stack.batches_buffer_size, 3);

// We pop the remaining elements, expecting the last added envelope to be on top.
let popped_envelope_1 = stack.pop().await.unwrap();
let popped_envelope_2 = stack.pop().await.unwrap();
let popped_envelope_3 = stack.pop().await.unwrap();
assert_eq!(
popped_envelope_1.event_id().unwrap(),
envelope.event_id().unwrap()
);
assert_eq!(
popped_envelope_2.event_id().unwrap(),
envelopes.clone()[3].event_id().unwrap()
);
assert_eq!(
popped_envelope_3.event_id().unwrap(),
envelopes.clone()[2].event_id().unwrap()
);
assert_eq!(stack.batches_buffer_size, 0);
Expand All @@ -468,7 +478,8 @@ mod tests {
let db = setup_db(false).await;
let mut stack = SQLiteEnvelopeStack::new(
db,
3,
2,
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
);
Expand All @@ -485,7 +496,8 @@ mod tests {
let db = setup_db(true).await;
let mut stack = SQLiteEnvelopeStack::new(
db,
3,
2,
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
);
Expand All @@ -502,7 +514,8 @@ mod tests {
let db = setup_db(true).await;
let mut stack = SQLiteEnvelopeStack::new(
db,
10,
5,
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
);
Expand Down Expand Up @@ -537,7 +550,8 @@ mod tests {
let db = setup_db(true).await;
let mut stack = SQLiteEnvelopeStack::new(
db,
10,
5,
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
);
Expand Down

0 comments on commit 1058237

Please sign in to comment.