diff --git a/CHANGELOG.md b/CHANGELOG.md index fa2b4d2145..a646a08ee5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,9 +19,6 @@ - Add `EnvelopeStore` trait and implement `DiskUsage` for tracking disk usage. ([#3925](https://github.com/getsentry/relay/pull/3925)) - Increase replay recording limit to two hours. ([#3961](https://github.com/getsentry/relay/pull/3961)) - Make EnvelopeBuffer a Service. ([#3965](https://github.com/getsentry/relay/pull/3965)) - -**Internal**: - - No longer send COGS data to dedicated Kafka topic. ([#3953](https://github.com/getsentry/relay/pull/3953)) ## 24.8.0 diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index b7ac387e40..da45769b9d 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -83,6 +83,18 @@ impl PolymorphicEnvelopeBuffer { } } + /// Marks a stack as seen. + /// + /// Non-ready stacks are deprioritized when they are marked as seen, such that + /// the next call to `.peek()` will look at a different stack. This prevents + /// head-of-line blocking. + pub fn mark_seen(&mut self, stack_key: &StackKey) { + match self { + Self::Sqlite(buffer) => buffer.mark_seen(stack_key), + Self::InMemory(buffer) => buffer.mark_seen(stack_key), + } + } + /// Returns `true` whether the buffer has capacity to accept new [`Envelope`]s. pub fn has_capacity(&self) -> bool { match self { @@ -179,7 +191,7 @@ where pub async fn peek(&mut self) -> Result { let Some(( QueueItem { - key: _, + key: stack_key, value: stack, }, Priority { readiness, .. }, @@ -193,7 +205,7 @@ where Ok(match (stack.peek().await?, ready) { (None, _) => Peek::Empty, (Some(envelope), true) => Peek::Ready(envelope), - (Some(envelope), false) => Peek::NotReady(envelope), + (Some(envelope), false) => Peek::NotReady(*stack_key, envelope), }) } @@ -256,6 +268,17 @@ where changed } + /// Marks a stack as seen. + /// + /// Non-ready stacks are deprioritized when they are marked as seen, such that + /// the next call to `.peek()` will look at a different stack. This prevents + /// head-of-line blocking. + pub fn mark_seen(&mut self, stack_key: &StackKey) { + self.priority_queue.change_priority_by(stack_key, |stack| { + stack.last_peek = Instant::now(); + }); + } + fn push_stack(&mut self, envelope: Box) { let received_at = envelope.meta().start_time(); let stack_key = StackKey::from_envelope(&envelope); @@ -302,7 +325,7 @@ where pub enum Peek<'a> { Empty, Ready(&'a Envelope), - NotReady(&'a Envelope), + NotReady(StackKey, &'a Envelope), } #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] @@ -457,7 +480,7 @@ mod tests { fn envelope(&self) -> Option<&Envelope> { match self { Peek::Empty => None, - Peek::Ready(envelope) | Peek::NotReady(envelope) => Some(envelope), + Peek::Ready(envelope) | Peek::NotReady(_, envelope) => Some(envelope), } } } @@ -793,7 +816,6 @@ mod tests { assert_eq!(buffer.priority_queue.len(), 2); } - #[ignore = "last_peek disabled for now"] #[tokio::test] async fn test_last_peek_internal_order() { let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); @@ -804,44 +826,42 @@ mod tests { let project_key_2 = ProjectKey::parse("b56ae32be2584e0bbd7a4cbb95971fed").unwrap(); let event_id_2 = EventId::new(); - let mut envelope2 = new_envelope(project_key_2, None, Some(event_id_2)); - envelope2.set_start_time(envelope1.meta().start_time()); + let envelope2 = new_envelope(project_key_2, None, Some(event_id_2)); buffer.push(envelope1).await.unwrap(); buffer.push(envelope2).await.unwrap(); - assert_eq!( - buffer - .peek() - .await - .unwrap() - .envelope() - .unwrap() - .event_id() - .unwrap(), - event_id_1 - ); - assert_eq!( - buffer - .peek() - .await - .unwrap() - .envelope() - .unwrap() - .event_id() - .unwrap(), - event_id_2 - ); - assert_eq!( - buffer - .peek() - .await - .unwrap() - .envelope() - .unwrap() - .event_id() - .unwrap(), - event_id_1 - ); + // event_id_1 is first element: + let Peek::NotReady(_, envelope) = buffer.peek().await.unwrap() else { + panic!(); + }; + assert_eq!(envelope.event_id(), Some(event_id_1)); + + // Second peek returns same element: + let Peek::NotReady(stack_key, envelope) = buffer.peek().await.unwrap() else { + panic!(); + }; + assert_eq!(envelope.event_id(), Some(event_id_1)); + + buffer.mark_seen(&stack_key); + + // After mark_seen, event 2 is on top: + let Peek::NotReady(_, envelope) = buffer.peek().await.unwrap() else { + panic!(); + }; + assert_eq!(envelope.event_id(), Some(event_id_2)); + + let Peek::NotReady(stack_key, envelope) = buffer.peek().await.unwrap() else { + panic!(); + }; + assert_eq!(envelope.event_id(), Some(event_id_2)); + + buffer.mark_seen(&stack_key); + + // After another mark_seen, cycle back to event 1: + let Peek::NotReady(_, envelope) = buffer.peek().await.unwrap() else { + panic!(); + }; + assert_eq!(envelope.event_id(), Some(event_id_1)); } } diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index fca8d61e5f..02dcb68cfc 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -1,9 +1,9 @@ //! Types for buffering envelopes. -use std::future; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; +use std::time::Duration; use relay_base_schema::project::ProjectKey; use relay_config::Config; @@ -83,9 +83,11 @@ pub struct EnvelopeBufferService { buffer: PolymorphicEnvelopeBuffer, project_cache: Addr, has_capacity: Arc, - changes: bool, + sleep: Duration, } +const DEFAULT_SLEEP: Duration = Duration::from_millis(100); + impl EnvelopeBufferService { /// Creates a memory or disk based [`EnvelopeBufferService`], depending on the given config. /// @@ -100,7 +102,7 @@ impl EnvelopeBufferService { buffer: PolymorphicEnvelopeBuffer::from_config(config, memory_checker), project_cache, has_capacity: Arc::new(AtomicBool::new(true)), - changes: true, + sleep: Duration::ZERO, }) } @@ -113,23 +115,15 @@ impl EnvelopeBufferService { } } - /// Return immediately if changes were flagged, otherwise sleep forever. - /// - /// NOTE: This function sleeps indefinitely if no changes were flagged. - /// Only use in combination with [`tokio::select!`]. - async fn wait_for_changes(&mut self) { - if !self.changes { - let _: () = future::pending().await; // wait until cancelled - } - } - /// Tries to pop an envelope for a ready project. + /// + /// Returns the amount of time we should wait until next pop async fn try_pop(&mut self) -> Result<(), EnvelopeBufferError> { relay_log::trace!("EnvelopeBufferService peek"); match self.buffer.peek().await? { Peek::Empty => { relay_log::trace!("EnvelopeBufferService empty"); - self.changes = false; + self.sleep = Duration::MAX; // wait for reset by `handle_message`. } Peek::Ready(_) => { relay_log::trace!("EnvelopeBufferService pop"); @@ -139,9 +133,9 @@ impl EnvelopeBufferService { .await? .expect("Element disappeared despite exclusive excess"); self.project_cache.send(DequeuedEnvelope(envelope)); - self.changes = true; + self.sleep = Duration::ZERO; // try next pop immediately } - Peek::NotReady(envelope) => { + Peek::NotReady(stack_key, envelope) => { relay_log::trace!("EnvelopeBufferService request update"); let project_key = envelope.meta().public_key(); self.project_cache.send(UpdateProject(project_key)); @@ -152,7 +146,9 @@ impl EnvelopeBufferService { self.project_cache.send(UpdateProject(sampling_key)); } } - self.changes = false; + // deprioritize the stack to prevent head-of-line blocking + self.buffer.mark_seen(&stack_key); + self.sleep = DEFAULT_SLEEP; } } Ok(()) @@ -179,7 +175,7 @@ impl EnvelopeBufferService { self.buffer.mark_ready(&project_key, true); } }; - self.changes = true; + self.sleep = Duration::ZERO; } async fn push(&mut self, envelope: Box) { @@ -210,7 +206,7 @@ impl Service for EnvelopeBufferService { // On the one hand, we might want to prioritize dequeing over enqueing // so we do not exceed the buffer capacity by starving the dequeue. // on the other hand, prioritizing old messages violates the LIFO design. - () = self.wait_for_changes() => { + () = tokio::time::sleep(self.sleep) => { if let Err(e) = self.try_pop().await { relay_log::error!( error = &e as &dyn std::error::Error,