diff --git a/relay-server/src/services/spooler/envelope_stack/sqlite.rs b/relay-server/src/services/spooler/envelope_stack/sqlite.rs
index c3fbae8676..140120210f 100644
--- a/relay-server/src/services/spooler/envelope_stack/sqlite.rs
+++ b/relay-server/src/services/spooler/envelope_stack/sqlite.rs
@@ -90,41 +90,47 @@ impl SQLiteEnvelopeStack {
     /// In case there is a failure while writing envelopes, all the envelopes that were enqueued
     /// to be written to disk are lost. The explanation for this behavior can be found in the body
     /// of the method.
-    async fn spool_to_disk(&mut self) -> Result<(), SQLiteEnvelopeStackError> {
+    fn background_spool_to_disk(&mut self) {
         let Some(envelopes) = self.batches_buffer.pop_front() else {
-            return Ok(());
+            return;
         };
+        // We optimistically update the buffer size, since the popped elements are handed off to
+        // the background write below and are not re-added on failure.
         self.batches_buffer_size -= envelopes.len();
 
-        let insert_envelopes = envelopes.iter().map(|e| InsertEnvelope {
-            received_at: received_at(e),
-            own_key: self.own_key,
-            sampling_key: self.sampling_key,
-            encoded_envelope: e.to_vec().unwrap(),
+        let insert_envelopes = envelopes
+            .iter()
+            .map(|e| InsertEnvelope {
+                received_at: received_at(e),
+                own_key: self.own_key,
+                sampling_key: self.sampling_key,
+                encoded_envelope: e.to_vec().unwrap(),
+            })
+            .collect();
+        let db = self.db.clone();
+        tokio::spawn(async move {
+            // When spawning this task, we are acknowledging that the elements that we popped from
+            // the buffer are lost in case of an error during insertion.
+            //
+            // We are doing this on purpose, since if we were to have a database corruption during
+            // runtime, and we were to put the values back into the buffer, we would end up with an
+            // infinite cycle.
+            if let Err(err) = build_insert_many_envelopes(insert_envelopes)
+                .build()
+                .execute(&db)
+                .await
+            {
+                relay_log::error!(
+                    error = &err as &dyn Error,
+                    "failed to spool envelopes to disk",
+                );
+            }
         });
 
-        // TODO: check how we can do this in a background tokio task in a non-blocking way.
-        if let Err(err) = build_insert_many_envelopes(insert_envelopes)
-            .build()
-            .execute(&self.db)
-            .await
-        {
-            relay_log::error!(
-                error = &err as &dyn Error,
-                "failed to spool envelopes to disk",
-            );
-
-            // When early return here, we are acknowledging that the elements that we popped from
-            // the buffer are lost. We are doing this on purposes, since if we were to have a
-            // database corruption during runtime, and we were to put the values back into the buffer
-            // we will end up with an infinite cycle.
-            return Err(SQLiteEnvelopeStackError::DatabaseError(err));
-        }
-
-        // If we successfully spooled to disk, we know that data should be there.
+        // We optimistically assume that data was written to disk and that on the next reads/writes
+        // we can check the disk. However, this doesn't guarantee the data will be there, since it
+        // might be written only after this flag is set.
         self.check_disk = true;
-
-        Ok(())
     }
 
     /// Unspools from disk up to `disk_batch_size` envelopes and appends them to the `buffer`.
@@ -241,7 +247,7 @@ impl EnvelopeStack for SQLiteEnvelopeStack {
         debug_assert!(self.validate_envelope(&envelope));
 
         if self.above_spool_threshold() {
-            self.spool_to_disk().await?;
+            self.background_spool_to_disk();
         }
 
         // We need to check if the topmost batch has space, if not we have to create a new batch and
@@ -311,9 +317,7 @@ struct InsertEnvelope {
 }
 
 /// Builds a query that inserts many [`Envelope`]s in the database.
-fn build_insert_many_envelopes<'a>(
-    envelopes: impl Iterator<Item = InsertEnvelope>,
-) -> QueryBuilder<'a, Sqlite> {
+fn build_insert_many_envelopes<'a>(envelopes: Vec<InsertEnvelope>) -> QueryBuilder<'a, Sqlite> {
     let mut builder: QueryBuilder<Sqlite> =
         QueryBuilder::new("INSERT INTO envelopes (received_at, own_key, sampling_key, envelope) ");
 
@@ -370,6 +374,7 @@ mod tests {
     use std::path::Path;
     use std::time::{Duration, Instant};
     use tokio::fs::DirBuilder;
+    use tokio::time::sleep;
     use uuid::Uuid;
 
     fn request_meta() -> RequestMeta {
@@ -484,36 +489,34 @@ mod tests {
             assert!(stack.push(envelope).await.is_ok());
         }
 
-        // We push 1 more envelope which results in spooling, which fails because of a database
-        // problem.
-        let envelope = mock_envelope(Instant::now());
-        assert!(matches!(
-            stack.push(envelope).await,
-            Err(SQLiteEnvelopeStackError::DatabaseError(_))
-        ));
+        // We push 1 more envelope that results in spooling, which returns `Ok` but will fail in
+        // the async task that writes to the db.
+        let envelope_1 = mock_envelope(Instant::now());
+        assert!(stack.push(envelope_1.clone()).await.is_ok());
 
-        // The stack now contains the last of the 3 elements that were added. If we add a new one
-        // we will end up with 2.
-        let envelope = mock_envelope(Instant::now());
-        assert!(stack.push(envelope.clone()).await.is_ok());
-        assert_eq!(stack.batches_buffer_size, 3);
+        // We wait for the async spooling to take place.
+        sleep(Duration::from_millis(100)).await;
+
+        // The stack contains the remaining 2 elements of the initial 4 plus `envelope_1`; pushing
+        // one more envelope brings the buffer size to 4.
+        let envelope_2 = mock_envelope(Instant::now());
+        assert!(stack.push(envelope_2.clone()).await.is_ok());
+        assert_eq!(stack.batches_buffer_size, 4);
 
         // We pop the remaining elements, expecting the last added envelope to be on top.
-        let popped_envelope_1 = stack.pop().await.unwrap();
-        let popped_envelope_2 = stack.pop().await.unwrap();
-        let popped_envelope_3 = stack.pop().await.unwrap();
-        assert_eq!(
-            popped_envelope_1.event_id().unwrap(),
-            envelope.event_id().unwrap()
-        );
-        assert_eq!(
-            popped_envelope_2.event_id().unwrap(),
-            envelopes.clone()[3].event_id().unwrap()
-        );
-        assert_eq!(
-            popped_envelope_3.event_id().unwrap(),
-            envelopes.clone()[2].event_id().unwrap()
-        );
+        let expected_envelopes = [
+            envelope_2,
+            envelope_1,
+            envelopes[3].clone(),
+            envelopes[2].clone(),
+        ];
+        for expected_envelope in expected_envelopes {
+            let popped_envelope = stack.pop().await.unwrap();
+            assert_eq!(
+                popped_envelope.event_id().unwrap(),
+                expected_envelope.event_id().unwrap()
+            );
+        }
 
         assert_eq!(stack.batches_buffer_size, 0);
     }
@@ -608,6 +611,9 @@ mod tests {
         }
         assert_eq!(stack.batches_buffer_size, 10);
 
+        // We wait for the async spooling to take place.
+        sleep(Duration::from_millis(100)).await;
+
         // We peek the top element.
         let peeked_envelope = stack.peek().await.unwrap();
         assert_eq!(
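
Editor's note (not part of the diff): the core change above is a fire-and-forget write: the popped batch and a cheap clone of the connection pool are moved into a `tokio::spawn`ed task, and insertion errors are only logged, never re-queued. The minimal sketch below illustrates that pattern with sqlx and hypothetical table/column names; unlike the real code, which batches rows through `QueryBuilder`, it inserts row by row for brevity.

```rust
use sqlx::{Pool, Sqlite};

/// Fire-and-forget insert: rows are moved into a background task together with a clone of the
/// pool. On failure the rows are lost by design, mirroring the behavior documented in the diff.
/// The `envelopes` table and its columns are illustrative placeholders.
fn spawn_background_insert(db: Pool<Sqlite>, rows: Vec<(i64, Vec<u8>)>) {
    tokio::spawn(async move {
        for (received_at, payload) in rows {
            let result = sqlx::query("INSERT INTO envelopes (received_at, envelope) VALUES (?, ?)")
                .bind(received_at)
                .bind(payload)
                .execute(&db)
                .await;

            if let Err(err) = result {
                // Log and drop: re-queuing on a corrupted database would loop forever.
                eprintln!("failed to spool envelope to disk: {err}");
            }
        }
    });
}
```

Because the caller never awaits the spawned task, tests (as in the diff) have to `sleep` briefly before asserting on the database state.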