Skip to content

Commit

Permalink
feat(spooler): Add async spooling
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Jul 26, 2024
1 parent b432c06 commit ea19f5c
Showing 1 changed file with 65 additions and 59 deletions.
124 changes: 65 additions & 59 deletions relay-server/src/services/spooler/envelope_stack/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,41 +90,47 @@ impl SQLiteEnvelopeStack {
/// In case there is a failure while writing envelopes, all the envelopes that were enqueued
/// to be written to disk are lost. The explanation for this behavior can be found in the body
/// of the method.
async fn spool_to_disk(&mut self) -> Result<(), SQLiteEnvelopeStackError> {
fn background_spool_to_disk(&mut self) {
let Some(envelopes) = self.batches_buffer.pop_front() else {
return Ok(());
return;
};
// We optimistically assume that such elements were removed from the buffer, so we update
// the new size.
self.batches_buffer_size -= envelopes.len();

let insert_envelopes = envelopes.iter().map(|e| InsertEnvelope {
received_at: received_at(e),
own_key: self.own_key,
sampling_key: self.sampling_key,
encoded_envelope: e.to_vec().unwrap(),
let insert_envelopes = envelopes
.iter()
.map(|e| InsertEnvelope {
received_at: received_at(e),
own_key: self.own_key,
sampling_key: self.sampling_key,
encoded_envelope: e.to_vec().unwrap(),
})
.collect();
let db = self.db.clone();
tokio::spawn(async move {
// When spawning this task, we are acknowledging that the elements that we popped from
// the buffer are lost in case of error during insertion.
//
// We are doing this on purpose: if the database became corrupted at runtime and we put the
// values back into the buffer, we would end up in an infinite cycle.
if let Err(err) = build_insert_many_envelopes(insert_envelopes)
.build()
.execute(&db)
.await
{
relay_log::error!(
error = &err as &dyn Error,
"failed to spool envelopes to disk",
);
}
});

// TODO: check how we can do this in a background tokio task in a non-blocking way.
if let Err(err) = build_insert_many_envelopes(insert_envelopes)
.build()
.execute(&self.db)
.await
{
relay_log::error!(
error = &err as &dyn Error,
"failed to spool envelopes to disk",
);

// By returning early here, we acknowledge that the elements we popped from the buffer are
// lost. We do this on purpose: if the database became corrupted at runtime and we put the
// values back into the buffer, we would end up in an infinite cycle.
return Err(SQLiteEnvelopeStackError::DatabaseError(err));
}

// If we successfully spooled to disk, we know that data should be there.
// We optimistically assume that data was written to disk and that in the next read/writes
// we could try to check the disk. However, this doesn't guarantee data will be there, since
// data might be written after this flag is set.
self.check_disk = true;

Ok(())
}

/// Unspools from disk up to `disk_batch_size` envelopes and appends them to the `buffer`.
Expand Down Expand Up @@ -241,7 +247,7 @@ impl EnvelopeStack for SQLiteEnvelopeStack {
debug_assert!(self.validate_envelope(&envelope));

if self.above_spool_threshold() {
self.spool_to_disk().await?;
self.background_spool_to_disk();
}

// We need to check if the topmost batch has space, if not we have to create a new batch and
Expand Down Expand Up @@ -311,9 +317,7 @@ struct InsertEnvelope {
}

/// Builds a query that inserts many [`Envelope`]s in the database.
fn build_insert_many_envelopes<'a>(
envelopes: impl Iterator<Item = InsertEnvelope>,
) -> QueryBuilder<'a, Sqlite> {
fn build_insert_many_envelopes<'a>(envelopes: Vec<InsertEnvelope>) -> QueryBuilder<'a, Sqlite> {
let mut builder: QueryBuilder<Sqlite> =
QueryBuilder::new("INSERT INTO envelopes (received_at, own_key, sampling_key, envelope) ");

Expand Down Expand Up @@ -370,6 +374,7 @@ mod tests {
use std::path::Path;
use std::time::{Duration, Instant};
use tokio::fs::DirBuilder;
use tokio::time::sleep;
use uuid::Uuid;

fn request_meta() -> RequestMeta {
Expand Down Expand Up @@ -484,36 +489,34 @@ mod tests {
assert!(stack.push(envelope).await.is_ok());
}

// We push 1 more envelope which results in spooling, which fails because of a database
// problem.
let envelope = mock_envelope(Instant::now());
assert!(matches!(
stack.push(envelope).await,
Err(SQLiteEnvelopeStackError::DatabaseError(_))
));
// We push 1 more envelope that results in spooling, which returns ok but will fail in the
// async task that writes to the db.
let envelope_1 = mock_envelope(Instant::now());
assert!(stack.push(envelope_1.clone()).await.is_ok());

// The stack now contains the last of the 3 elements that were added. If we add a new one
// we will end up with 2.
let envelope = mock_envelope(Instant::now());
assert!(stack.push(envelope.clone()).await.is_ok());
assert_eq!(stack.batches_buffer_size, 3);
// We wait for the async spooling to take place.
sleep(Duration::from_millis(100)).await;

// The stack currently contains 2 + 1 elements: the remaining 2 of the original 4 plus
// `envelope_1`. Pushing one more element brings the buffer size to 4.
let envelope_2 = mock_envelope(Instant::now());
assert!(stack.push(envelope_2.clone()).await.is_ok());
assert_eq!(stack.batches_buffer_size, 4);

// We pop the remaining elements, expecting the last added envelope to be on top.
let popped_envelope_1 = stack.pop().await.unwrap();
let popped_envelope_2 = stack.pop().await.unwrap();
let popped_envelope_3 = stack.pop().await.unwrap();
assert_eq!(
popped_envelope_1.event_id().unwrap(),
envelope.event_id().unwrap()
);
assert_eq!(
popped_envelope_2.event_id().unwrap(),
envelopes.clone()[3].event_id().unwrap()
);
assert_eq!(
popped_envelope_3.event_id().unwrap(),
envelopes.clone()[2].event_id().unwrap()
);
let expected_envelopes = [
envelope_2,
envelope_1,
envelopes[3].clone(),
envelopes[2].clone(),
];
for expected_envelope in expected_envelopes {
let popped_envelope = stack.pop().await.unwrap();
assert_eq!(
popped_envelope.event_id().unwrap(),
expected_envelope.event_id().unwrap()
);
}
assert_eq!(stack.batches_buffer_size, 0);
}

Expand Down Expand Up @@ -608,6 +611,9 @@ mod tests {
}
assert_eq!(stack.batches_buffer_size, 10);

// We wait for the async spooling to take place.
sleep(Duration::from_millis(100)).await;

// We peek the top element.
let peeked_envelope = stack.peek().await.unwrap();
assert_eq!(
Expand Down

0 comments on commit ea19f5c

Please sign in to comment.