diff --git a/CHANGELOG.md b/CHANGELOG.md index 3fe90f3a41..abb9d0f07f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ - Switch glob implementations from `regex` to `regex-lite`. ([#3926](https://github.com/getsentry/relay/pull/3926)) - Extract client sdk from transaction into profiles. ([#3915](https://github.com/getsentry/relay/pull/3915)) - Extract `user.geo.subregion` into span metrics/indexed. ([#3914](https://github.com/getsentry/relay/pull/3914)) +- Add `last_peek` field to the `Priority` struct. ([#3922](https://github.com/getsentry/relay/pull/3922)) ## 24.7.1 diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index ebe30bc76f..99e50a09ba 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -162,13 +162,30 @@ where /// Returns a reference to the next-in-line envelope, if one exists. pub async fn peek(&mut self) -> Result, EnvelopeBufferError> { + let Some(( + QueueItem { + key: stack_key, + value: _, + }, + _, + )) = self.priority_queue.peek() + else { + return Ok(None); + }; + + let stack_key = *stack_key; + + self.priority_queue.change_priority_by(&stack_key, |prio| { + prio.last_peek = Instant::now(); + }); + let Some(( QueueItem { key: _, value: stack, }, _, - )) = self.priority_queue.peek_mut() + )) = self.priority_queue.get_mut(&stack_key) else { return Ok(None); }; @@ -329,6 +346,7 @@ impl Eq for QueueItem {} struct Priority { readiness: Readiness, received_at: Instant, + last_peek: Instant, } impl Priority { @@ -336,13 +354,16 @@ impl Priority { Self { readiness: Readiness::new(), received_at, + last_peek: Instant::now(), } } } impl PartialEq for Priority { fn eq(&self, other: &Self) -> bool { - self.readiness.ready() == other.readiness.ready() && self.received_at == other.received_at + self.readiness.ready() == other.readiness.ready() + && self.received_at == other.received_at + && self.last_peek == other.last_peek } } @@ -357,12 +378,22 @@ impl Eq for Priority {} impl Ord for Priority { fn cmp(&self, other: &Self) -> Ordering { match (self.readiness.ready(), other.readiness.ready()) { - (true, true) => self.received_at.cmp(&other.received_at), + // Assuming that two priorities differ only w.r.t. the `last_peek`, we want to prioritize + // stacks that were the least recently peeked. The rationale behind this is that we want + // to keep cycling through different stacks while peeking. + (true, true) => self + .received_at + .cmp(&other.received_at) + .then(self.last_peek.cmp(&other.last_peek).reverse()), (true, false) => Ordering::Greater, (false, true) => Ordering::Less, // For non-ready stacks, we invert the priority, such that projects that are not // ready and did not receive envelopes recently can be evicted. - (false, false) => self.received_at.cmp(&other.received_at).reverse(), + (false, false) => self + .received_at + .cmp(&other.received_at) + .reverse() + .then(self.last_peek.cmp(&other.last_peek).reverse()), } } } @@ -389,10 +420,10 @@ impl Readiness { #[cfg(test)] mod tests { use std::str::FromStr; - use uuid::Uuid; use relay_common::Dsn; + use relay_event_schema::protocol::EventId; use relay_sampling::DynamicSamplingContext; use crate::envelope::{Item, ItemType}; @@ -400,7 +431,11 @@ mod tests { use super::*; - fn new_envelope(project_key: ProjectKey, sampling_key: Option) -> Box { + fn new_envelope( + project_key: ProjectKey, + sampling_key: Option, + event_id: Option, + ) -> Box { let mut envelope = Envelope::from_request( None, RequestMeta::new(Dsn::from_str(&format!("http://{project_key}@localhost/1")).unwrap()), @@ -420,11 +455,14 @@ mod tests { }); envelope.add_item(Item::new(ItemType::Transaction)); } + if let Some(event_id) = event_id { + envelope.set_event_id(event_id); + } envelope } #[tokio::test] - async fn insert_pop() { + async fn test_insert_pop() { let mut buffer = EnvelopeBuffer::::new(); let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); @@ -434,20 +472,29 @@ mod tests { assert!(buffer.pop().await.unwrap().is_none()); assert!(buffer.peek().await.unwrap().is_none()); - buffer.push(new_envelope(project_key1, None)).await.unwrap(); + buffer + .push(new_envelope(project_key1, None, None)) + .await + .unwrap(); assert_eq!( buffer.peek().await.unwrap().unwrap().meta().public_key(), project_key1 ); - buffer.push(new_envelope(project_key2, None)).await.unwrap(); + buffer + .push(new_envelope(project_key2, None, None)) + .await + .unwrap(); // Both projects are not ready, so project 1 is on top (has the oldest envelopes): assert_eq!( buffer.peek().await.unwrap().unwrap().meta().public_key(), project_key1 ); - buffer.push(new_envelope(project_key3, None)).await.unwrap(); + buffer + .push(new_envelope(project_key3, None, None)) + .await + .unwrap(); // All projects are not ready, so project 1 is on top (has the oldest envelopes): assert_eq!( buffer.peek().await.unwrap().unwrap().meta().public_key(), @@ -499,14 +546,14 @@ mod tests { } #[tokio::test] - async fn project_internal_order() { + async fn test_project_internal_order() { let mut buffer = EnvelopeBuffer::::new(); let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); - let envelope1 = new_envelope(project_key, None); + let envelope1 = new_envelope(project_key, None, None); let instant1 = envelope1.meta().start_time(); - let envelope2 = new_envelope(project_key, None); + let envelope2 = new_envelope(project_key, None, None); let instant2 = envelope2.meta().start_time(); assert!(instant2 > instant1); @@ -526,21 +573,21 @@ mod tests { } #[tokio::test] - async fn sampling_projects() { + async fn test_sampling_projects() { let mut buffer = EnvelopeBuffer::::new(); let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap(); - let envelope1 = new_envelope(project_key1, None); + let envelope1 = new_envelope(project_key1, None, None); let instant1 = envelope1.meta().start_time(); buffer.push(envelope1).await.unwrap(); - let envelope2 = new_envelope(project_key2, None); + let envelope2 = new_envelope(project_key2, None, None); let instant2 = envelope2.meta().start_time(); buffer.push(envelope2).await.unwrap(); - let envelope3 = new_envelope(project_key1, Some(project_key2)); + let envelope3 = new_envelope(project_key1, Some(project_key2), None); let instant3 = envelope3.meta().start_time(); buffer.push(envelope3).await.unwrap(); @@ -596,7 +643,7 @@ mod tests { } #[tokio::test] - async fn project_keys_distinct() { + async fn test_project_keys_distinct() { let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap(); @@ -607,13 +654,43 @@ mod tests { let mut buffer = EnvelopeBuffer::::new(); buffer - .push(new_envelope(project_key1, Some(project_key2))) + .push(new_envelope(project_key1, Some(project_key2), None)) .await .unwrap(); buffer - .push(new_envelope(project_key2, Some(project_key1))) + .push(new_envelope(project_key2, Some(project_key1), None)) .await .unwrap(); assert_eq!(buffer.priority_queue.len(), 2); } + + #[tokio::test] + async fn test_last_peek_internal_order() { + let mut buffer = EnvelopeBuffer::::new(); + + let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); + let event_id_1 = EventId::new(); + let envelope1 = new_envelope(project_key_1, None, Some(event_id_1)); + + let project_key_2 = ProjectKey::parse("b56ae32be2584e0bbd7a4cbb95971fed").unwrap(); + let event_id_2 = EventId::new(); + let mut envelope2 = new_envelope(project_key_2, None, Some(event_id_2)); + envelope2.set_start_time(envelope1.meta().start_time()); + + buffer.push(envelope1).await.unwrap(); + buffer.push(envelope2).await.unwrap(); + + assert_eq!( + buffer.peek().await.unwrap().unwrap().event_id().unwrap(), + event_id_1 + ); + assert_eq!( + buffer.peek().await.unwrap().unwrap().event_id().unwrap(), + event_id_2 + ); + assert_eq!( + buffer.peek().await.unwrap().unwrap().event_id().unwrap(), + event_id_1 + ); + } }