From bb90d306bd1b7cafde59b3f5adae560764333c16 Mon Sep 17 00:00:00 2001
From: Riccardo Busetti
Date: Thu, 12 Sep 2024 09:44:59 +0200
Subject: [PATCH] fix(spooler): Fix datetime comparison (#4025)

---
 CHANGELOG.md                                 |  1 +
 .../services/buffer/envelope_store/sqlite.rs | 33 ++++++++++++-------
 2 files changed, 23 insertions(+), 11 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index feaacb3e64..c6d0dead57 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@
 
 - Use `matches_any_origin` to scrub HTTP hosts in spans. ([#3939](https://github.com/getsentry/relay/pull/3939)).
 - Keep frames from both ends of the stacktrace when trimming frames. ([#3905](https://github.com/getsentry/relay/pull/3905))
+- Use `UnixTimestamp` instead of `DateTime` when sorting envelopes from disk. ([#4025](https://github.com/getsentry/relay/pull/4025))
 
 **Features**:
 
diff --git a/relay-server/src/services/buffer/envelope_store/sqlite.rs b/relay-server/src/services/buffer/envelope_store/sqlite.rs
index e2dafa6685..06ab3b80c3 100644
--- a/relay-server/src/services/buffer/envelope_store/sqlite.rs
+++ b/relay-server/src/services/buffer/envelope_store/sqlite.rs
@@ -13,6 +13,7 @@ use crate::Envelope;
 use futures::stream::StreamExt;
 use hashbrown::HashSet;
 use relay_base_schema::project::{ParseProjectKeyError, ProjectKey};
+use relay_common::time::UnixTimestamp;
 use relay_config::Config;
 use sqlx::migrate::MigrateError;
 use sqlx::query::Query;
@@ -369,10 +370,13 @@ impl SqliteEnvelopeStore {
             }
         }
 
-        // We sort envelopes by `received_at`.
+        // We sort envelopes by `received_at` in ascending order.
+        //
         // Unfortunately we have to do this because SQLite `DELETE` with `RETURNING` doesn't
         // return deleted rows in a specific order.
-        extracted_envelopes.sort_by_key(|a| a.received_at());
+        extracted_envelopes.sort_by_key(|a| {
+            UnixTimestamp::from_datetime(a.received_at()).unwrap_or(UnixTimestamp::now())
+        });
 
         Ok(extracted_envelopes)
     }
@@ -518,12 +522,10 @@ pub fn build_count_all<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
 
 #[cfg(test)]
 mod tests {
-    use hashbrown::HashSet;
     use std::time::Duration;
     use tokio::time::sleep;
 
     use relay_base_schema::project::ProjectKey;
-    use relay_event_schema::protocol::EventId;
 
     use super::*;
     use crate::services::buffer::testutils::utils::{mock_envelopes, setup_db};
@@ -538,21 +540,30 @@
 
         // We insert 10 envelopes.
         let envelopes = mock_envelopes(10);
-        let envelope_ids: HashSet<EventId> =
-            envelopes.iter().filter_map(|e| e.event_id()).collect();
         assert!(envelope_store
             .insert_many(envelopes.iter().map(|e| e.as_ref().try_into().unwrap()))
             .await
             .is_ok());
 
-        // We check that if we load more than the limit, we still get back at most 10.
+        // We check that if we load 5, we get the newest 5.
+        let extracted_envelopes = envelope_store
+            .delete_many(own_key, sampling_key, 5)
+            .await
+            .unwrap();
+        assert_eq!(extracted_envelopes.len(), 5);
+        for (i, extracted_envelope) in extracted_envelopes.iter().enumerate().take(5) {
+            assert_eq!(extracted_envelope.event_id(), envelopes[5..][i].event_id());
+        }
+
+        // We check that if we load more than the envelopes stored on disk, we still get back at
+        // most 5.
         let extracted_envelopes = envelope_store
-            .delete_many(own_key, sampling_key, 15)
+            .delete_many(own_key, sampling_key, 10)
             .await
             .unwrap();
-        assert_eq!(envelopes.len(), 10);
-        for envelope in extracted_envelopes {
-            assert!(envelope_ids.contains(&envelope.event_id().unwrap()));
+        assert_eq!(extracted_envelopes.len(), 5);
+        for (i, extracted_envelope) in extracted_envelopes.iter().enumerate().take(5) {
+            assert_eq!(extracted_envelope.event_id(), envelopes[0..5][i].event_id());
         }
     }
 