Skip to content

Commit

Permalink
Improve
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Jul 29, 2024
1 parent 6f713dd commit 5fe818f
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 22 deletions.
14 changes: 6 additions & 8 deletions relay-server/benches/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,9 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
(stack, envelopes)
},
|(mut stack, envelopes)| {
runtime.block_on(async {
for envelope in envelopes {
stack.push(envelope).await.unwrap();
}
});
for envelope in envelopes {
stack.push(envelope).unwrap();
}
},
);
},
Expand All @@ -130,7 +128,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
// Pre-fill the stack
for _ in 0..size {
let envelope = mock_envelope(envelope_size);
stack.push(envelope).await.unwrap();
stack.push(envelope).unwrap();
}

stack
Expand Down Expand Up @@ -179,12 +177,12 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
for _ in 0..size {
if rand::random::<bool>() {
if let Some(envelope) = envelope_iter.next() {
stack.push(envelope).await.unwrap();
stack.push(envelope).unwrap();
}
} else if stack.pop().await.is_err() {
// If pop fails (empty stack), push instead
if let Some(envelope) = envelope_iter.next() {
stack.push(envelope).await.unwrap();
stack.push(envelope).unwrap();
}
}
}
Expand Down
71 changes: 57 additions & 14 deletions relay-server/src/services/spooler/envelope_stack/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,17 @@ impl SQLiteEnvelopeStack {
// the new size.
self.batches_buffer_size -= envelopes.len();

let insert_envelopes = envelopes
.iter()
.map(|e| InsertEnvelope {
let db = self.db.clone();
let own_key = self.own_key;
let sampling_key = self.sampling_key;
let spooling_handle = tokio::spawn(async move {
let insert_envelopes = envelopes.iter().map(|e| InsertEnvelope {
received_at: received_at(e),
own_key: self.own_key,
sampling_key: self.sampling_key,
own_key,
sampling_key,
encoded_envelope: e.to_vec().unwrap(),
})
.collect();
let db = self.db.clone();
});

let spooling_handle = tokio::spawn(async move {
// When spawning this task, we are acknowledging that the elements that we popped from
// the buffer are lost in case of error during insertion.
//
Expand All @@ -152,6 +151,9 @@ impl SQLiteEnvelopeStack {
{
relay_log::error!(
error = &err as &dyn Error,
own_key = own_key.to_string(),
sampling_key = sampling_key.to_string(),
number_of_envelopes = envelopes.len(),
"failed to spool envelopes to disk",
);
}
Expand Down Expand Up @@ -301,25 +303,25 @@ impl EnvelopeStack for SQLiteEnvelopeStack {
}

async fn peek(&mut self) -> Result<&Box<Envelope>, Self::Error> {
self.try_wait_spooling().await?;

if self.below_unspool_threshold() && self.check_disk {
self.unspool_from_disk().await?
}

self.try_wait_spooling().await?;

self.batches_buffer
.back()
.and_then(|last_batch| last_batch.last())
.ok_or(Self::Error::Empty)
}

async fn pop(&mut self) -> Result<Box<Envelope>, Self::Error> {
self.try_wait_spooling().await?;

if self.below_unspool_threshold() && self.check_disk {
self.unspool_from_disk().await?
}

self.try_wait_spooling().await?;

let result = self
.batches_buffer
.back_mut()
Expand Down Expand Up @@ -352,7 +354,9 @@ struct InsertEnvelope {
}

/// Builds a query that inserts many [`Envelope`]s in the database.
fn build_insert_many_envelopes<'a>(envelopes: Vec<InsertEnvelope>) -> QueryBuilder<'a, Sqlite> {
fn build_insert_many_envelopes<'a>(
envelopes: impl Iterator<Item = InsertEnvelope>,
) -> QueryBuilder<'a, Sqlite> {
let mut builder: QueryBuilder<Sqlite> =
QueryBuilder::new("INSERT INTO envelopes (received_at, own_key, sampling_key, envelope) ");

Expand Down Expand Up @@ -697,4 +701,43 @@ mod tests {
}
assert_eq!(stack.batches_buffer_size, 0);
}

#[tokio::test]
async fn test_spooling_handle_is_set() {
let db = setup_db(true).await;
let mut stack = SQLiteEnvelopeStack::new(
db,
1,
1,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
);

// We push 1 envelope.
let envelope_1 = mock_envelope(Instant::now());
assert!(stack.push(envelope_1.clone()).is_ok());
assert!(stack.spooling_handle.is_none());
assert_eq!(stack.batches_buffer_size, 1);

// We push the 2nd envelope which will cause async spooling.
let envelope_2 = mock_envelope(Instant::now());
assert!(stack.push(envelope_2.clone()).is_ok());
assert!(stack.spooling_handle.is_some());
assert_eq!(stack.batches_buffer_size, 1);

// We pop the in-memory element which doesn't need to wait on the async spooling.
let popped_envelope = stack.pop().await.unwrap();
assert_eq!(
popped_envelope.event_id().unwrap(),
envelope_2.event_id().unwrap()
);

// Now the stack is empty, and we want to wait for the async spooling to finish if it didn't
// already.
let popped_envelope = stack.pop().await.unwrap();
assert_eq!(
popped_envelope.event_id().unwrap(),
envelope_1.event_id().unwrap()
);
}
}

0 comments on commit 5fe818f

Please sign in to comment.