From 05deaee8d3528b4ff0ad6988303b15be6dfc6099 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Tue, 13 Aug 2024 11:06:29 +0200 Subject: [PATCH 1/2] feat(spooler): Add last_peek value in the priority --- .../services/buffer/envelope_buffer/mod.rs | 118 ++++++++++++++---- 1 file changed, 97 insertions(+), 21 deletions(-) diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index 969d9a3565..cf3943b628 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -1,12 +1,11 @@ +use relay_base_schema::project::ProjectKey; +use relay_config::Config; +use stack_key::StackKey; use std::cmp::Ordering; use std::collections::BTreeSet; use std::convert::Infallible; use std::time::Instant; -use relay_base_schema::project::ProjectKey; -use relay_config::Config; -use stack_key::StackKey; - use crate::envelope::Envelope; use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError; use crate::services::buffer::envelope_stack::{EnvelopeStack, StackProvider}; @@ -163,13 +162,30 @@ where /// Returns a reference to the next-in-line envelope, if one exists. pub async fn peek(&mut self) -> Result, EnvelopeBufferError> { + let Some(( + QueueItem { + key: stack_key, + value: _, + }, + _, + )) = self.priority_queue.peek() + else { + return Ok(None); + }; + + let stack_key = *stack_key; + + self.priority_queue.change_priority_by(&stack_key, |prio| { + prio.last_peek = Instant::now(); + }); + let Some(( QueueItem { key: _, value: stack, }, _, - )) = self.priority_queue.peek_mut() + )) = self.priority_queue.get_mut(&stack_key) else { return Ok(None); }; @@ -335,6 +351,7 @@ impl Eq for QueueItem {} struct Priority { readiness: Readiness, received_at: Instant, + last_peek: Instant, } impl Priority { @@ -342,13 +359,16 @@ impl Priority { Self { readiness: Readiness::new(), received_at, + last_peek: Instant::now(), } } } impl PartialEq for Priority { fn eq(&self, other: &Self) -> bool { - self.readiness.ready() == other.readiness.ready() && self.received_at == other.received_at + self.readiness.ready() == other.readiness.ready() + && self.received_at == other.received_at + && self.last_peek == other.last_peek } } @@ -363,12 +383,22 @@ impl Eq for Priority {} impl Ord for Priority { fn cmp(&self, other: &Self) -> Ordering { match (self.readiness.ready(), other.readiness.ready()) { - (true, true) => self.received_at.cmp(&other.received_at), + // Assuming that two priorities differ only w.r.t. the `last_peek`, we want to prioritize + // stacks that were the least recently peeked. The rationale behind this is that we want + // to keep cycling through different stacks while peeking. + (true, true) => self + .received_at + .cmp(&other.received_at) + .then(self.last_peek.cmp(&other.last_peek).reverse()), (true, false) => Ordering::Greater, (false, true) => Ordering::Less, // For non-ready stacks, we invert the priority, such that projects that are not // ready and did not receive envelopes recently can be evicted. - (false, false) => self.received_at.cmp(&other.received_at).reverse(), + (false, false) => self + .received_at + .cmp(&other.received_at) + .reverse() + .then(self.last_peek.cmp(&other.last_peek).reverse()), } } } @@ -389,10 +419,10 @@ impl Readiness { #[cfg(test)] mod tests { use std::str::FromStr; - use uuid::Uuid; use relay_common::Dsn; + use relay_event_schema::protocol::EventId; use relay_sampling::DynamicSamplingContext; use crate::envelope::{Item, ItemType}; @@ -400,7 +430,11 @@ mod tests { use super::*; - fn new_envelope(project_key: ProjectKey, sampling_key: Option) -> Box { + fn new_envelope( + project_key: ProjectKey, + sampling_key: Option, + event_id: Option, + ) -> Box { let mut envelope = Envelope::from_request( None, RequestMeta::new(Dsn::from_str(&format!("http://{project_key}@localhost/1")).unwrap()), @@ -420,11 +454,14 @@ mod tests { }); envelope.add_item(Item::new(ItemType::Transaction)); } + if let Some(event_id) = event_id { + envelope.set_event_id(event_id); + } envelope } #[tokio::test] - async fn insert_pop() { + async fn test_insert_pop() { let mut buffer = EnvelopeBuffer::::new(); let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); @@ -434,20 +471,29 @@ mod tests { assert!(buffer.pop().await.unwrap().is_none()); assert!(buffer.peek().await.unwrap().is_none()); - buffer.push(new_envelope(project_key1, None)).await.unwrap(); + buffer + .push(new_envelope(project_key1, None, None)) + .await + .unwrap(); assert_eq!( buffer.peek().await.unwrap().unwrap().meta().public_key(), project_key1 ); - buffer.push(new_envelope(project_key2, None)).await.unwrap(); + buffer + .push(new_envelope(project_key2, None, None)) + .await + .unwrap(); // Both projects are not ready, so project 1 is on top (has the oldest envelopes): assert_eq!( buffer.peek().await.unwrap().unwrap().meta().public_key(), project_key1 ); - buffer.push(new_envelope(project_key3, None)).await.unwrap(); + buffer + .push(new_envelope(project_key3, None, None)) + .await + .unwrap(); // All projects are not ready, so project 1 is on top (has the oldest envelopes): assert_eq!( buffer.peek().await.unwrap().unwrap().meta().public_key(), @@ -499,14 +545,14 @@ mod tests { } #[tokio::test] - async fn project_internal_order() { + async fn test_project_internal_order() { let mut buffer = EnvelopeBuffer::::new(); let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); - let envelope1 = new_envelope(project_key, None); + let envelope1 = new_envelope(project_key, None, None); let instant1 = envelope1.meta().start_time(); - let envelope2 = new_envelope(project_key, None); + let envelope2 = new_envelope(project_key, None, None); let instant2 = envelope2.meta().start_time(); assert!(instant2 > instant1); @@ -526,21 +572,21 @@ mod tests { } #[tokio::test] - async fn sampling_projects() { + async fn test_sampling_projects() { let mut buffer = EnvelopeBuffer::::new(); let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap(); - let envelope1 = new_envelope(project_key1, None); + let envelope1 = new_envelope(project_key1, None, None); let instant1 = envelope1.meta().start_time(); buffer.push(envelope1).await.unwrap(); - let envelope2 = new_envelope(project_key2, None); + let envelope2 = new_envelope(project_key2, None, None); let instant2 = envelope2.meta().start_time(); buffer.push(envelope2).await.unwrap(); - let envelope3 = new_envelope(project_key1, Some(project_key2)); + let envelope3 = new_envelope(project_key1, Some(project_key2), None); let instant3 = envelope3.meta().start_time(); buffer.push(envelope3).await.unwrap(); @@ -594,4 +640,34 @@ mod tests { assert!(buffer.pop().await.unwrap().is_none()); } + + #[tokio::test] + async fn test_last_peek_internal_order() { + let mut buffer = EnvelopeBuffer::::new(); + + let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); + let event_id_1 = EventId::new(); + let envelope1 = new_envelope(project_key_1, None, Some(event_id_1)); + + let project_key_2 = ProjectKey::parse("b56ae32be2584e0bbd7a4cbb95971fed").unwrap(); + let event_id_2 = EventId::new(); + let mut envelope2 = new_envelope(project_key_2, None, Some(event_id_2)); + envelope2.set_start_time(envelope1.meta().start_time()); + + buffer.push(envelope1).await.unwrap(); + buffer.push(envelope2).await.unwrap(); + + assert_eq!( + buffer.peek().await.unwrap().unwrap().event_id().unwrap(), + event_id_1 + ); + assert_eq!( + buffer.peek().await.unwrap().unwrap().event_id().unwrap(), + event_id_2 + ); + assert_eq!( + buffer.peek().await.unwrap().unwrap().event_id().unwrap(), + event_id_1 + ); + } } From 9a829c1ad581538f884e137750882e1eea78e50b Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Tue, 13 Aug 2024 12:36:35 +0200 Subject: [PATCH 2/2] Add changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b0292d3c5b..90a800b706 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ - Extract `user.geo.country_code` into span indexed. ([#3911](https://github.com/getsentry/relay/pull/3911)) - Add `span.system` tag to span metrics ([#3913](https://github.com/getsentry/relay/pull/3913)) - Extract client sdk from transaction into profiles. ([#3915](https://github.com/getsentry/relay/pull/3915)) +- Add `last_peek` field to the `Priority` struct. ([#3922](https://github.com/getsentry/relay/pull/3922)) ## 24.7.1