Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Jul 24, 2024
1 parent e43f61e commit 2fe08af
Showing 1 changed file with 122 additions and 26 deletions.
148 changes: 122 additions & 26 deletions relay-server/src/services/spooler/envelope_stack/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ impl Ord for OrderedEnvelope {
/// Errors returned by the SQLite-backed envelope stack.
pub enum SQLiteEnvelopeStackError {
    /// The stack contains no envelopes (neither in memory nor loadable from disk).
    #[error("the stack is empty")]
    Empty,

    /// An underlying `sqlx` operation failed while spooling to or loading from disk.
    #[error("a database error occurred")]
    DatabaseError(#[from] sqlx::Error),
}

pub struct SQLiteEnvelopeStack {
Expand Down Expand Up @@ -64,6 +67,19 @@ impl SQLiteEnvelopeStack {
}
}

#[allow(dead_code)]
/// Builds a transient stack and performs an initial load from disk.
///
/// The stack itself is discarded; this only warms up / validates the
/// on-disk state for the given project keys. Returns an error if the
/// initial disk load fails.
pub async fn prepare(
    db: Pool<Sqlite>,
    spool_threshold: usize,
    own_key: ProjectKey,
    sampling_key: ProjectKey,
) -> Result<(), SQLiteEnvelopeStackError> {
    Self::new(db, spool_threshold, own_key, sampling_key)
        .load_from_disk()
        .await
}

/// Returns `true` when pushing one more envelope would exceed the
/// configured spool threshold, i.e. a spool-to-disk is required first.
fn above_spool_threshold(&self) -> bool {
    let len_after_push = self.buffer.len() + 1;
    len_after_push > self.spool_threshold
}
Expand All @@ -72,17 +88,28 @@ impl SQLiteEnvelopeStack {
self.buffer.is_empty()
}

async fn spool_to_disk(&mut self) {
/// Number of envelopes moved to/from disk in one batch: half the spool
/// threshold, so that after a spool or load the buffer is half full and
/// neither threshold is immediately re-triggered.
fn disk_batch_size(&self) -> usize {
self.spool_threshold / 2
}

async fn spool_to_disk(&mut self) -> Result<(), SQLiteEnvelopeStackError> {
if self.disk_batch_size() == 0 {
return Ok(());
}

// TODO: we can make a custom iterator to consume back elements until threshold to avoid
// allocating a vector.
let mut envelopes = Vec::with_capacity(self.spool_threshold / 2);
for _ in 0..(self.spool_threshold / 2) {
let mut envelopes = Vec::with_capacity(self.disk_batch_size());
for _ in 0..self.disk_batch_size() {
let Some(value) = self.buffer.pop_back() else {
break;
};

envelopes.push(value);
}
if envelopes.is_empty() {
return Ok(());
}

let insert_envelopes = envelopes.iter().map(|e| InsertEnvelope {
received_at: received_at(e),
Expand All @@ -100,27 +127,31 @@ impl SQLiteEnvelopeStack {
error = &err as &dyn Error,
"failed to spool envelopes to disk",
);

return Err(SQLiteEnvelopeStackError::DatabaseError(err));
}

Ok(())
}

async fn load_from_disk(&mut self) {
async fn load_from_disk(&mut self) -> Result<(), SQLiteEnvelopeStackError> {
let envelopes = build_delete_and_fetch_many_envelopes(
self.own_key,
self.sampling_key,
(self.spool_threshold / 2) as i64,
self.disk_batch_size() as i64,
)
.fetch(&self.db)
.peekable();

let mut envelopes = pin!(envelopes);
if envelopes.as_mut().peek().await.is_none() {
return;
return Ok(());
}

// We use a priority map to order envelopes that are deleted from the database.
// Unfortunately we have to do this because SQLite `DELETE` with `RETURNING` doesn't
// return deleted rows in a specific order.
let mut ordered_envelopes = BinaryHeap::with_capacity(self.spool_threshold / 2);
let mut ordered_envelopes = BinaryHeap::with_capacity(self.disk_batch_size());
while let Some(envelope) = envelopes.as_mut().next().await {
let envelope = match envelope {
Ok(envelope) => envelope,
Expand All @@ -129,7 +160,10 @@ impl SQLiteEnvelopeStack {
error = &err as &dyn Error,
"failed to unspool the envelopes from the disk",
);
continue;

// We return early on the assumption that if the stream yields an
// error, the underlying query itself has failed.
return Err(SQLiteEnvelopeStackError::DatabaseError(err));
}
};

Expand All @@ -151,6 +185,8 @@ impl SQLiteEnvelopeStack {
// incoming envelopes that have a more recent timestamp.
self.buffer.push_back(envelope.0)
}

Ok(())
}

fn extract_envelope(&self, row: SqliteRow) -> Result<Box<Envelope>, SQLiteEnvelopeStackError> {
Expand All @@ -177,7 +213,7 @@ impl EnvelopeStack for SQLiteEnvelopeStack {

async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), Self::Error> {
if self.above_spool_threshold() {
self.spool_to_disk().await;
self.spool_to_disk().await?;
}

self.buffer.push_front(envelope);
Expand All @@ -187,15 +223,15 @@ impl EnvelopeStack for SQLiteEnvelopeStack {

/// Returns a reference to the most recent envelope without removing it.
///
/// If the in-memory buffer has drained below the unspool threshold, a
/// batch is first loaded back from disk; a database failure during that
/// load is propagated. Returns `Empty` if no envelope is available.
async fn peek(&mut self) -> Result<&Box<Envelope>, Self::Error> {
    if self.below_unspool_threshold() {
        self.load_from_disk().await?;
    }

    self.buffer.front().ok_or(Self::Error::Empty)
}

async fn pop(&mut self) -> Result<Box<Envelope>, Self::Error> {
if self.below_unspool_threshold() {
self.load_from_disk().await
self.load_from_disk().await?
}

self.buffer.pop_front().ok_or(Self::Error::Empty)
Expand Down Expand Up @@ -270,25 +306,27 @@ mod tests {
RequestMeta::new(dsn)
}

/// Builds a minimal test envelope carrying one attachment item, with its
/// start time set to `instant`.
fn mock_envelope(instant: Instant) -> Box<Envelope> {
    let mut envelope = Envelope::from_request(Some(EventId::new()), request_meta());
    envelope.set_start_time(instant);

    let mut attachment = Item::new(ItemType::Attachment);
    attachment.set_filename("item");
    envelope.add_item(attachment);

    envelope
}

#[allow(clippy::vec_box)]
/// Builds `count` test envelopes with strictly increasing start times:
/// the first is the oldest, the last is 1 second before `now`.
fn mock_envelopes(count: usize) -> Vec<Box<Envelope>> {
    let instant = Instant::now();
    (0..count)
        .map(|i| mock_envelope(instant - Duration::from_secs((count - i) as u64)))
        .collect()
}

async fn setup() -> Pool<Sqlite> {
async fn setup_db(run_migrations: bool) -> Pool<Sqlite> {
let path = std::env::temp_dir().join(Uuid::new_v4().to_string());

create_spool_directory(&path).await;
Expand All @@ -303,7 +341,9 @@ mod tests {
.await
.unwrap();

sqlx::migrate!("../migrations").run(&db).await.unwrap();
if run_migrations {
sqlx::migrate!("../migrations").run(&db).await.unwrap();
}

db
}
Expand All @@ -323,9 +363,53 @@ mod tests {
}
}

#[tokio::test]
async fn test_push_when_db_is_not_valid() {
    // Skipping migrations leaves the database without the spool table, so
    // any insert issued while spooling will fail.
    let db = setup_db(false).await;
    let mut stack = SQLiteEnvelopeStack::new(
        db,
        3,
        ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
        ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
    );

    let envelopes = mock_envelopes(3);

    // The first 3 pushes stay at (not above) the threshold, so no spooling
    // happens and no SQL runs: they all succeed.
    for envelope in envelopes.clone() {
        assert!(stack.push(envelope).await.is_ok());
    }

    // One more push crosses the threshold and triggers spooling, which fails
    // because of the broken database.
    let envelope = mock_envelope(Instant::now());
    assert!(stack.push(envelope).await.is_err());

    // The failed spool had already popped one element from the buffer (and
    // the envelope of the failed push was dropped), leaving 2 elements. The
    // next push therefore fits below the threshold and succeeds.
    let envelope = mock_envelope(Instant::now());
    assert!(stack.push(envelope).await.is_ok());
    assert_eq!(stack.buffer.len(), 3);
}

#[tokio::test]
async fn test_pop_when_db_is_not_valid() {
    // Database without migrations: any query issued by the stack fails.
    let db = setup_db(false).await;

    let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
    let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();
    let mut stack = SQLiteEnvelopeStack::new(db, 3, own_key, sampling_key);

    // Popping from an empty stack must surface an error.
    assert!(stack.pop().await.is_err());
}

#[tokio::test]
async fn test_push_below_threshold_and_pop() {
let db = setup().await;
let db = setup_db(true).await;
let mut stack = SQLiteEnvelopeStack::new(
db,
10,
Expand Down Expand Up @@ -360,7 +444,7 @@ mod tests {

#[tokio::test]
async fn test_push_above_threshold_and_pop() {
let db = setup().await;
let db = setup_db(true).await;
let mut stack = SQLiteEnvelopeStack::new(
db,
10,
Expand Down Expand Up @@ -401,6 +485,18 @@ mod tests {
envelopes.clone()[4].event_id().unwrap()
);

// We insert a new envelope, to test the load from disk happening during `peek()` gives
// priority to this envelope in the stack.
let envelope = mock_envelope(Instant::now());
assert!(stack.push(envelope.clone()).await.is_ok());

// We pop and expect the newly inserted element.
let popped_envelope = stack.pop().await.unwrap();
assert_eq!(
popped_envelope.event_id().unwrap(),
envelope.event_id().unwrap()
);

// We pop 5 envelopes, which should not result in a disk load since `peek()` already should
// have caused it.
for envelope in envelopes[0..5].iter().rev() {
Expand Down

0 comments on commit 2fe08af

Please sign in to comment.