From 3c5e16cd5156d749c704fe60bbcd0f10ce0433a7 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Fri, 30 Aug 2024 08:25:30 +0200 Subject: [PATCH 1/3] fix(buffer): Cycle through unready projects --- .../services/buffer/envelope_buffer/mod.rs | 100 +++++++++++------- relay-server/src/services/buffer/mod.rs | 4 +- 2 files changed, 63 insertions(+), 41 deletions(-) diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index b7ac387e40..da45769b9d 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -83,6 +83,18 @@ impl PolymorphicEnvelopeBuffer { } } + /// Marks a stack as seen. + /// + /// Non-ready stacks are deprioritized when they are marked as seen, such that + /// the next call to `.peek()` will look at a different stack. This prevents + /// head-of-line blocking. + pub fn mark_seen(&mut self, stack_key: &StackKey) { + match self { + Self::Sqlite(buffer) => buffer.mark_seen(stack_key), + Self::InMemory(buffer) => buffer.mark_seen(stack_key), + } + } + /// Returns `true` whether the buffer has capacity to accept new [`Envelope`]s. pub fn has_capacity(&self) -> bool { match self { @@ -179,7 +191,7 @@ where pub async fn peek(&mut self) -> Result { let Some(( QueueItem { - key: _, + key: stack_key, value: stack, }, Priority { readiness, .. }, @@ -193,7 +205,7 @@ where Ok(match (stack.peek().await?, ready) { (None, _) => Peek::Empty, (Some(envelope), true) => Peek::Ready(envelope), - (Some(envelope), false) => Peek::NotReady(envelope), + (Some(envelope), false) => Peek::NotReady(*stack_key, envelope), }) } @@ -256,6 +268,17 @@ where changed } + /// Marks a stack as seen. + /// + /// Non-ready stacks are deprioritized when they are marked as seen, such that + /// the next call to `.peek()` will look at a different stack. This prevents + /// head-of-line blocking. + pub fn mark_seen(&mut self, stack_key: &StackKey) { + self.priority_queue.change_priority_by(stack_key, |stack| { + stack.last_peek = Instant::now(); + }); + } + fn push_stack(&mut self, envelope: Box) { let received_at = envelope.meta().start_time(); let stack_key = StackKey::from_envelope(&envelope); @@ -302,7 +325,7 @@ where pub enum Peek<'a> { Empty, Ready(&'a Envelope), - NotReady(&'a Envelope), + NotReady(StackKey, &'a Envelope), } #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] @@ -457,7 +480,7 @@ mod tests { fn envelope(&self) -> Option<&Envelope> { match self { Peek::Empty => None, - Peek::Ready(envelope) | Peek::NotReady(envelope) => Some(envelope), + Peek::Ready(envelope) | Peek::NotReady(_, envelope) => Some(envelope), } } } @@ -793,7 +816,6 @@ mod tests { assert_eq!(buffer.priority_queue.len(), 2); } - #[ignore = "last_peek disabled for now"] #[tokio::test] async fn test_last_peek_internal_order() { let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); @@ -804,44 +826,42 @@ mod tests { let project_key_2 = ProjectKey::parse("b56ae32be2584e0bbd7a4cbb95971fed").unwrap(); let event_id_2 = EventId::new(); - let mut envelope2 = new_envelope(project_key_2, None, Some(event_id_2)); - envelope2.set_start_time(envelope1.meta().start_time()); + let envelope2 = new_envelope(project_key_2, None, Some(event_id_2)); buffer.push(envelope1).await.unwrap(); buffer.push(envelope2).await.unwrap(); - assert_eq!( - buffer - .peek() - .await - .unwrap() - .envelope() - .unwrap() - .event_id() - .unwrap(), - event_id_1 - ); - assert_eq!( - buffer - .peek() - .await - .unwrap() - .envelope() - .unwrap() - .event_id() - .unwrap(), - event_id_2 - ); - assert_eq!( - buffer - .peek() - .await - .unwrap() - .envelope() - .unwrap() - .event_id() - .unwrap(), - event_id_1 - ); + // event_id_1 is first element: + let Peek::NotReady(_, envelope) = buffer.peek().await.unwrap() else { + panic!(); + }; + assert_eq!(envelope.event_id(), Some(event_id_1)); + + // Second peek returns same element: + let Peek::NotReady(stack_key, envelope) = buffer.peek().await.unwrap() else { + panic!(); + }; + assert_eq!(envelope.event_id(), Some(event_id_1)); + + buffer.mark_seen(&stack_key); + + // After mark_seen, event 2 is on top: + let Peek::NotReady(_, envelope) = buffer.peek().await.unwrap() else { + panic!(); + }; + assert_eq!(envelope.event_id(), Some(event_id_2)); + + let Peek::NotReady(stack_key, envelope) = buffer.peek().await.unwrap() else { + panic!(); + }; + assert_eq!(envelope.event_id(), Some(event_id_2)); + + buffer.mark_seen(&stack_key); + + // After another mark_seen, cycle back to event 1: + let Peek::NotReady(_, envelope) = buffer.peek().await.unwrap() else { + panic!(); + }; + assert_eq!(envelope.event_id(), Some(event_id_1)); } } diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index fca8d61e5f..bacb35f948 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -141,7 +141,7 @@ impl EnvelopeBufferService { self.project_cache.send(DequeuedEnvelope(envelope)); self.changes = true; } - Peek::NotReady(envelope) => { + Peek::NotReady(stack_key, envelope) => { relay_log::trace!("EnvelopeBufferService request update"); let project_key = envelope.meta().public_key(); self.project_cache.send(UpdateProject(project_key)); @@ -152,6 +152,8 @@ impl EnvelopeBufferService { self.project_cache.send(UpdateProject(sampling_key)); } } + // deprioritize the stack to prevent head-of-line blocking + self.buffer.mark_seen(&stack_key); self.changes = false; } } From fa0eae6959a6383904e48ae720f88df9e3d8b857 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Fri, 30 Aug 2024 08:52:39 +0200 Subject: [PATCH 2/3] sleep --- CHANGELOG.md | 3 -- relay-server/src/services/buffer/mod.rs | 37 ++++++++++--------------- 2 files changed, 15 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fa2b4d2145..a646a08ee5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,9 +19,6 @@ - Add `EnvelopeStore` trait and implement `DiskUsage` for tracking disk usage. ([#3925](https://github.com/getsentry/relay/pull/3925)) - Increase replay recording limit to two hours. ([#3961](https://github.com/getsentry/relay/pull/3961)) - Make EnvelopeBuffer a Service. ([#3965](https://github.com/getsentry/relay/pull/3965)) - -**Internal**: - - No longer send COGS data to dedicated Kafka topic. ([#3953](https://github.com/getsentry/relay/pull/3953)) ## 24.8.0 diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index bacb35f948..2d9075945e 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -1,9 +1,9 @@ //! Types for buffering envelopes. -use std::future; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; +use std::time::Duration; use relay_base_schema::project::ProjectKey; use relay_config::Config; @@ -83,9 +83,10 @@ pub struct EnvelopeBufferService { buffer: PolymorphicEnvelopeBuffer, project_cache: Addr, has_capacity: Arc, - changes: bool, } +const DEFAULT_SLEEP: Duration = Duration::from_millis(100); + impl EnvelopeBufferService { /// Creates a memory or disk based [`EnvelopeBufferService`], depending on the given config. /// @@ -100,7 +101,6 @@ impl EnvelopeBufferService { buffer: PolymorphicEnvelopeBuffer::from_config(config, memory_checker), project_cache, has_capacity: Arc::new(AtomicBool::new(true)), - changes: true, }) } @@ -113,23 +113,16 @@ impl EnvelopeBufferService { } } - /// Return immediately if changes were flagged, otherwise sleep forever. - /// - /// NOTE: This function sleeps indefinitely if no changes were flagged. - /// Only use in combination with [`tokio::select!`]. - async fn wait_for_changes(&mut self) { - if !self.changes { - let _: () = future::pending().await; // wait until cancelled - } - } - /// Tries to pop an envelope for a ready project. - async fn try_pop(&mut self) -> Result<(), EnvelopeBufferError> { + /// + /// Returns the amount of time we should wait until next pop + async fn try_pop(&mut self) -> Result { relay_log::trace!("EnvelopeBufferService peek"); + let next_sleep; match self.buffer.peek().await? { Peek::Empty => { relay_log::trace!("EnvelopeBufferService empty"); - self.changes = false; + next_sleep = DEFAULT_SLEEP } Peek::Ready(_) => { relay_log::trace!("EnvelopeBufferService pop"); @@ -139,7 +132,7 @@ impl EnvelopeBufferService { .await? .expect("Element disappeared despite exclusive excess"); self.project_cache.send(DequeuedEnvelope(envelope)); - self.changes = true; + next_sleep = Duration::ZERO; // try next pop immediately } Peek::NotReady(stack_key, envelope) => { relay_log::trace!("EnvelopeBufferService request update"); @@ -154,10 +147,10 @@ impl EnvelopeBufferService { } // deprioritize the stack to prevent head-of-line blocking self.buffer.mark_seen(&stack_key); - self.changes = false; + next_sleep = DEFAULT_SLEEP; } } - Ok(()) + Ok(next_sleep) } async fn handle_message(&mut self, message: EnvelopeBuffer) { @@ -181,7 +174,6 @@ impl EnvelopeBufferService { self.buffer.mark_ready(&project_key, true); } }; - self.changes = true; } async fn push(&mut self, envelope: Box) { @@ -205,6 +197,7 @@ impl Service for EnvelopeBufferService { fn spawn_handler(mut self, mut rx: Receiver) { tokio::spawn(async move { relay_log::info!("EnvelopeBufferService start"); + let mut sleep = Duration::ZERO; loop { relay_log::trace!("EnvelopeBufferService loop"); tokio::select! { @@ -212,13 +205,13 @@ impl Service for EnvelopeBufferService { // On the one hand, we might want to prioritize dequeing over enqueing // so we do not exceed the buffer capacity by starving the dequeue. // on the other hand, prioritizing old messages violates the LIFO design. - () = self.wait_for_changes() => { - if let Err(e) = self.try_pop().await { + () = tokio::time::sleep(sleep) => { + sleep = self.try_pop().await.map_err(|e| { relay_log::error!( error = &e as &dyn std::error::Error, "failed to pop envelope" ); - } + }).unwrap_or(DEFAULT_SLEEP); } Some(message) = rx.recv() => { self.handle_message(message).await; From b3035bc8b8f2827d80096009ab7df9a2f8b5675d Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Fri, 30 Aug 2024 10:52:41 +0200 Subject: [PATCH 3/3] fix: Reset sleep from handle_message --- relay-server/src/services/buffer/mod.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 2d9075945e..02dcb68cfc 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -83,6 +83,7 @@ pub struct EnvelopeBufferService { buffer: PolymorphicEnvelopeBuffer, project_cache: Addr, has_capacity: Arc, + sleep: Duration, } const DEFAULT_SLEEP: Duration = Duration::from_millis(100); @@ -101,6 +102,7 @@ impl EnvelopeBufferService { buffer: PolymorphicEnvelopeBuffer::from_config(config, memory_checker), project_cache, has_capacity: Arc::new(AtomicBool::new(true)), + sleep: Duration::ZERO, }) } @@ -116,13 +118,12 @@ impl EnvelopeBufferService { /// Tries to pop an envelope for a ready project. /// /// Returns the amount of time we should wait until next pop - async fn try_pop(&mut self) -> Result { + async fn try_pop(&mut self) -> Result<(), EnvelopeBufferError> { relay_log::trace!("EnvelopeBufferService peek"); - let next_sleep; match self.buffer.peek().await? { Peek::Empty => { relay_log::trace!("EnvelopeBufferService empty"); - next_sleep = DEFAULT_SLEEP + self.sleep = Duration::MAX; // wait for reset by `handle_message`. } Peek::Ready(_) => { relay_log::trace!("EnvelopeBufferService pop"); @@ -132,7 +133,7 @@ impl EnvelopeBufferService { .await? .expect("Element disappeared despite exclusive excess"); self.project_cache.send(DequeuedEnvelope(envelope)); - next_sleep = Duration::ZERO; // try next pop immediately + self.sleep = Duration::ZERO; // try next pop immediately } Peek::NotReady(stack_key, envelope) => { relay_log::trace!("EnvelopeBufferService request update"); @@ -147,10 +148,10 @@ impl EnvelopeBufferService { } // deprioritize the stack to prevent head-of-line blocking self.buffer.mark_seen(&stack_key); - next_sleep = DEFAULT_SLEEP; + self.sleep = DEFAULT_SLEEP; } } - Ok(next_sleep) + Ok(()) } async fn handle_message(&mut self, message: EnvelopeBuffer) { @@ -174,6 +175,7 @@ impl EnvelopeBufferService { self.buffer.mark_ready(&project_key, true); } }; + self.sleep = Duration::ZERO; } async fn push(&mut self, envelope: Box) { @@ -197,7 +199,6 @@ impl Service for EnvelopeBufferService { fn spawn_handler(mut self, mut rx: Receiver) { tokio::spawn(async move { relay_log::info!("EnvelopeBufferService start"); - let mut sleep = Duration::ZERO; loop { relay_log::trace!("EnvelopeBufferService loop"); tokio::select! { @@ -205,13 +206,13 @@ impl Service for EnvelopeBufferService { // On the one hand, we might want to prioritize dequeing over enqueing // so we do not exceed the buffer capacity by starving the dequeue. // on the other hand, prioritizing old messages violates the LIFO design. - () = tokio::time::sleep(sleep) => { - sleep = self.try_pop().await.map_err(|e| { + () = tokio::time::sleep(self.sleep) => { + if let Err(e) = self.try_pop().await { relay_log::error!( error = &e as &dyn std::error::Error, "failed to pop envelope" ); - }).unwrap_or(DEFAULT_SLEEP); + } } Some(message) = rx.recv() => { self.handle_message(message).await;