From 265c5bb597f9fdf33b7e0a304cbb1056b52d6dc8 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Tue, 23 Jul 2024 16:14:58 +0200 Subject: [PATCH 01/23] feat(spooler): Add EnvelopeStack based on SQLite --- .../services/spooler/envelope_stack/mod.rs | 11 ++ .../services/spooler/envelope_stack/sqlite.rs | 109 ++++++++++++++++++ relay-server/src/services/spooler/mod.rs | 1 + 3 files changed, 121 insertions(+) create mode 100644 relay-server/src/services/spooler/envelope_stack/mod.rs create mode 100644 relay-server/src/services/spooler/envelope_stack/sqlite.rs diff --git a/relay-server/src/services/spooler/envelope_stack/mod.rs b/relay-server/src/services/spooler/envelope_stack/mod.rs new file mode 100644 index 0000000000..c8ed90dc4a --- /dev/null +++ b/relay-server/src/services/spooler/envelope_stack/mod.rs @@ -0,0 +1,11 @@ +use crate::envelope::Envelope; + +mod sqlite; + +pub trait EnvelopeStack { + async fn push(&mut self, envelope: Envelope); + + async fn peek(&self) -> Option<&Envelope>; + + async fn pop(&mut self) -> Option; +} diff --git a/relay-server/src/services/spooler/envelope_stack/sqlite.rs b/relay-server/src/services/spooler/envelope_stack/sqlite.rs new file mode 100644 index 0000000000..25b027c1fc --- /dev/null +++ b/relay-server/src/services/spooler/envelope_stack/sqlite.rs @@ -0,0 +1,109 @@ +use crate::envelope::Envelope; +use crate::services::spooler::envelope_stack::EnvelopeStack; +use relay_base_schema::project::ProjectKey; +use sqlx::query::Query; +use sqlx::sqlite::SqliteArguments; +use sqlx::{Pool, QueryBuilder, Sqlite}; +use std::collections::VecDeque; +// Threshold x +// When x is reached, we flush x / 2 oldest elements to disk +// When the stack is empty we load x / 2 from disk (to give a bit of leeway for incoming elements) +// The idea is that we want to keep the most fresh data in memory and flush it to disk only when +// a threshold is surpassed, we also don't want to flush all the data, since we want to keep the msot +// recent data in 
memory as much as possible. + +// TODO: +// add metrics to counter how many insertions we have vs how many reads we have per second + +pub struct SQLiteEnvelopeStack { + db: Pool, + spool_threshold: usize, + own_key: ProjectKey, + sampling_key: ProjectKey, + buffer: VecDeque, +} + +impl SQLiteEnvelopeStack { + pub fn new(db: Pool, spool_threshold: usize) -> Self { + Self { + db, + spool_threshold, + buffer: VecDeque::with_capacity(spool_threshold), + } + } + + fn hit_spool_threshold(&self) -> bool { + self.buffer.len() + 1 > self.spool_threshold + } + + async fn spool_to_disk(&mut self) { + // TODO: we can make a custom iterator to consume back elements until threshold to avoid + // allocating a vector. + let mut envelopes = Vec::with_capacity(self.spool_threshold / 2); + for _ in 0..(self.spool_threshold / 2) { + let Some(value) = self.buffer.pop_back() else { + break; + }; + + envelopes.push(value); + } + + let insert_envelopes = envelopes.iter().map(|e| InsertEnvelope { + received_at: received_at(e), + own_key: self.own_key, + sampling_key: self.sampling_key, + encoded_envelope: e.to_vec().unwrap(), + }); + let result = build_insert_many_envelopes(insert_envelopes) + .build() + .execute(&self.db) + .await; + } + + async fn load_from_disk(&mut self) {} +} + +impl EnvelopeStack for SQLiteEnvelopeStack { + async fn push(&mut self, envelope: Envelope) { + if self.hit_spool_threshold() { + self.spool_to_disk(); + } + + self.buffer.push_front(envelope); + } + + async fn peek(&self) -> Option<&Envelope> { + self.buffer.back() + } + + async fn pop(&mut self) -> Option { + todo!() + } +} + +struct InsertEnvelope { + received_at: i64, + own_key: ProjectKey, + sampling_key: ProjectKey, + encoded_envelope: Vec, +} + +fn build_insert_many_envelopes<'a>( + envelopes: impl Iterator, +) -> QueryBuilder<'a, Sqlite> { + let mut builder: QueryBuilder = + QueryBuilder::new("INSERT INTO envelopes (received_at, own_key, sampling_key, envelope) "); + + builder.push_values(envelopes, 
|mut b, envelope| { + b.push_bind(envelope.received_at) + .push_bind(envelope.own_key.to_string()) + .push_bind(envelope.sampling_key.to_string()) + .push_bind(envelope.encoded_envelope); + }); + + builder +} + +fn received_at(envelope: &Envelope) -> i64 { + relay_common::time::instant_to_date_time(envelope.meta().start_time()).timestamp_millis() +} diff --git a/relay-server/src/services/spooler/mod.rs b/relay-server/src/services/spooler/mod.rs index bce1b33ad5..265e900e83 100644 --- a/relay-server/src/services/spooler/mod.rs +++ b/relay-server/src/services/spooler/mod.rs @@ -59,6 +59,7 @@ use crate::services::test_store::TestStore; use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers}; use crate::utils::{ManagedEnvelope, MemoryChecker}; +mod envelope_stack; pub mod spool_utils; mod sql; From 6df30e28e5d3bc05b187ed2507dba7ce613a3542 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 24 Jul 2024 11:32:31 +0200 Subject: [PATCH 02/23] Add tests --- .../services/spooler/envelope_stack/mod.rs | 8 +- .../services/spooler/envelope_stack/sqlite.rs | 354 ++++++++++++++++-- 2 files changed, 334 insertions(+), 28 deletions(-) diff --git a/relay-server/src/services/spooler/envelope_stack/mod.rs b/relay-server/src/services/spooler/envelope_stack/mod.rs index c8ed90dc4a..27f28a7ae6 100644 --- a/relay-server/src/services/spooler/envelope_stack/mod.rs +++ b/relay-server/src/services/spooler/envelope_stack/mod.rs @@ -3,9 +3,11 @@ use crate::envelope::Envelope; mod sqlite; pub trait EnvelopeStack { - async fn push(&mut self, envelope: Envelope); + type Error; - async fn peek(&self) -> Option<&Envelope>; + async fn push(&mut self, envelope: Box) -> Result<(), Self::Error>; - async fn pop(&mut self) -> Option; + async fn peek(&mut self) -> Result<&Box, Self::Error>; + + async fn pop(&mut self) -> Result, Self::Error>; } diff --git a/relay-server/src/services/spooler/envelope_stack/sqlite.rs b/relay-server/src/services/spooler/envelope_stack/sqlite.rs 
index 25b027c1fc..e8d7a3ea6c 100644 --- a/relay-server/src/services/spooler/envelope_stack/sqlite.rs +++ b/relay-server/src/services/spooler/envelope_stack/sqlite.rs @@ -1,41 +1,76 @@ use crate::envelope::Envelope; +use crate::extractors::StartTime; use crate::services::spooler::envelope_stack::EnvelopeStack; +use futures::StreamExt; use relay_base_schema::project::ProjectKey; use sqlx::query::Query; -use sqlx::sqlite::SqliteArguments; -use sqlx::{Pool, QueryBuilder, Sqlite}; -use std::collections::VecDeque; -// Threshold x -// When x is reached, we flush x / 2 oldest elements to disk -// When the stack is empty we load x / 2 from disk (to give a bit of leeway for incoming elements) -// The idea is that we want to keep the most fresh data in memory and flush it to disk only when -// a threshold is surpassed, we also don't want to flush all the data, since we want to keep the msot -// recent data in memory as much as possible. - -// TODO: -// add metrics to counter how many insertions we have vs how many reads we have per second +use sqlx::sqlite::{SqliteArguments, SqliteRow}; +use sqlx::{Pool, QueryBuilder, Row, Sqlite}; +use std::cmp::Ordering; +use std::collections::{BinaryHeap, VecDeque}; +use std::error::Error; +use std::pin::pin; + +struct OrderedEnvelope(Box); + +impl Eq for OrderedEnvelope {} + +impl PartialEq for OrderedEnvelope { + fn eq(&self, other: &Self) -> bool { + matches!(self.cmp(other), Ordering::Equal) + } +} + +impl PartialOrd for OrderedEnvelope { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for OrderedEnvelope { + fn cmp(&self, other: &Self) -> Ordering { + received_at(&other.0).cmp(&received_at(&self.0)) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum SQLiteEnvelopeStackError { + #[error("the stack is empty")] + Empty, +} pub struct SQLiteEnvelopeStack { db: Pool, spool_threshold: usize, own_key: ProjectKey, sampling_key: ProjectKey, - buffer: VecDeque, + buffer: VecDeque>, } impl 
SQLiteEnvelopeStack { - pub fn new(db: Pool, spool_threshold: usize) -> Self { + pub fn new( + db: Pool, + spool_threshold: usize, + own_key: ProjectKey, + sampling_key: ProjectKey, + ) -> Self { Self { db, spool_threshold, + own_key, + sampling_key, buffer: VecDeque::with_capacity(spool_threshold), } } - fn hit_spool_threshold(&self) -> bool { + fn above_spool_threshold(&self) -> bool { self.buffer.len() + 1 > self.spool_threshold } + fn below_unspool_threshold(&self) -> bool { + self.buffer.is_empty() + } + async fn spool_to_disk(&mut self) { // TODO: we can make a custom iterator to consume back elements until threshold to avoid // allocating a vector. @@ -54,30 +89,115 @@ impl SQLiteEnvelopeStack { sampling_key: self.sampling_key, encoded_envelope: e.to_vec().unwrap(), }); - let result = build_insert_many_envelopes(insert_envelopes) + + if let Err(err) = build_insert_many_envelopes(insert_envelopes) .build() .execute(&self.db) - .await; + .await + { + relay_log::error!( + error = &err as &dyn Error, + "failed to spool envelopes to disk", + ); + } } - async fn load_from_disk(&mut self) {} + async fn load_from_disk(&mut self) { + let envelopes = build_delete_and_fetch_many_envelopes( + self.own_key, + self.sampling_key, + (self.spool_threshold / 2) as i64, + ) + .fetch(&self.db) + .peekable(); + + let mut envelopes = pin!(envelopes); + if envelopes.as_mut().peek().await.is_none() { + return; + } + + // We use a priority map to order envelopes that are deleted from the database. + // Unfortunately we have to do this because SQLite `DELETE` with `RETURNING` doesn't + // return deleted rows in a specific order. 
+ let mut ordered_envelopes = BinaryHeap::with_capacity(self.spool_threshold / 2); + while let Some(envelope) = envelopes.as_mut().next().await { + let envelope = match envelope { + Ok(envelope) => envelope, + Err(err) => { + relay_log::error!( + error = &err as &dyn Error, + "failed to unspool the envelopes from the disk", + ); + continue; + } + }; + + match self.extract_envelope(envelope) { + Ok(envelope) => { + ordered_envelopes.push(OrderedEnvelope(envelope)); + } + Err(err) => { + relay_log::error!( + error = &err as &dyn Error, + "failed to extract the envelope unspooled from disk", + ) + } + } + } + + for envelope in ordered_envelopes.into_sorted_vec() { + // We push in the back of the buffer, since we still want to give priority to + // incoming envelopes that have a more recent timestamp. + self.buffer.push_back(envelope.0) + } + } + + fn extract_envelope(&self, row: SqliteRow) -> Result, SQLiteEnvelopeStackError> { + let envelope_row: Vec = row + .try_get("envelope") + .map_err(|_| SQLiteEnvelopeStackError::Empty)?; + let envelope_bytes = bytes::Bytes::from(envelope_row); + let mut envelope = + Envelope::parse_bytes(envelope_bytes).map_err(|_| SQLiteEnvelopeStackError::Empty)?; + + let received_at: i64 = row + .try_get("received_at") + .map_err(|_| SQLiteEnvelopeStackError::Empty)?; + let start_time = StartTime::from_timestamp_millis(received_at as u64); + + envelope.set_start_time(start_time.into_inner()); + + Ok(envelope) + } } impl EnvelopeStack for SQLiteEnvelopeStack { - async fn push(&mut self, envelope: Envelope) { - if self.hit_spool_threshold() { - self.spool_to_disk(); + type Error = SQLiteEnvelopeStackError; + + async fn push(&mut self, envelope: Box) -> Result<(), Self::Error> { + if self.above_spool_threshold() { + self.spool_to_disk().await; } self.buffer.push_front(envelope); + + Ok(()) } - async fn peek(&self) -> Option<&Envelope> { - self.buffer.back() + async fn peek(&mut self) -> Result<&Box, Self::Error> { + if 
self.below_unspool_threshold() { + self.load_from_disk().await + } + + self.buffer.front().ok_or(Self::Error::Empty) } - async fn pop(&mut self) -> Option { - todo!() + async fn pop(&mut self) -> Result, Self::Error> { + if self.below_unspool_threshold() { + self.load_from_disk().await + } + + self.buffer.pop_front().ok_or(Self::Error::Empty) } } @@ -104,6 +224,190 @@ fn build_insert_many_envelopes<'a>( builder } +pub fn build_delete_and_fetch_many_envelopes<'a>( + own_key: ProjectKey, + project_key: ProjectKey, + batch_size: i64, +) -> Query<'a, Sqlite, SqliteArguments<'a>> { + sqlx::query( + "DELETE FROM + envelopes + WHERE id IN (SELECT id FROM envelopes WHERE own_key = ? AND sampling_key = ? + ORDER BY received_at DESC LIMIT ?) + RETURNING + received_at, own_key, sampling_key, envelope", + ) + .bind(own_key.to_string()) + .bind(project_key.to_string()) + .bind(batch_size) +} + fn received_at(envelope: &Envelope) -> i64 { relay_common::time::instant_to_date_time(envelope.meta().start_time()).timestamp_millis() } + +#[cfg(test)] +mod tests { + use crate::envelope::{Envelope, Item, ItemType}; + use crate::extractors::RequestMeta; + use crate::services::spooler::envelope_stack::sqlite::SQLiteEnvelopeStack; + use crate::services::spooler::envelope_stack::EnvelopeStack; + use relay_base_schema::project::ProjectKey; + use relay_event_schema::protocol::EventId; + use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}; + use sqlx::{Pool, Sqlite}; + use std::path::Path; + use std::time::{Duration, Instant}; + use tokio::fs::DirBuilder; + use uuid::Uuid; + + fn request_meta() -> RequestMeta { + let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" + .parse() + .unwrap(); + + RequestMeta::new(dsn) + } + + fn mock_envelopes(count: usize) -> Vec> { + let instant = Instant::now(); + (0..count) + .map(|i| { + let event_id = EventId::new(); + let mut envelope = Envelope::from_request(Some(event_id), request_meta()); + 
envelope.set_start_time(instant - Duration::from_secs(((count - i) * 3600) as u64)); + + let mut item = Item::new(ItemType::Attachment); + item.set_filename("item"); + envelope.add_item(item); + + envelope + }) + .collect() + } + + async fn setup() -> Pool { + let path = std::env::temp_dir().join(Uuid::new_v4().to_string()); + + create_spool_directory(&path).await; + + let options = SqliteConnectOptions::new() + .filename(&path) + .journal_mode(SqliteJournalMode::Wal) + .create_if_missing(true); + + let db = SqlitePoolOptions::new() + .connect_with(options) + .await + .unwrap(); + + sqlx::migrate!("../migrations").run(&db).await.unwrap(); + + db + } + + async fn create_spool_directory(path: &Path) { + let Some(parent) = path.parent() else { + return; + }; + + if !parent.as_os_str().is_empty() && !parent.exists() { + relay_log::debug!("creating directory for spooling file: {}", parent.display()); + DirBuilder::new() + .recursive(true) + .create(&parent) + .await + .unwrap(); + } + } + + #[tokio::test] + async fn test_push_below_threshold_and_pop() { + let db = setup().await; + let mut stack = SQLiteEnvelopeStack::new( + db, + 10, + ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + ); + + let envelopes = mock_envelopes(5); + + // We push 5 envelopes. + for envelope in envelopes.clone() { + assert!(stack.push(envelope).await.is_ok()); + } + assert_eq!(stack.buffer.len(), 5); + + // We peek the top element. + let peeked_envelope = stack.peek().await.unwrap(); + assert_eq!( + peeked_envelope.event_id().unwrap(), + envelopes.clone()[4].event_id().unwrap() + ); + + // We pop 5 envelopes. 
+ for envelope in envelopes.iter().rev() { + let popped_envelope = stack.pop().await.unwrap(); + assert_eq!( + popped_envelope.event_id().unwrap(), + envelope.event_id().unwrap() + ); + } + } + + #[tokio::test] + async fn test_push_above_threshold_and_pop() { + let db = setup().await; + let mut stack = SQLiteEnvelopeStack::new( + db, + 10, + ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + ); + + let envelopes = mock_envelopes(15); + + // We push 15 envelopes. + for envelope in envelopes.clone() { + assert!(stack.push(envelope).await.is_ok()); + } + assert_eq!(stack.buffer.len(), 10); + + // We peek the top element. + let peeked_envelope = stack.peek().await.unwrap(); + assert_eq!( + peeked_envelope.event_id().unwrap(), + envelopes.clone()[14].event_id().unwrap() + ); + + // We pop 10 envelopes, and we expect that the last 10 are in memory, since the first 5 + // should have been spooled to disk. + for envelope in envelopes[5..15].iter().rev() { + let popped_envelope = stack.pop().await.unwrap(); + assert_eq!( + popped_envelope.event_id().unwrap(), + envelope.event_id().unwrap() + ); + } + assert!(stack.buffer.is_empty()); + + // We peek the top element, which since the buffer is empty should result in a disk load. + let peeked_envelope = stack.peek().await.unwrap(); + assert_eq!( + peeked_envelope.event_id().unwrap(), + envelopes.clone()[4].event_id().unwrap() + ); + + // We pop 5 envelopes, which should not result in a disk load since `peek()` already should + // have caused it. 
+ for envelope in envelopes[0..5].iter().rev() { + let popped_envelope = stack.pop().await.unwrap(); + assert_eq!( + popped_envelope.event_id().unwrap(), + envelope.event_id().unwrap() + ); + } + assert!(stack.buffer.is_empty()); + } +} From e43f61e3f62ad362b9bf901cfd5c7ca11cc5122f Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 24 Jul 2024 11:34:37 +0200 Subject: [PATCH 03/23] Add tests --- relay-server/src/services/spooler/envelope_stack/mod.rs | 3 +++ relay-server/src/services/spooler/envelope_stack/sqlite.rs | 2 ++ 2 files changed, 5 insertions(+) diff --git a/relay-server/src/services/spooler/envelope_stack/mod.rs b/relay-server/src/services/spooler/envelope_stack/mod.rs index 27f28a7ae6..e7db697f16 100644 --- a/relay-server/src/services/spooler/envelope_stack/mod.rs +++ b/relay-server/src/services/spooler/envelope_stack/mod.rs @@ -5,9 +5,12 @@ mod sqlite; pub trait EnvelopeStack { type Error; + #[allow(dead_code)] async fn push(&mut self, envelope: Box) -> Result<(), Self::Error>; + #[allow(dead_code)] async fn peek(&mut self) -> Result<&Box, Self::Error>; + #[allow(dead_code)] async fn pop(&mut self) -> Result, Self::Error>; } diff --git a/relay-server/src/services/spooler/envelope_stack/sqlite.rs b/relay-server/src/services/spooler/envelope_stack/sqlite.rs index e8d7a3ea6c..104920a7df 100644 --- a/relay-server/src/services/spooler/envelope_stack/sqlite.rs +++ b/relay-server/src/services/spooler/envelope_stack/sqlite.rs @@ -48,6 +48,7 @@ pub struct SQLiteEnvelopeStack { } impl SQLiteEnvelopeStack { + #[allow(dead_code)] pub fn new( db: Pool, spool_threshold: usize, @@ -269,6 +270,7 @@ mod tests { RequestMeta::new(dsn) } + #[allow(clippy::vec_box)] fn mock_envelopes(count: usize) -> Vec> { let instant = Instant::now(); (0..count) From 2fe08af4213268b695c6b2aeb98ad7af5854038f Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 24 Jul 2024 14:00:26 +0200 Subject: [PATCH 04/23] Fix --- .../services/spooler/envelope_stack/sqlite.rs | 148 
+++++++++++++++--- 1 file changed, 122 insertions(+), 26 deletions(-) diff --git a/relay-server/src/services/spooler/envelope_stack/sqlite.rs b/relay-server/src/services/spooler/envelope_stack/sqlite.rs index 104920a7df..b4e75a6ec7 100644 --- a/relay-server/src/services/spooler/envelope_stack/sqlite.rs +++ b/relay-server/src/services/spooler/envelope_stack/sqlite.rs @@ -37,6 +37,9 @@ impl Ord for OrderedEnvelope { pub enum SQLiteEnvelopeStackError { #[error("the stack is empty")] Empty, + + #[error("a database error occurred")] + DatabaseError(#[from] sqlx::Error), } pub struct SQLiteEnvelopeStack { @@ -64,6 +67,19 @@ impl SQLiteEnvelopeStack { } } + #[allow(dead_code)] + pub async fn prepare( + db: Pool, + spool_threshold: usize, + own_key: ProjectKey, + sampling_key: ProjectKey, + ) -> Result<(), SQLiteEnvelopeStackError> { + let mut stack = Self::new(db, spool_threshold, own_key, sampling_key); + stack.load_from_disk().await?; + + Ok(()) + } + fn above_spool_threshold(&self) -> bool { self.buffer.len() + 1 > self.spool_threshold } @@ -72,17 +88,28 @@ impl SQLiteEnvelopeStack { self.buffer.is_empty() } - async fn spool_to_disk(&mut self) { + fn disk_batch_size(&self) -> usize { + self.spool_threshold / 2 + } + + async fn spool_to_disk(&mut self) -> Result<(), SQLiteEnvelopeStackError> { + if self.disk_batch_size() == 0 { + return Ok(()); + } + // TODO: we can make a custom iterator to consume back elements until threshold to avoid // allocating a vector. 
- let mut envelopes = Vec::with_capacity(self.spool_threshold / 2); - for _ in 0..(self.spool_threshold / 2) { + let mut envelopes = Vec::with_capacity(self.disk_batch_size()); + for _ in 0..self.disk_batch_size() { let Some(value) = self.buffer.pop_back() else { break; }; envelopes.push(value); } + if envelopes.is_empty() { + return Ok(()); + } let insert_envelopes = envelopes.iter().map(|e| InsertEnvelope { received_at: received_at(e), @@ -100,27 +127,31 @@ impl SQLiteEnvelopeStack { error = &err as &dyn Error, "failed to spool envelopes to disk", ); + + return Err(SQLiteEnvelopeStackError::DatabaseError(err)); } + + Ok(()) } - async fn load_from_disk(&mut self) { + async fn load_from_disk(&mut self) -> Result<(), SQLiteEnvelopeStackError> { let envelopes = build_delete_and_fetch_many_envelopes( self.own_key, self.sampling_key, - (self.spool_threshold / 2) as i64, + self.disk_batch_size() as i64, ) .fetch(&self.db) .peekable(); let mut envelopes = pin!(envelopes); if envelopes.as_mut().peek().await.is_none() { - return; + return Ok(()); } // We use a priority map to order envelopes that are deleted from the database. // Unfortunately we have to do this because SQLite `DELETE` with `RETURNING` doesn't // return deleted rows in a specific order. - let mut ordered_envelopes = BinaryHeap::with_capacity(self.spool_threshold / 2); + let mut ordered_envelopes = BinaryHeap::with_capacity(self.disk_batch_size()); while let Some(envelope) = envelopes.as_mut().next().await { let envelope = match envelope { Ok(envelope) => envelope, @@ -129,7 +160,10 @@ impl SQLiteEnvelopeStack { error = &err as &dyn Error, "failed to unspool the envelopes from the disk", ); - continue; + + // We early return under the assumption that the stream, if it contains an + // error, it means that the query failed. + return Err(SQLiteEnvelopeStackError::DatabaseError(err)); } }; @@ -151,6 +185,8 @@ impl SQLiteEnvelopeStack { // incoming envelopes that have a more recent timestamp. 
self.buffer.push_back(envelope.0) } + + Ok(()) } fn extract_envelope(&self, row: SqliteRow) -> Result, SQLiteEnvelopeStackError> { @@ -177,7 +213,7 @@ impl EnvelopeStack for SQLiteEnvelopeStack { async fn push(&mut self, envelope: Box) -> Result<(), Self::Error> { if self.above_spool_threshold() { - self.spool_to_disk().await; + self.spool_to_disk().await?; } self.buffer.push_front(envelope); @@ -187,7 +223,7 @@ impl EnvelopeStack for SQLiteEnvelopeStack { async fn peek(&mut self) -> Result<&Box, Self::Error> { if self.below_unspool_threshold() { - self.load_from_disk().await + self.load_from_disk().await? } self.buffer.front().ok_or(Self::Error::Empty) @@ -195,7 +231,7 @@ impl EnvelopeStack for SQLiteEnvelopeStack { async fn pop(&mut self) -> Result, Self::Error> { if self.below_unspool_threshold() { - self.load_from_disk().await + self.load_from_disk().await? } self.buffer.pop_front().ok_or(Self::Error::Empty) @@ -270,25 +306,27 @@ mod tests { RequestMeta::new(dsn) } + fn mock_envelope(instant: Instant) -> Box { + let event_id = EventId::new(); + let mut envelope = Envelope::from_request(Some(event_id), request_meta()); + envelope.set_start_time(instant); + + let mut item = Item::new(ItemType::Attachment); + item.set_filename("item"); + envelope.add_item(item); + + envelope + } + #[allow(clippy::vec_box)] fn mock_envelopes(count: usize) -> Vec> { let instant = Instant::now(); (0..count) - .map(|i| { - let event_id = EventId::new(); - let mut envelope = Envelope::from_request(Some(event_id), request_meta()); - envelope.set_start_time(instant - Duration::from_secs(((count - i) * 3600) as u64)); - - let mut item = Item::new(ItemType::Attachment); - item.set_filename("item"); - envelope.add_item(item); - - envelope - }) + .map(|i| mock_envelope(instant - Duration::from_secs((count - i) as u64))) .collect() } - async fn setup() -> Pool { + async fn setup_db(run_migrations: bool) -> Pool { let path = std::env::temp_dir().join(Uuid::new_v4().to_string()); 
create_spool_directory(&path).await; @@ -303,7 +341,9 @@ mod tests { .await .unwrap(); - sqlx::migrate!("../migrations").run(&db).await.unwrap(); + if run_migrations { + sqlx::migrate!("../migrations").run(&db).await.unwrap(); + } db } @@ -323,9 +363,53 @@ mod tests { } } + #[tokio::test] + async fn test_push_when_db_is_not_valid() { + let db = setup_db(false).await; + let mut stack = SQLiteEnvelopeStack::new( + db, + 3, + ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + ); + + let envelopes = mock_envelopes(3); + + // We push the 3 envelopes without errors because they are below the threshold. + for envelope in envelopes.clone() { + assert!(stack.push(envelope).await.is_ok()); + } + + // We push 1 more envelope which results in spooling, which fails because of a database + // problem. + let envelope = mock_envelope(Instant::now()); + assert!(stack.push(envelope).await.is_err()); + + // Now one element should have been popped because the stack tried to spool it and the + // previous, insertion failed, so we have only 2 elements in the stack, we can now add a + // new one and we will succeed. + let envelope = mock_envelope(Instant::now()); + assert!(stack.push(envelope).await.is_ok()); + assert_eq!(stack.buffer.len(), 3); + } + + #[tokio::test] + async fn test_pop_when_db_is_not_valid() { + let db = setup_db(false).await; + let mut stack = SQLiteEnvelopeStack::new( + db, + 3, + ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + ); + + // We pop with no elements. 
+ assert!(stack.pop().await.is_err()); + } + #[tokio::test] async fn test_push_below_threshold_and_pop() { - let db = setup().await; + let db = setup_db(true).await; let mut stack = SQLiteEnvelopeStack::new( db, 10, @@ -360,7 +444,7 @@ mod tests { #[tokio::test] async fn test_push_above_threshold_and_pop() { - let db = setup().await; + let db = setup_db(true).await; let mut stack = SQLiteEnvelopeStack::new( db, 10, @@ -401,6 +485,18 @@ mod tests { envelopes.clone()[4].event_id().unwrap() ); + // We insert a new envelope, to test the load from disk happening during `peek()` gives + // priority to this envelope in the stack. + let envelope = mock_envelope(Instant::now()); + assert!(stack.push(envelope.clone()).await.is_ok()); + + // We pop and expect the newly inserted element. + let popped_envelope = stack.pop().await.unwrap(); + assert_eq!( + popped_envelope.event_id().unwrap(), + envelope.event_id().unwrap() + ); + // We pop 5 envelopes, which should not result in a disk load since `peek()` already should // have caused it. for envelope in envelopes[0..5].iter().rev() { From 2b7615e0143c97d913c373a3ceea7a83d34c3130 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 24 Jul 2024 14:06:33 +0200 Subject: [PATCH 05/23] Fix --- relay-server/src/services/spooler/envelope_stack/sqlite.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/relay-server/src/services/spooler/envelope_stack/sqlite.rs b/relay-server/src/services/spooler/envelope_stack/sqlite.rs index b4e75a6ec7..d84997179f 100644 --- a/relay-server/src/services/spooler/envelope_stack/sqlite.rs +++ b/relay-server/src/services/spooler/envelope_stack/sqlite.rs @@ -128,6 +128,10 @@ impl SQLiteEnvelopeStack { "failed to spool envelopes to disk", ); + // When early return here, we are acknowledging that the elements that we popped from + // the buffer are lost. 
We are doing this on purposes, since if we were to have a + // database corruption during runtime, and we were to put the values back into the buffer + // we will end up with an infinite cycle. return Err(SQLiteEnvelopeStackError::DatabaseError(err)); } From 01626f4bd2eedcdcda1b78a89c46bc9a7a6b120a Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 24 Jul 2024 14:41:50 +0200 Subject: [PATCH 06/23] Fix --- .../services/spooler/envelope_stack/sqlite.rs | 31 +++++++++++++++++-- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/relay-server/src/services/spooler/envelope_stack/sqlite.rs b/relay-server/src/services/spooler/envelope_stack/sqlite.rs index d84997179f..60868b232a 100644 --- a/relay-server/src/services/spooler/envelope_stack/sqlite.rs +++ b/relay-server/src/services/spooler/envelope_stack/sqlite.rs @@ -291,7 +291,9 @@ fn received_at(envelope: &Envelope) -> i64 { mod tests { use crate::envelope::{Envelope, Item, ItemType}; use crate::extractors::RequestMeta; - use crate::services::spooler::envelope_stack::sqlite::SQLiteEnvelopeStack; + use crate::services::spooler::envelope_stack::sqlite::{ + SQLiteEnvelopeStack, SQLiteEnvelopeStackError, + }; use crate::services::spooler::envelope_stack::EnvelopeStack; use relay_base_schema::project::ProjectKey; use relay_event_schema::protocol::EventId; @@ -387,7 +389,10 @@ mod tests { // We push 1 more envelope which results in spooling, which fails because of a database // problem. let envelope = mock_envelope(Instant::now()); - assert!(stack.push(envelope).await.is_err()); + assert!(matches!( + stack.push(envelope).await, + Err(SQLiteEnvelopeStackError::DatabaseError(_)) + )); // Now one element should have been popped because the stack tried to spool it and the // previous, insertion failed, so we have only 2 elements in the stack, we can now add a @@ -407,8 +412,28 @@ mod tests { ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), ); + // We pop with an invalid db. 
+ assert!(matches!( + stack.pop().await, + Err(SQLiteEnvelopeStackError::DatabaseError(_)) + )); + } + + #[tokio::test] + async fn test_pop_when_stack_is_empty() { + let db = setup_db(true).await; + let mut stack = SQLiteEnvelopeStack::new( + db, + 3, + ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + ); + // We pop with no elements. - assert!(stack.pop().await.is_err()); + assert!(matches!( + stack.pop().await, + Err(SQLiteEnvelopeStackError::Empty) + )); } #[tokio::test] From 3e1eee099a97f50697146dfd5f886d1b628662b1 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 24 Jul 2024 14:44:15 +0200 Subject: [PATCH 07/23] Add migration --- migrations/20240724144200_change_index.sql | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 migrations/20240724144200_change_index.sql diff --git a/migrations/20240724144200_change_index.sql b/migrations/20240724144200_change_index.sql new file mode 100644 index 0000000000..752e9bd859 --- /dev/null +++ b/migrations/20240724144200_change_index.sql @@ -0,0 +1,3 @@ +DROP INDEX IF EXISTS project_keys; + +CREATE INDEX IF NOT EXISTS project_keys_received_at ON envelopes (own_key, sampling_key, received_at); \ No newline at end of file From 1105c39d80d6ee372d92679be4c8d58e0a2b44a7 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 24 Jul 2024 15:40:48 +0200 Subject: [PATCH 08/23] Fix --- .../services/spooler/envelope_stack/sqlite.rs | 77 ++++++++++++++++--- 1 file changed, 68 insertions(+), 9 deletions(-) diff --git a/relay-server/src/services/spooler/envelope_stack/sqlite.rs b/relay-server/src/services/spooler/envelope_stack/sqlite.rs index 60868b232a..c472320f5b 100644 --- a/relay-server/src/services/spooler/envelope_stack/sqlite.rs +++ b/relay-server/src/services/spooler/envelope_stack/sqlite.rs @@ -7,7 +7,7 @@ use sqlx::query::Query; use sqlx::sqlite::{SqliteArguments, SqliteRow}; use sqlx::{Pool, QueryBuilder, Row, Sqlite}; use 
std::cmp::Ordering; -use std::collections::{BinaryHeap, VecDeque}; +use std::collections::VecDeque; use std::error::Error; use std::pin::pin; @@ -152,10 +152,10 @@ impl SQLiteEnvelopeStack { return Ok(()); } - // We use a priority map to order envelopes that are deleted from the database. + // We use a sorted vector to order envelopes that are deleted from the database. // Unfortunately we have to do this because SQLite `DELETE` with `RETURNING` doesn't // return deleted rows in a specific order. - let mut ordered_envelopes = BinaryHeap::with_capacity(self.disk_batch_size()); + let mut ordered_envelopes = Vec::with_capacity(self.disk_batch_size()); while let Some(envelope) = envelopes.as_mut().next().await { let envelope = match envelope { Ok(envelope) => envelope, @@ -183,12 +183,11 @@ impl SQLiteEnvelopeStack { } } } - - for envelope in ordered_envelopes.into_sorted_vec() { - // We push in the back of the buffer, since we still want to give priority to - // incoming envelopes that have a more recent timestamp. - self.buffer.push_back(envelope.0) - } + ordered_envelopes.sort(); + let mut ordered_envelopes = ordered_envelopes.into_iter().map(|o| o.0).collect(); + // We push in the back of the buffer, since we still want to give priority to + // incoming envelopes that have a more recent timestamp. 
+ self.buffer.append(&mut ordered_envelopes); Ok(()) } @@ -537,4 +536,64 @@ mod tests { } assert!(stack.buffer.is_empty()); } + + // #[tokio::test] + // async fn benchmark_insert_and_pop_100k_envelopes() { + // let db = setup_db(true).await; + // let mut stack = SQLiteEnvelopeStack::new( + // db, + // 1000, // Set a reasonable spool threshold + // ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + // ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + // ); + // + // let start_time = Instant::now(); + // + // // Insert 100,000 envelopes + // for _ in 0..100_000 { + // let envelope = mock_envelope(Instant::now()); + // stack.push(envelope).await.unwrap(); + // } + // + // let insert_duration = start_time.elapsed(); + // println!("Time to insert 100,000 envelopes: {:?}", insert_duration); + // + // let start_time = Instant::now(); + // + // // Pop 100,000 envelopes + // for _ in 0..100_000 { + // stack.pop().await.unwrap(); + // } + // + // let pop_duration = start_time.elapsed(); + // println!("Time to pop 100,000 envelopes: {:?}", pop_duration); + // + // println!("Total time: {:?}", insert_duration + pop_duration); + // } + // + // #[tokio::test] + // async fn benchmark_mixed_operations() { + // let db = setup_db(true).await; + // let mut stack = SQLiteEnvelopeStack::new( + // db, + // 1000, // Set a reasonable spool threshold + // ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + // ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + // ); + // + // let start_time = Instant::now(); + // + // for i in 0..100_000 { + // if i % 2 == 0 { + // let envelope = mock_envelope(Instant::now()); + // stack.push(envelope).await.unwrap(); + // } else if let Ok(envelope) = stack.pop().await { + // // Do something with the envelope to prevent optimization + // assert!(envelope.event_id().is_some()); + // } + // } + // + // let duration = start_time.elapsed(); + // println!("Time for 100,000 mixed operations: {:?}", 
Creates a new [`SQLiteEnvelopeStack`] and prefills it
#[allow(dead_code)] pub async fn prepare( db: Pool, @@ -75,23 +86,32 @@ impl SQLiteEnvelopeStack { sampling_key: ProjectKey, ) -> Result<(), SQLiteEnvelopeStackError> { let mut stack = Self::new(db, spool_threshold, own_key, sampling_key); - stack.load_from_disk().await?; + stack.unspool_from_disk().await?; Ok(()) } + /// Threshold above which the [`SQLiteEnvelopeStack`] will spool data from the `buffer` to disk. fn above_spool_threshold(&self) -> bool { self.buffer.len() + 1 > self.spool_threshold } + /// Threshold below which the [`SQLiteEnvelopeStack`] will unspool data from disk to the + /// `buffer`. fn below_unspool_threshold(&self) -> bool { self.buffer.is_empty() } + /// The size of batches of data written and read from disk. fn disk_batch_size(&self) -> usize { self.spool_threshold / 2 } + /// Spools to disk up to `disk_batch_size` envelopes from the `buffer`. + /// + /// In case there is a failure while writing envelopes, all the envelopes that were enqueued + /// to be written to disk are lost. The explanation for this behavior can be found in the body + /// of the method. async fn spool_to_disk(&mut self) -> Result<(), SQLiteEnvelopeStackError> { if self.disk_batch_size() == 0 { return Ok(()); @@ -138,7 +158,14 @@ impl SQLiteEnvelopeStack { Ok(()) } - async fn load_from_disk(&mut self) -> Result<(), SQLiteEnvelopeStackError> { + /// Unspools from disk up to `disk_batch_size` envelopes and appends them to the `buffer`. + /// + /// In case a single deletion fails, the affected envelope will not be unspooled and unspooling + /// will continue with the remaining envelopes. + /// + /// In case an envelope fails deserialization due to malformed data in the database, the affected + /// envelope will not be unspooled and unspooling will continue with the remaining envelopes. 
+ async fn unspool_from_disk(&mut self) -> Result<(), SQLiteEnvelopeStackError> { let envelopes = build_delete_and_fetch_many_envelopes( self.own_key, self.sampling_key, @@ -156,6 +183,7 @@ impl SQLiteEnvelopeStack { // Unfortunately we have to do this because SQLite `DELETE` with `RETURNING` doesn't // return deleted rows in a specific order. let mut ordered_envelopes = Vec::with_capacity(self.disk_batch_size()); + let mut db_error = None; while let Some(envelope) = envelopes.as_mut().next().await { let envelope = match envelope { Ok(envelope) => envelope, @@ -164,10 +192,9 @@ impl SQLiteEnvelopeStack { error = &err as &dyn Error, "failed to unspool the envelopes from the disk", ); + db_error = Some(err); - // We early return under the assumption that the stream, if it contains an - // error, it means that the query failed. - return Err(SQLiteEnvelopeStackError::DatabaseError(err)); + continue; } }; @@ -183,15 +210,26 @@ impl SQLiteEnvelopeStack { } } } + // If there was a database error and no envelopes have been returned, we assume that we are + // in a critical state, so we return an error. + if let Some(db_error) = db_error { + if ordered_envelopes.is_empty() { + return Err(SQLiteEnvelopeStackError::DatabaseError(db_error)); + } + }; + + // We sort envelopes by `received_at`. ordered_envelopes.sort(); - let mut ordered_envelopes = ordered_envelopes.into_iter().map(|o| o.0).collect(); + // We push in the back of the buffer, since we still want to give priority to // incoming envelopes that have a more recent timestamp. - self.buffer.append(&mut ordered_envelopes); + self.buffer + .append(&mut ordered_envelopes.into_iter().map(|o| o.0).collect()); Ok(()) } + /// Deserializes an [`Envelope`] from a database row. 
Computes the `received_at` timestamp of an [`Envelope`]
fn received_at(envelope: &Envelope) -> i64 { relay_common::time::instant_to_date_time(envelope.meta().start_time()).timestamp_millis() } @@ -536,64 +582,4 @@ mod tests { } assert!(stack.buffer.is_empty()); } - - // #[tokio::test] - // async fn benchmark_insert_and_pop_100k_envelopes() { - // let db = setup_db(true).await; - // let mut stack = SQLiteEnvelopeStack::new( - // db, - // 1000, // Set a reasonable spool threshold - // ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), - // ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), - // ); - // - // let start_time = Instant::now(); - // - // // Insert 100,000 envelopes - // for _ in 0..100_000 { - // let envelope = mock_envelope(Instant::now()); - // stack.push(envelope).await.unwrap(); - // } - // - // let insert_duration = start_time.elapsed(); - // println!("Time to insert 100,000 envelopes: {:?}", insert_duration); - // - // let start_time = Instant::now(); - // - // // Pop 100,000 envelopes - // for _ in 0..100_000 { - // stack.pop().await.unwrap(); - // } - // - // let pop_duration = start_time.elapsed(); - // println!("Time to pop 100,000 envelopes: {:?}", pop_duration); - // - // println!("Total time: {:?}", insert_duration + pop_duration); - // } - // - // #[tokio::test] - // async fn benchmark_mixed_operations() { - // let db = setup_db(true).await; - // let mut stack = SQLiteEnvelopeStack::new( - // db, - // 1000, // Set a reasonable spool threshold - // ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), - // ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), - // ); - // - // let start_time = Instant::now(); - // - // for i in 0..100_000 { - // if i % 2 == 0 { - // let envelope = mock_envelope(Instant::now()); - // stack.push(envelope).await.unwrap(); - // } else if let Ok(envelope) = stack.pop().await { - // // Do something with the envelope to prevent optimization - // assert!(envelope.event_id().is_some()); - // } - // } - // - // let duration = 
Struct which contains all the fields of a row that has to be inserted
struct InsertEnvelope { received_at: i64, own_key: ProjectKey, From 8cd1b6f4e5141850035ae2829ebb5758d54d32ac Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Thu, 25 Jul 2024 10:33:25 +0200 Subject: [PATCH 12/23] Fix --- migrations/20240724144200_change_index.sql | 2 +- .../services/spooler/envelope_stack/sqlite.rs | 158 ++++++++++-------- 2 files changed, 87 insertions(+), 73 deletions(-) diff --git a/migrations/20240724144200_change_index.sql b/migrations/20240724144200_change_index.sql index 752e9bd859..75f702f32a 100644 --- a/migrations/20240724144200_change_index.sql +++ b/migrations/20240724144200_change_index.sql @@ -1,3 +1,3 @@ DROP INDEX IF EXISTS project_keys; -CREATE INDEX IF NOT EXISTS project_keys_received_at ON envelopes (own_key, sampling_key, received_at); \ No newline at end of file +CREATE INDEX IF NOT EXISTS project_keys_received_at ON envelopes (own_key, sampling_key, received_at); diff --git a/relay-server/src/services/spooler/envelope_stack/sqlite.rs b/relay-server/src/services/spooler/envelope_stack/sqlite.rs index 7278fdb2cf..be767972b4 100644 --- a/relay-server/src/services/spooler/envelope_stack/sqlite.rs +++ b/relay-server/src/services/spooler/envelope_stack/sqlite.rs @@ -6,34 +6,12 @@ use relay_base_schema::project::ProjectKey; use sqlx::query::Query; use sqlx::sqlite::{SqliteArguments, SqliteRow}; use sqlx::{Pool, QueryBuilder, Row, Sqlite}; -use std::cmp::Ordering; use std::collections::VecDeque; use std::error::Error; +use std::fmt::Debug; +use std::num::NonZeroUsize; use std::pin::pin; -/// New Type used to define ordering on a `Box` based on the `start_time` field. 
-struct OrderedEnvelope(Box); - -impl Eq for OrderedEnvelope {} - -impl PartialEq for OrderedEnvelope { - fn eq(&self, other: &Self) -> bool { - matches!(self.cmp(other), Ordering::Equal) - } -} - -impl PartialOrd for OrderedEnvelope { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for OrderedEnvelope { - fn cmp(&self, other: &Self) -> Ordering { - received_at(&other.0).cmp(&received_at(&self.0)) - } -} - /// An error returned when doing an operation on [`SQLiteEnvelopeStack`]. #[derive(Debug, thiserror::Error)] pub enum SQLiteEnvelopeStackError { @@ -52,10 +30,13 @@ pub enum SQLiteEnvelopeStackError { /// to disk in a batched way. pub struct SQLiteEnvelopeStack { db: Pool, - spool_threshold: usize, + spool_threshold: NonZeroUsize, + disk_batch_size: NonZeroUsize, own_key: ProjectKey, sampling_key: ProjectKey, - buffer: VecDeque>, + #[allow(clippy::vec_box)] + batches_buffer: VecDeque>>, + batches_buffer_size: usize, } impl SQLiteEnvelopeStack { @@ -69,10 +50,14 @@ impl SQLiteEnvelopeStack { ) -> Self { Self { db, - spool_threshold, + spool_threshold: NonZeroUsize::new(spool_threshold) + .expect("the spool threshold must be > 0"), + disk_batch_size: NonZeroUsize::new(spool_threshold.div_ceil(2)) + .expect("the disk batch size must be > 0"), own_key, sampling_key, - buffer: VecDeque::with_capacity(spool_threshold), + batches_buffer: VecDeque::with_capacity(spool_threshold), + batches_buffer_size: 0, } } @@ -93,18 +78,13 @@ impl SQLiteEnvelopeStack { /// Threshold above which the [`SQLiteEnvelopeStack`] will spool data from the `buffer` to disk. fn above_spool_threshold(&self) -> bool { - self.buffer.len() + 1 > self.spool_threshold + self.batches_buffer_size >= self.spool_threshold.get() } /// Threshold below which the [`SQLiteEnvelopeStack`] will unspool data from disk to the /// `buffer`. 
fn below_unspool_threshold(&self) -> bool { - self.buffer.is_empty() - } - - /// The size of batches of data written and read from disk. - fn disk_batch_size(&self) -> usize { - self.spool_threshold / 2 + self.batches_buffer_size == 0 } /// Spools to disk up to `disk_batch_size` envelopes from the `buffer`. @@ -113,23 +93,10 @@ impl SQLiteEnvelopeStack { /// to be written to disk are lost. The explanation for this behavior can be found in the body /// of the method. async fn spool_to_disk(&mut self) -> Result<(), SQLiteEnvelopeStackError> { - if self.disk_batch_size() == 0 { - return Ok(()); - } - - // TODO: we can make a custom iterator to consume back elements until threshold to avoid - // allocating a vector. - let mut envelopes = Vec::with_capacity(self.disk_batch_size()); - for _ in 0..self.disk_batch_size() { - let Some(value) = self.buffer.pop_back() else { - break; - }; - - envelopes.push(value); - } - if envelopes.is_empty() { + let Some(envelopes) = self.batches_buffer.pop_back() else { return Ok(()); - } + }; + self.batches_buffer_size -= envelopes.len(); let insert_envelopes = envelopes.iter().map(|e| InsertEnvelope { received_at: received_at(e), @@ -169,7 +136,7 @@ impl SQLiteEnvelopeStack { let envelopes = build_delete_and_fetch_many_envelopes( self.own_key, self.sampling_key, - self.disk_batch_size() as i64, + self.disk_batch_size.get() as i64, ) .fetch(&self.db) .peekable(); @@ -182,7 +149,7 @@ impl SQLiteEnvelopeStack { // We use a sorted vector to order envelopes that are deleted from the database. // Unfortunately we have to do this because SQLite `DELETE` with `RETURNING` doesn't // return deleted rows in a specific order. 
- let mut ordered_envelopes = Vec::with_capacity(self.disk_batch_size()); + let mut extracted_envelopes = Vec::with_capacity(self.disk_batch_size.get()); let mut db_error = None; while let Some(envelope) = envelopes.as_mut().next().await { let envelope = match envelope { @@ -200,7 +167,7 @@ impl SQLiteEnvelopeStack { match self.extract_envelope(envelope) { Ok(envelope) => { - ordered_envelopes.push(OrderedEnvelope(envelope)); + extracted_envelopes.push(envelope); } Err(err) => { relay_log::error!( @@ -212,19 +179,21 @@ impl SQLiteEnvelopeStack { } // If there was a database error and no envelopes have been returned, we assume that we are // in a critical state, so we return an error. - if let Some(db_error) = db_error { - if ordered_envelopes.is_empty() { + if extracted_envelopes.is_empty() { + if let Some(db_error) = db_error { return Err(SQLiteEnvelopeStackError::DatabaseError(db_error)); } - }; + + return Ok(()); + } // We sort envelopes by `received_at`. - ordered_envelopes.sort(); + extracted_envelopes.sort_by_key(|a| received_at(a)); // We push in the back of the buffer, since we still want to give priority to // incoming envelopes that have a more recent timestamp. - self.buffer - .append(&mut ordered_envelopes.into_iter().map(|o| o.0).collect()); + self.batches_buffer_size += extracted_envelopes.len(); + self.batches_buffer.push_back(extracted_envelopes); Ok(()) } @@ -258,7 +227,19 @@ impl EnvelopeStack for SQLiteEnvelopeStack { self.spool_to_disk().await?; } - self.buffer.push_front(envelope); + // We need to check if the topmost batch has space, if not we have to create a new batch and + // push it in front. 
+ if self.batches_buffer.front().map_or(true, |last_batch| { + last_batch.len() >= self.disk_batch_size.get() + }) { + self.batches_buffer + .push_front(Vec::with_capacity(self.disk_batch_size.get())); + } + + if let Some(batch) = self.batches_buffer.front_mut() { + batch.push(envelope); + self.batches_buffer_size += 1; + } Ok(()) } @@ -269,7 +250,10 @@ impl EnvelopeStack for SQLiteEnvelopeStack { self.unspool_from_disk().await? } - self.buffer.front().ok_or(Self::Error::Empty) + self.batches_buffer + .front() + .and_then(|last_batch| last_batch.last()) + .ok_or(Self::Error::Empty) } async fn pop(&mut self) -> Result, Self::Error> { @@ -278,7 +262,25 @@ impl EnvelopeStack for SQLiteEnvelopeStack { self.unspool_from_disk().await? } - self.buffer.pop_front().ok_or(Self::Error::Empty) + let result = self + .batches_buffer + .front_mut() + .and_then(|last_batch| { + self.batches_buffer_size -= 1; + last_batch.pop() + }) + .ok_or(Self::Error::Empty); + + // Since we might leave a batch without elements, we want to pop it from the buffer. + if self + .batches_buffer + .front() + .map_or(false, |last_batch| last_batch.is_empty()) + { + self.batches_buffer.pop_front(); + } + + result } } @@ -441,12 +443,24 @@ mod tests { Err(SQLiteEnvelopeStackError::DatabaseError(_)) )); - // Now one element should have been popped because the stack tried to spool it and the - // previous, insertion failed, so we have only 2 elements in the stack, we can now add a - // new one and we will succeed. + // The stack now contains the last of the 3 elements that were added. If we add a new one + // we will end up with 2. let envelope = mock_envelope(Instant::now()); - assert!(stack.push(envelope).await.is_ok()); - assert_eq!(stack.buffer.len(), 3); + assert!(stack.push(envelope.clone()).await.is_ok()); + assert_eq!(stack.batches_buffer_size, 2); + + // We pop the remaining elements, expecting the last added envelope to be on top. 
+ let popped_envelope_1 = stack.pop().await.unwrap(); + let popped_envelope_2 = stack.pop().await.unwrap(); + assert_eq!( + popped_envelope_1.event_id().unwrap(), + envelope.event_id().unwrap() + ); + assert_eq!( + popped_envelope_2.event_id().unwrap(), + envelopes.clone()[2].event_id().unwrap() + ); + assert_eq!(stack.batches_buffer_size, 0); } #[tokio::test] @@ -499,7 +513,7 @@ mod tests { for envelope in envelopes.clone() { assert!(stack.push(envelope).await.is_ok()); } - assert_eq!(stack.buffer.len(), 5); + assert_eq!(stack.batches_buffer_size, 5); // We peek the top element. let peeked_envelope = stack.peek().await.unwrap(); @@ -534,7 +548,7 @@ mod tests { for envelope in envelopes.clone() { assert!(stack.push(envelope).await.is_ok()); } - assert_eq!(stack.buffer.len(), 10); + assert_eq!(stack.batches_buffer_size, 10); // We peek the top element. let peeked_envelope = stack.peek().await.unwrap(); @@ -552,7 +566,7 @@ mod tests { envelope.event_id().unwrap() ); } - assert!(stack.buffer.is_empty()); + assert_eq!(stack.batches_buffer_size, 0); // We peek the top element, which since the buffer is empty should result in a disk load. 
let peeked_envelope = stack.peek().await.unwrap(); @@ -582,6 +596,6 @@ mod tests { envelope.event_id().unwrap() ); } - assert!(stack.buffer.is_empty()); + assert_eq!(stack.batches_buffer_size, 0); } } From 1058237123bcfa87dfafdf874d51f83b4fdcaef9 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Thu, 25 Jul 2024 10:44:22 +0200 Subject: [PATCH 13/23] Fix --- .../services/spooler/envelope_stack/sqlite.rs | 56 ++++++++++++------- 1 file changed, 35 insertions(+), 21 deletions(-) diff --git a/relay-server/src/services/spooler/envelope_stack/sqlite.rs b/relay-server/src/services/spooler/envelope_stack/sqlite.rs index be767972b4..7eee384e20 100644 --- a/relay-server/src/services/spooler/envelope_stack/sqlite.rs +++ b/relay-server/src/services/spooler/envelope_stack/sqlite.rs @@ -31,7 +31,7 @@ pub enum SQLiteEnvelopeStackError { pub struct SQLiteEnvelopeStack { db: Pool, spool_threshold: NonZeroUsize, - disk_batch_size: NonZeroUsize, + batch_size: NonZeroUsize, own_key: ProjectKey, sampling_key: ProjectKey, #[allow(clippy::vec_box)] @@ -44,19 +44,20 @@ impl SQLiteEnvelopeStack { #[allow(dead_code)] pub fn new( db: Pool, - spool_threshold: usize, + disk_batch_size: usize, + max_batches: usize, own_key: ProjectKey, sampling_key: ProjectKey, ) -> Self { Self { db, - spool_threshold: NonZeroUsize::new(spool_threshold) + spool_threshold: NonZeroUsize::new(disk_batch_size * max_batches) .expect("the spool threshold must be > 0"), - disk_batch_size: NonZeroUsize::new(spool_threshold.div_ceil(2)) + batch_size: NonZeroUsize::new(disk_batch_size) .expect("the disk batch size must be > 0"), own_key, sampling_key, - batches_buffer: VecDeque::with_capacity(spool_threshold), + batches_buffer: VecDeque::with_capacity(max_batches), batches_buffer_size: 0, } } @@ -66,11 +67,12 @@ impl SQLiteEnvelopeStack { #[allow(dead_code)] pub async fn prepare( db: Pool, - spool_threshold: usize, + disk_batch_size: usize, + max_batches: usize, own_key: ProjectKey, sampling_key: ProjectKey, ) -> 
Result<(), SQLiteEnvelopeStackError> { - let mut stack = Self::new(db, spool_threshold, own_key, sampling_key); + let mut stack = Self::new(db, disk_batch_size, max_batches, own_key, sampling_key); stack.unspool_from_disk().await?; Ok(()) @@ -136,7 +138,7 @@ impl SQLiteEnvelopeStack { let envelopes = build_delete_and_fetch_many_envelopes( self.own_key, self.sampling_key, - self.disk_batch_size.get() as i64, + self.batch_size.get() as i64, ) .fetch(&self.db) .peekable(); @@ -149,7 +151,7 @@ impl SQLiteEnvelopeStack { // We use a sorted vector to order envelopes that are deleted from the database. // Unfortunately we have to do this because SQLite `DELETE` with `RETURNING` doesn't // return deleted rows in a specific order. - let mut extracted_envelopes = Vec::with_capacity(self.disk_batch_size.get()); + let mut extracted_envelopes = Vec::with_capacity(self.batch_size.get()); let mut db_error = None; while let Some(envelope) = envelopes.as_mut().next().await { let envelope = match envelope { @@ -229,11 +231,13 @@ impl EnvelopeStack for SQLiteEnvelopeStack { // We need to check if the topmost batch has space, if not we have to create a new batch and // push it in front. 
- if self.batches_buffer.front().map_or(true, |last_batch| { - last_batch.len() >= self.disk_batch_size.get() - }) { + if self + .batches_buffer + .front() + .map_or(true, |last_batch| last_batch.len() >= self.batch_size.get()) + { self.batches_buffer - .push_front(Vec::with_capacity(self.disk_batch_size.get())); + .push_front(Vec::with_capacity(self.batch_size.get())); } if let Some(batch) = self.batches_buffer.front_mut() { @@ -423,14 +427,15 @@ mod tests { let db = setup_db(false).await; let mut stack = SQLiteEnvelopeStack::new( db, - 3, + 2, + 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), ); - let envelopes = mock_envelopes(3); + let envelopes = mock_envelopes(4); - // We push the 3 envelopes without errors because they are below the threshold. + // We push the 4 envelopes without errors because they are below the threshold. for envelope in envelopes.clone() { assert!(stack.push(envelope).await.is_ok()); } @@ -447,17 +452,22 @@ mod tests { // we will end up with 2. let envelope = mock_envelope(Instant::now()); assert!(stack.push(envelope.clone()).await.is_ok()); - assert_eq!(stack.batches_buffer_size, 2); + assert_eq!(stack.batches_buffer_size, 3); // We pop the remaining elements, expecting the last added envelope to be on top. 
let popped_envelope_1 = stack.pop().await.unwrap(); let popped_envelope_2 = stack.pop().await.unwrap(); + let popped_envelope_3 = stack.pop().await.unwrap(); assert_eq!( popped_envelope_1.event_id().unwrap(), envelope.event_id().unwrap() ); assert_eq!( popped_envelope_2.event_id().unwrap(), + envelopes.clone()[3].event_id().unwrap() + ); + assert_eq!( + popped_envelope_3.event_id().unwrap(), envelopes.clone()[2].event_id().unwrap() ); assert_eq!(stack.batches_buffer_size, 0); @@ -468,7 +478,8 @@ mod tests { let db = setup_db(false).await; let mut stack = SQLiteEnvelopeStack::new( db, - 3, + 2, + 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), ); @@ -485,7 +496,8 @@ mod tests { let db = setup_db(true).await; let mut stack = SQLiteEnvelopeStack::new( db, - 3, + 2, + 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), ); @@ -502,7 +514,8 @@ mod tests { let db = setup_db(true).await; let mut stack = SQLiteEnvelopeStack::new( db, - 10, + 5, + 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), ); @@ -537,7 +550,8 @@ mod tests { let db = setup_db(true).await; let mut stack = SQLiteEnvelopeStack::new( db, - 10, + 5, + 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), ); From 9d839ee29702a2698e86d330d35664feb5c2e3c6 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Thu, 25 Jul 2024 10:49:34 +0200 Subject: [PATCH 14/23] Fix --- .../services/spooler/envelope_stack/sqlite.rs | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/relay-server/src/services/spooler/envelope_stack/sqlite.rs b/relay-server/src/services/spooler/envelope_stack/sqlite.rs index 7eee384e20..78b2ded083 100644 --- 
In case no envelopes were unspooled, we will mark
The envelope's project keys do not match the ones of the stack.
db: Pool, + /// Threshold defining the maximum number of envelopes in the `batches_buffer` before spooling + /// to disk will take place. spool_threshold: NonZeroUsize, + /// Size of a batch of envelopes that is written to disk. batch_size: NonZeroUsize, + /// The project key of the project to which all the envelopes belong. own_key: ProjectKey, + /// The project key of the root project of the trace to which all the envelopes belong. sampling_key: ProjectKey, + /// In-memory stack containing all the batches of envelopes that are read and written to disk. #[allow(clippy::vec_box)] batches_buffer: VecDeque>>, + /// The total number of envelopes inside the `batches_buffer`. batches_buffer_size: usize, + /// Boolean representing whether calls to `push()` and `peek()` check disk in case not enough + /// elements are available in the `batches_buffer`. check_disk: bool, } @@ -228,13 +243,26 @@ impl SQLiteEnvelopeStack { Ok(envelope) } + + /// Validates that the incoming [`Envelope`] has the same project keys at the + /// [`SQLiteEnvelopeStack`]. + fn validate_envelope(&self, envelope: &Envelope) -> bool { + let own_key = envelope.meta().public_key(); + let sampling_key = get_sampling_key(envelope).unwrap_or(own_key); + + self.own_key == own_key && self.sampling_key == sampling_key + } } impl EnvelopeStack for SQLiteEnvelopeStack { type Error = SQLiteEnvelopeStackError; async fn push(&mut self, envelope: Box) -> Result<(), Self::Error> { - if self.above_spool_threshold() && self.check_disk { + if !self.validate_envelope(&envelope) { + return Err(Self::Error::MismatchedEnvelope); + } + + if self.above_spool_threshold() { // TODO: investigate how to do spooling/unspooling on a background thread. 
self.spool_to_disk().await?; } @@ -360,15 +388,17 @@ mod tests { use crate::services::spooler::envelope_stack::EnvelopeStack; use relay_base_schema::project::ProjectKey; use relay_event_schema::protocol::EventId; + use relay_sampling::DynamicSamplingContext; use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}; use sqlx::{Pool, Sqlite}; + use std::collections::BTreeMap; use std::path::Path; use std::time::{Duration, Instant}; use tokio::fs::DirBuilder; use uuid::Uuid; fn request_meta() -> RequestMeta { - let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" + let dsn = "https://a94ae32be2584e0bbd7a4cbb95971fee:@sentry.io/42" .parse() .unwrap(); @@ -378,11 +408,24 @@ mod tests { fn mock_envelope(instant: Instant) -> Box { let event_id = EventId::new(); let mut envelope = Envelope::from_request(Some(event_id), request_meta()); + + let dsc = DynamicSamplingContext { + trace_id: Uuid::new_v4(), + public_key: ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + release: Some("1.1.1".to_string()), + user: Default::default(), + replay_id: None, + environment: None, + transaction: Some("transaction1".into()), + sample_rate: None, + sampled: Some(true), + other: BTreeMap::new(), + }; + + envelope.set_dsc(dsc); envelope.set_start_time(instant); - let mut item = Item::new(ItemType::Attachment); - item.set_filename("item"); - envelope.add_item(item); + envelope.add_item(Item::new(ItemType::Transaction)); envelope } @@ -432,6 +475,24 @@ mod tests { } } + #[tokio::test] + async fn test_push_with_mismatching_project_keys() { + let db = setup_db(false).await; + let mut stack = SQLiteEnvelopeStack::new( + db, + 2, + 2, + ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ProjectKey::parse("c25ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + ); + + let envelope = mock_envelope(Instant::now()); + assert!(matches!( + stack.push(envelope).await, + Err(SQLiteEnvelopeStackError::MismatchedEnvelope) + )); + } + #[tokio::test] 
async fn test_push_when_db_is_not_valid() { let db = setup_db(false).await; From e125ae3dec0ad6e3299c5e71ac1286e4eea08f49 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Thu, 25 Jul 2024 15:30:25 +0200 Subject: [PATCH 16/23] Fix --- Cargo.lock | 1 + relay-server/Cargo.toml | 5 + relay-server/benches/benches.rs | 171 ++++++++++++++++++ relay-server/src/envelope.rs | 1 + relay-server/src/lib.rs | 4 + .../services/spooler/envelope_stack/mod.rs | 19 +- .../services/spooler/envelope_stack/sqlite.rs | 4 +- relay-server/src/services/spooler/mod.rs | 2 +- 8 files changed, 199 insertions(+), 8 deletions(-) create mode 100644 relay-server/benches/benches.rs diff --git a/Cargo.lock b/Cargo.lock index 205363ea1c..4b4e7682dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4052,6 +4052,7 @@ dependencies = [ "brotli", "bytes", "chrono", + "criterion", "data-encoding", "flate2", "fnv", diff --git a/relay-server/Cargo.toml b/relay-server/Cargo.toml index f32ba85b94..b87d958895 100644 --- a/relay-server/Cargo.toml +++ b/relay-server/Cargo.toml @@ -137,6 +137,7 @@ axum-extra = { workspace = true, features = ["protobuf"] } semver = { workspace = true } [dev-dependencies] +criterion = { workspace = true } tokio = { workspace = true, features = ['test-util'] } insta = { workspace = true } relay-event-schema = { workspace = true, features = ["jsonschema"] } @@ -144,3 +145,7 @@ relay-protocol = { workspace = true, features = ["test"] } relay-test = { workspace = true } similar-asserts = { workspace = true } tempfile = { workspace = true } + +[[bench]] +name = "benches" +harness = false diff --git a/relay-server/benches/benches.rs b/relay-server/benches/benches.rs new file mode 100644 index 0000000000..7ef865bee9 --- /dev/null +++ b/relay-server/benches/benches.rs @@ -0,0 +1,171 @@ +use bytes::Bytes; +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}; +use sqlx::{Pool, 
Sqlite}; +use std::path::PathBuf; +use std::time::Duration; +use tempfile::TempDir; +use tokio::runtime::Runtime; + +use relay_base_schema::project::ProjectKey; +use relay_server::{Envelope, EnvelopeStack, SQLiteEnvelopeStack}; + +fn setup_db(path: &PathBuf) -> Pool { + let options = SqliteConnectOptions::new() + .filename(path) + .journal_mode(SqliteJournalMode::Wal) + .create_if_missing(true); + + let runtime = Runtime::new().unwrap(); + runtime.block_on(async { + let db = SqlitePoolOptions::new() + .connect_with(options) + .await + .unwrap(); + + sqlx::migrate!("../migrations").run(&db).await.unwrap(); + + db + }) +} + +async fn reset_db(db: Pool) { + sqlx::query("DELETE FROM envelopes") + .execute(&db) + .await + .unwrap(); +} + +fn mock_envelope() -> Box { + let bytes = Bytes::from( + "\ + {\"event_id\":\"9ec79c33ec9942ab8353589fcb2e04dc\",\"dsn\":\"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42\"}\n\ + {\"type\":\"attachment\"}\n\ + helloworld\n\ + ", + ); + + Envelope::parse_bytes(bytes).unwrap() +} + +fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { + let temp_dir = TempDir::new().unwrap(); + let db_path = temp_dir.path().join("test.db"); + let db = setup_db(&db_path); + + let runtime = Runtime::new().unwrap(); + + let mut group = c.benchmark_group("sqlite_envelope_stack"); + group.measurement_time(Duration::from_secs(60)); + + let disk_batch_size = 10; + for size in [1_000, 10_000, 100_000].iter() { + group.throughput(Throughput::Elements(*size as u64)); + + // Benchmark push operations + group.bench_with_input(BenchmarkId::new("push", size), size, |b, &size| { + b.iter_with_setup( + || { + runtime.block_on(async { + reset_db(db.clone()).await; + }); + + let stack = SQLiteEnvelopeStack::new( + db.clone(), + disk_batch_size, + 2, + ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), + ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), + ); + + let mut envelopes = Vec::with_capacity(size); + for _ in 0..size { + 
envelopes.push(mock_envelope()); + } + + (stack, envelopes) + }, + |(mut stack, envelopes)| { + runtime.block_on(async { + for envelope in envelopes { + stack.push(envelope).await.unwrap(); + } + }); + }, + ); + }); + + // Benchmark pop operations + group.bench_with_input(BenchmarkId::new("pop", size), size, |b, &size| { + b.iter_with_setup( + || { + runtime.block_on(async { + reset_db(db.clone()).await; + + let mut stack = SQLiteEnvelopeStack::new( + db.clone(), + disk_batch_size, + 2, + ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), + ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), + ); + + // Pre-fill the stack + for _ in 0..size { + let envelope = mock_envelope(); + stack.push(envelope).await.unwrap(); + } + + stack + }) + }, + |mut stack| { + runtime.block_on(async { + // Benchmark popping + for _ in 0..size { + stack.pop().await.unwrap(); + } + }); + }, + ); + }); + + // Benchmark mixed push and pop operations + group.bench_with_input(BenchmarkId::new("mixed", size), size, |b, &size| { + b.iter_with_setup( + || { + runtime.block_on(async { + reset_db(db.clone()).await; + }); + + SQLiteEnvelopeStack::new( + db.clone(), + disk_batch_size, + 2, + ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), + ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), + ) + }, + |mut stack| { + runtime.block_on(async { + for _ in 0..size { + if rand::random::() { + let envelope = mock_envelope(); + stack.push(envelope).await.unwrap(); + } else if stack.pop().await.is_err() { + // If pop fails (empty stack), push instead + let envelope = mock_envelope(); + stack.push(envelope).await.unwrap(); + } + } + }); + }, + ); + }); + } + + group.finish(); +} + +criterion_group!(benches, benchmark_sqlite_envelope_stack); +criterion_main!(benches); diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index 5fc0c6a6d6..c15c17e181 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -1085,6 
+1085,7 @@ impl EnvelopeHeaders { } } +#[doc(hidden)] #[derive(Clone, Debug)] pub struct Envelope { headers: EnvelopeHeaders, diff --git a/relay-server/src/lib.rs b/relay-server/src/lib.rs index f6370a637d..4b1c1670ae 100644 --- a/relay-server/src/lib.rs +++ b/relay-server/src/lib.rs @@ -267,6 +267,10 @@ mod statsd; mod utils; pub use self::services::spooler::spool_utils; +// Public just for benchmarks. +pub use self::envelope::Envelope; +pub use self::services::spooler::envelope_stack::sqlite::SQLiteEnvelopeStack; +pub use self::services::spooler::envelope_stack::EnvelopeStack; #[cfg(test)] mod testutils; diff --git a/relay-server/src/services/spooler/envelope_stack/mod.rs b/relay-server/src/services/spooler/envelope_stack/mod.rs index e7db697f16..d6e9e0b9bb 100644 --- a/relay-server/src/services/spooler/envelope_stack/mod.rs +++ b/relay-server/src/services/spooler/envelope_stack/mod.rs @@ -1,16 +1,27 @@ use crate::envelope::Envelope; +use std::future::Future; -mod sqlite; +pub mod sqlite; +/// A stack-like data structure that holds [`Envelope`]s. pub trait EnvelopeStack { + /// The error type that is returned when an error is encountered during reading or writing the + /// [`EnvelopeStack`]. type Error; + /// Pushes an [`Envelope`] on top of the stack. #[allow(dead_code)] - async fn push(&mut self, envelope: Box) -> Result<(), Self::Error>; + fn push(&mut self, envelope: Box) -> impl Future>; + /// Peeks the [`Envelope`] on top of the stack. + /// + /// If the stack is empty, an error is returned. #[allow(dead_code)] - async fn peek(&mut self) -> Result<&Box, Self::Error>; + fn peek(&mut self) -> impl Future, Self::Error>>; + /// Pops the [`Envelope`] on top of the stack. + /// + /// If the stack is empty, an error is returned. 
#[allow(dead_code)] - async fn pop(&mut self) -> Result, Self::Error>; + fn pop(&mut self) -> impl Future, Self::Error>>; } diff --git a/relay-server/src/services/spooler/envelope_stack/sqlite.rs b/relay-server/src/services/spooler/envelope_stack/sqlite.rs index ac552bfc2e..bc1da6aa0d 100644 --- a/relay-server/src/services/spooler/envelope_stack/sqlite.rs +++ b/relay-server/src/services/spooler/envelope_stack/sqlite.rs @@ -124,6 +124,7 @@ impl SQLiteEnvelopeStack { encoded_envelope: e.to_vec().unwrap(), }); + // TODO: check how we can do this in a background tokio task in a non-blocking way. if let Err(err) = build_insert_many_envelopes(insert_envelopes) .build() .execute(&self.db) @@ -263,7 +264,6 @@ impl EnvelopeStack for SQLiteEnvelopeStack { } if self.above_spool_threshold() { - // TODO: investigate how to do spooling/unspooling on a background thread. self.spool_to_disk().await?; } @@ -288,7 +288,6 @@ impl EnvelopeStack for SQLiteEnvelopeStack { async fn peek(&mut self) -> Result<&Box, Self::Error> { if self.below_unspool_threshold() && self.check_disk { - // TODO: investigate how to do spooling/unspooling on a background thread. self.unspool_from_disk().await? } @@ -300,7 +299,6 @@ impl EnvelopeStack for SQLiteEnvelopeStack { async fn pop(&mut self) -> Result, Self::Error> { if self.below_unspool_threshold() && self.check_disk { - // TODO: investigate how to do spooling/unspooling on a background thread. self.unspool_from_disk().await? 
} diff --git a/relay-server/src/services/spooler/mod.rs b/relay-server/src/services/spooler/mod.rs index 265e900e83..590a6e6898 100644 --- a/relay-server/src/services/spooler/mod.rs +++ b/relay-server/src/services/spooler/mod.rs @@ -59,7 +59,7 @@ use crate::services::test_store::TestStore; use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers}; use crate::utils::{ManagedEnvelope, MemoryChecker}; -mod envelope_stack; +pub mod envelope_stack; pub mod spool_utils; mod sql; From 2e58f459c1e4c8024d082bf5781d3fab51e6b500 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Thu, 25 Jul 2024 15:38:11 +0200 Subject: [PATCH 17/23] Change batch size --- relay-server/benches/benches.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-server/benches/benches.rs b/relay-server/benches/benches.rs index 7ef865bee9..401898c86f 100644 --- a/relay-server/benches/benches.rs +++ b/relay-server/benches/benches.rs @@ -58,7 +58,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { let mut group = c.benchmark_group("sqlite_envelope_stack"); group.measurement_time(Duration::from_secs(60)); - let disk_batch_size = 10; + let disk_batch_size = 1000; for size in [1_000, 10_000, 100_000].iter() { group.throughput(Throughput::Elements(*size as u64)); From 3927303d53ebe972d3f21b23cb4d1e375b4ed147 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Fri, 26 Jul 2024 08:18:05 +0200 Subject: [PATCH 18/23] Update relay-server/src/services/spooler/envelope_stack/sqlite.rs Co-authored-by: Joris Bayer --- relay-server/src/services/spooler/envelope_stack/sqlite.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-server/src/services/spooler/envelope_stack/sqlite.rs b/relay-server/src/services/spooler/envelope_stack/sqlite.rs index bc1da6aa0d..c29875bf82 100644 --- a/relay-server/src/services/spooler/envelope_stack/sqlite.rs +++ b/relay-server/src/services/spooler/envelope_stack/sqlite.rs @@ -45,7 +45,7 @@ pub struct 
SQLiteEnvelopeStack { own_key: ProjectKey, /// The project key of the root project of the trace to which all the envelopes belong. sampling_key: ProjectKey, - /// In-memory stack containing all the batches of envelopes that are read and written to disk. + /// In-memory stack containing all the batches of envelopes that either have not been written to disk yet, or have been read from disk recently. #[allow(clippy::vec_box)] batches_buffer: VecDeque>>, /// The total number of envelopes inside the `batches_buffer`. From 84f654818c8c6dcaf8762d9ba72afdac25fe07d4 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Fri, 26 Jul 2024 08:52:09 +0200 Subject: [PATCH 19/23] Fix --- .../services/spooler/envelope_stack/sqlite.rs | 36 +++++++------------ 1 file changed, 13 insertions(+), 23 deletions(-) diff --git a/relay-server/src/services/spooler/envelope_stack/sqlite.rs b/relay-server/src/services/spooler/envelope_stack/sqlite.rs index c29875bf82..89faedae4c 100644 --- a/relay-server/src/services/spooler/envelope_stack/sqlite.rs +++ b/relay-server/src/services/spooler/envelope_stack/sqlite.rs @@ -23,10 +23,6 @@ pub enum SQLiteEnvelopeStackError { /// The database encountered an unexpected error. #[error("a database error occurred")] DatabaseError(#[from] sqlx::Error), - - /// The envelope has the project keys that are not matching the ones of the stack. - #[error("the envelope doesn't have matching project keys with the stack")] - MismatchedEnvelope, } /// An [`EnvelopeStack`] that is implemented on an SQLite database. @@ -90,9 +86,7 @@ impl SQLiteEnvelopeStack { sampling_key: ProjectKey, ) -> Result<(), SQLiteEnvelopeStackError> { let mut stack = Self::new(db, disk_batch_size, max_batches, own_key, sampling_key); - stack.unspool_from_disk().await?; - - Ok(()) + stack.unspool_from_disk().await } /// Threshold above which the [`SQLiteEnvelopeStack`] will spool data from the `buffer` to disk. 
@@ -259,9 +253,7 @@ impl EnvelopeStack for SQLiteEnvelopeStack { type Error = SQLiteEnvelopeStackError; async fn push(&mut self, envelope: Box) -> Result<(), Self::Error> { - if !self.validate_envelope(&envelope) { - return Err(Self::Error::MismatchedEnvelope); - } + debug_assert!(self.validate_envelope(&envelope)); if self.above_spool_threshold() { self.spool_to_disk().await?; @@ -269,19 +261,19 @@ impl EnvelopeStack for SQLiteEnvelopeStack { // We need to check if the topmost batch has space, if not we have to create a new batch and // push it in front. - if self + if let Some(last_batch) = self .batches_buffer - .front() - .map_or(true, |last_batch| last_batch.len() >= self.batch_size.get()) + .front_mut() + .filter(|last_batch| last_batch.len() < self.batch_size.get()) { - self.batches_buffer - .push_front(Vec::with_capacity(self.batch_size.get())); + last_batch.push(envelope); + } else { + let mut new_batch = Vec::with_capacity(self.batch_size.get()); + new_batch.push(envelope); + self.batches_buffer.push_front(new_batch); } - if let Some(batch) = self.batches_buffer.front_mut() { - batch.push(envelope); - self.batches_buffer_size += 1; - } + self.batches_buffer_size += 1; Ok(()) } @@ -474,6 +466,7 @@ mod tests { } #[tokio::test] + #[should_panic] async fn test_push_with_mismatching_project_keys() { let db = setup_db(false).await; let mut stack = SQLiteEnvelopeStack::new( @@ -485,10 +478,7 @@ mod tests { ); let envelope = mock_envelope(Instant::now()); - assert!(matches!( - stack.push(envelope).await, - Err(SQLiteEnvelopeStackError::MismatchedEnvelope) - )); + let _ = stack.push(envelope).await; } #[tokio::test] From 62cc954c2efd7876242a2d01351c0c715c4171d4 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Fri, 26 Jul 2024 08:53:41 +0200 Subject: [PATCH 20/23] Fix --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a5b86084a..34289f8a81 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ - Use a 
dedicated thread pool for CPU intensive workloads. ([#3833](https://github.com/getsentry/relay/pull/3833)) - Remove `BufferGuard` in favor of memory checks via `MemoryStat`. ([#3821](https://github.com/getsentry/relay/pull/3821)) +- Add `EnvelopeStack` and `SQLiteEnvelopeStack` to manage envelopes on disk. ([#3855](https://github.com/getsentry/relay/pull/3855)) ## 24.7.0 From 236da2881236f768ac0891efb54d923074218871 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Fri, 26 Jul 2024 09:00:30 +0200 Subject: [PATCH 21/23] Fix --- relay-server/src/services/spooler/envelope_stack/sqlite.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/relay-server/src/services/spooler/envelope_stack/sqlite.rs b/relay-server/src/services/spooler/envelope_stack/sqlite.rs index 89faedae4c..1a6be79ba0 100644 --- a/relay-server/src/services/spooler/envelope_stack/sqlite.rs +++ b/relay-server/src/services/spooler/envelope_stack/sqlite.rs @@ -1,7 +1,6 @@ use crate::envelope::Envelope; use crate::extractors::StartTime; use crate::services::spooler::envelope_stack::EnvelopeStack; -use crate::utils::get_sampling_key; use futures::StreamExt; use relay_base_schema::project::ProjectKey; use sqlx::query::Query; @@ -243,7 +242,7 @@ impl SQLiteEnvelopeStack { /// [`SQLiteEnvelopeStack`]. 
fn validate_envelope(&self, envelope: &Envelope) -> bool { let own_key = envelope.meta().public_key(); - let sampling_key = get_sampling_key(envelope).unwrap_or(own_key); + let sampling_key = envelope.sampling_key().unwrap_or(own_key); self.own_key == own_key && self.sampling_key == sampling_key } From fb8e1e004b3f45bd759658682b8f897d22f911f8 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Fri, 26 Jul 2024 09:56:03 +0200 Subject: [PATCH 22/23] Make order aligned --- .../src/services/spooler/envelope_stack/sqlite.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/relay-server/src/services/spooler/envelope_stack/sqlite.rs b/relay-server/src/services/spooler/envelope_stack/sqlite.rs index 1a6be79ba0..f65485f626 100644 --- a/relay-server/src/services/spooler/envelope_stack/sqlite.rs +++ b/relay-server/src/services/spooler/envelope_stack/sqlite.rs @@ -214,7 +214,7 @@ impl SQLiteEnvelopeStack { // We push in the back of the buffer, since we still want to give priority to // incoming envelopes that have a more recent timestamp. self.batches_buffer_size += extracted_envelopes.len(); - self.batches_buffer.push_back(extracted_envelopes); + self.batches_buffer.push_front(extracted_envelopes); Ok(()) } @@ -262,14 +262,14 @@ impl EnvelopeStack for SQLiteEnvelopeStack { // push it in front. 
if let Some(last_batch) = self .batches_buffer - .front_mut() + .back_mut() .filter(|last_batch| last_batch.len() < self.batch_size.get()) { last_batch.push(envelope); } else { let mut new_batch = Vec::with_capacity(self.batch_size.get()); new_batch.push(envelope); - self.batches_buffer.push_front(new_batch); + self.batches_buffer.push_back(new_batch); } self.batches_buffer_size += 1; @@ -283,7 +283,7 @@ impl EnvelopeStack for SQLiteEnvelopeStack { } self.batches_buffer - .front() + .back() .and_then(|last_batch| last_batch.last()) .ok_or(Self::Error::Empty) } @@ -295,7 +295,7 @@ impl EnvelopeStack for SQLiteEnvelopeStack { let result = self .batches_buffer - .front_mut() + .back_mut() .and_then(|last_batch| { self.batches_buffer_size -= 1; last_batch.pop() @@ -305,10 +305,10 @@ impl EnvelopeStack for SQLiteEnvelopeStack { // Since we might leave a batch without elements, we want to pop it from the buffer. if self .batches_buffer - .front() + .back() .map_or(false, |last_batch| last_batch.is_empty()) { - self.batches_buffer.pop_front(); + self.batches_buffer.pop_back(); } result From b432c067272f6300e40ab649454932f1e004fde8 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Fri, 26 Jul 2024 09:56:47 +0200 Subject: [PATCH 23/23] Fix --- .../services/spooler/envelope_stack/sqlite.rs | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/relay-server/src/services/spooler/envelope_stack/sqlite.rs b/relay-server/src/services/spooler/envelope_stack/sqlite.rs index f65485f626..c3fbae8676 100644 --- a/relay-server/src/services/spooler/envelope_stack/sqlite.rs +++ b/relay-server/src/services/spooler/envelope_stack/sqlite.rs @@ -74,20 +74,6 @@ impl SQLiteEnvelopeStack { } } - /// Create a new [`SQLiteEnvelopeStack`] and prefills it with data from disk, up to a fixed - /// number of elements. 
- #[allow(dead_code)] - pub async fn prepare( - db: Pool, - disk_batch_size: usize, - max_batches: usize, - own_key: ProjectKey, - sampling_key: ProjectKey, - ) -> Result<(), SQLiteEnvelopeStackError> { - let mut stack = Self::new(db, disk_batch_size, max_batches, own_key, sampling_key); - stack.unspool_from_disk().await - } - /// Threshold above which the [`SQLiteEnvelopeStack`] will spool data from the `buffer` to disk. fn above_spool_threshold(&self) -> bool { self.batches_buffer_size >= self.spool_threshold.get() @@ -105,7 +91,7 @@ impl SQLiteEnvelopeStack { /// to be written to disk are lost. The explanation for this behavior can be found in the body /// of the method. async fn spool_to_disk(&mut self) -> Result<(), SQLiteEnvelopeStackError> { - let Some(envelopes) = self.batches_buffer.pop_back() else { + let Some(envelopes) = self.batches_buffer.pop_front() else { return Ok(()); }; self.batches_buffer_size -= envelopes.len();