diff --git a/relay-server/src/services/spooler/envelope_stack/sqlite.rs b/relay-server/src/services/spooler/envelope_stack/sqlite.rs index be767972b4..7eee384e20 100644 --- a/relay-server/src/services/spooler/envelope_stack/sqlite.rs +++ b/relay-server/src/services/spooler/envelope_stack/sqlite.rs @@ -31,7 +31,7 @@ pub enum SQLiteEnvelopeStackError { pub struct SQLiteEnvelopeStack { db: Pool, spool_threshold: NonZeroUsize, - disk_batch_size: NonZeroUsize, + batch_size: NonZeroUsize, own_key: ProjectKey, sampling_key: ProjectKey, #[allow(clippy::vec_box)] @@ -44,19 +44,20 @@ impl SQLiteEnvelopeStack { #[allow(dead_code)] pub fn new( db: Pool, - spool_threshold: usize, + disk_batch_size: usize, + max_batches: usize, own_key: ProjectKey, sampling_key: ProjectKey, ) -> Self { Self { db, - spool_threshold: NonZeroUsize::new(spool_threshold) + spool_threshold: NonZeroUsize::new(disk_batch_size * max_batches) .expect("the spool threshold must be > 0"), - disk_batch_size: NonZeroUsize::new(spool_threshold.div_ceil(2)) + batch_size: NonZeroUsize::new(disk_batch_size) .expect("the disk batch size must be > 0"), own_key, sampling_key, - batches_buffer: VecDeque::with_capacity(spool_threshold), + batches_buffer: VecDeque::with_capacity(max_batches), batches_buffer_size: 0, } } @@ -66,11 +67,12 @@ impl SQLiteEnvelopeStack { #[allow(dead_code)] pub async fn prepare( db: Pool, - spool_threshold: usize, + disk_batch_size: usize, + max_batches: usize, own_key: ProjectKey, sampling_key: ProjectKey, ) -> Result<(), SQLiteEnvelopeStackError> { - let mut stack = Self::new(db, spool_threshold, own_key, sampling_key); + let mut stack = Self::new(db, disk_batch_size, max_batches, own_key, sampling_key); stack.unspool_from_disk().await?; Ok(()) @@ -136,7 +138,7 @@ impl SQLiteEnvelopeStack { let envelopes = build_delete_and_fetch_many_envelopes( self.own_key, self.sampling_key, - self.disk_batch_size.get() as i64, + self.batch_size.get() as i64, ) .fetch(&self.db) .peekable(); @@ -149,7 +151,7 @@ impl SQLiteEnvelopeStack { // We use a sorted vector to order envelopes that are deleted from the database. // Unfortunately we have to do this because SQLite `DELETE` with `RETURNING` doesn't // return deleted rows in a specific order. - let mut extracted_envelopes = Vec::with_capacity(self.disk_batch_size.get()); + let mut extracted_envelopes = Vec::with_capacity(self.batch_size.get()); let mut db_error = None; while let Some(envelope) = envelopes.as_mut().next().await { let envelope = match envelope { @@ -229,11 +231,13 @@ impl EnvelopeStack for SQLiteEnvelopeStack { // We need to check if the topmost batch has space, if not we have to create a new batch and // push it in front. - if self.batches_buffer.front().map_or(true, |last_batch| { - last_batch.len() >= self.disk_batch_size.get() - }) { + if self + .batches_buffer + .front() + .map_or(true, |last_batch| last_batch.len() >= self.batch_size.get()) + { self.batches_buffer - .push_front(Vec::with_capacity(self.disk_batch_size.get())); + .push_front(Vec::with_capacity(self.batch_size.get())); } if let Some(batch) = self.batches_buffer.front_mut() { @@ -423,14 +427,15 @@ mod tests { let db = setup_db(false).await; let mut stack = SQLiteEnvelopeStack::new( db, - 3, + 2, + 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), ); - let envelopes = mock_envelopes(3); + let envelopes = mock_envelopes(4); - // We push the 3 envelopes without errors because they are below the threshold. + // We push the 4 envelopes without errors because they are below the threshold. for envelope in envelopes.clone() { assert!(stack.push(envelope).await.is_ok()); } @@ -447,17 +452,22 @@ mod tests { // we will end up with 2. let envelope = mock_envelope(Instant::now()); assert!(stack.push(envelope.clone()).await.is_ok()); - assert_eq!(stack.batches_buffer_size, 2); + assert_eq!(stack.batches_buffer_size, 3); // We pop the remaining elements, expecting the last added envelope to be on top. let popped_envelope_1 = stack.pop().await.unwrap(); let popped_envelope_2 = stack.pop().await.unwrap(); + let popped_envelope_3 = stack.pop().await.unwrap(); assert_eq!( popped_envelope_1.event_id().unwrap(), envelope.event_id().unwrap() ); assert_eq!( popped_envelope_2.event_id().unwrap(), + envelopes.clone()[3].event_id().unwrap() + ); + assert_eq!( + popped_envelope_3.event_id().unwrap(), envelopes.clone()[2].event_id().unwrap() ); assert_eq!(stack.batches_buffer_size, 0); @@ -468,7 +478,8 @@ mod tests { let db = setup_db(false).await; let mut stack = SQLiteEnvelopeStack::new( db, - 3, + 2, + 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), ); @@ -485,7 +496,8 @@ mod tests { let db = setup_db(true).await; let mut stack = SQLiteEnvelopeStack::new( db, - 3, + 2, + 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), ); @@ -502,7 +514,8 @@ mod tests { let db = setup_db(true).await; let mut stack = SQLiteEnvelopeStack::new( db, - 10, + 5, + 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), ); @@ -537,7 +550,8 @@ mod tests { let db = setup_db(true).await; let mut stack = SQLiteEnvelopeStack::new( db, - 10, + 5, + 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), );