Skip to content

Commit

Permalink
fix(buffer): Cycle through unready projects (#3968)
Browse files Browse the repository at this point in the history
Restore `last_peek` behavior with an explicit function call. This
prevents reprioritization between `peek` and `pop`.
  • Loading branch information
jjbayer authored Aug 30, 2024
1 parent 7360fe7 commit 79e7605
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 62 deletions.
3 changes: 0 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@
- Add `EnvelopeStore` trait and implement `DiskUsage` for tracking disk usage. ([#3925](https://github.com/getsentry/relay/pull/3925))
- Increase replay recording limit to two hours. ([#3961](https://github.com/getsentry/relay/pull/3961))
- Make EnvelopeBuffer a Service. ([#3965](https://github.com/getsentry/relay/pull/3965))

**Internal**:

- No longer send COGS data to dedicated Kafka topic. ([#3953](https://github.com/getsentry/relay/pull/3953))

## 24.8.0
Expand Down
100 changes: 60 additions & 40 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,18 @@ impl PolymorphicEnvelopeBuffer {
}
}

/// Marks a stack as seen.
///
/// Non-ready stacks are deprioritized when they are marked as seen, such that
/// the next call to `.peek()` will look at a different stack. This prevents
/// head-of-line blocking.
pub fn mark_seen(&mut self, stack_key: &StackKey) {
match self {
Self::Sqlite(buffer) => buffer.mark_seen(stack_key),
Self::InMemory(buffer) => buffer.mark_seen(stack_key),
}
}

/// Returns `true` whether the buffer has capacity to accept new [`Envelope`]s.
pub fn has_capacity(&self) -> bool {
match self {
Expand Down Expand Up @@ -179,7 +191,7 @@ where
pub async fn peek(&mut self) -> Result<Peek, EnvelopeBufferError> {
let Some((
QueueItem {
key: _,
key: stack_key,
value: stack,
},
Priority { readiness, .. },
Expand All @@ -193,7 +205,7 @@ where
Ok(match (stack.peek().await?, ready) {
(None, _) => Peek::Empty,
(Some(envelope), true) => Peek::Ready(envelope),
(Some(envelope), false) => Peek::NotReady(envelope),
(Some(envelope), false) => Peek::NotReady(*stack_key, envelope),
})
}

Expand Down Expand Up @@ -256,6 +268,17 @@ where
changed
}

/// Marks a stack as seen.
///
/// Non-ready stacks are deprioritized when they are marked as seen, such that
/// the next call to `.peek()` will look at a different stack. This prevents
/// head-of-line blocking.
pub fn mark_seen(&mut self, stack_key: &StackKey) {
self.priority_queue.change_priority_by(stack_key, |stack| {
stack.last_peek = Instant::now();
});
}

fn push_stack(&mut self, envelope: Box<Envelope>) {
let received_at = envelope.meta().start_time();
let stack_key = StackKey::from_envelope(&envelope);
Expand Down Expand Up @@ -302,7 +325,7 @@ where
pub enum Peek<'a> {
Empty,
Ready(&'a Envelope),
NotReady(&'a Envelope),
NotReady(StackKey, &'a Envelope),
}

#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
Expand Down Expand Up @@ -457,7 +480,7 @@ mod tests {
fn envelope(&self) -> Option<&Envelope> {
match self {
Peek::Empty => None,
Peek::Ready(envelope) | Peek::NotReady(envelope) => Some(envelope),
Peek::Ready(envelope) | Peek::NotReady(_, envelope) => Some(envelope),
}
}
}
Expand Down Expand Up @@ -793,7 +816,6 @@ mod tests {
assert_eq!(buffer.priority_queue.len(), 2);
}

#[ignore = "last_peek disabled for now"]
#[tokio::test]
async fn test_last_peek_internal_order() {
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(mock_memory_checker());
Expand All @@ -804,44 +826,42 @@ mod tests {

let project_key_2 = ProjectKey::parse("b56ae32be2584e0bbd7a4cbb95971fed").unwrap();
let event_id_2 = EventId::new();
let mut envelope2 = new_envelope(project_key_2, None, Some(event_id_2));
envelope2.set_start_time(envelope1.meta().start_time());
let envelope2 = new_envelope(project_key_2, None, Some(event_id_2));

buffer.push(envelope1).await.unwrap();
buffer.push(envelope2).await.unwrap();

assert_eq!(
buffer
.peek()
.await
.unwrap()
.envelope()
.unwrap()
.event_id()
.unwrap(),
event_id_1
);
assert_eq!(
buffer
.peek()
.await
.unwrap()
.envelope()
.unwrap()
.event_id()
.unwrap(),
event_id_2
);
assert_eq!(
buffer
.peek()
.await
.unwrap()
.envelope()
.unwrap()
.event_id()
.unwrap(),
event_id_1
);
// event_id_1 is first element:
let Peek::NotReady(_, envelope) = buffer.peek().await.unwrap() else {
panic!();
};
assert_eq!(envelope.event_id(), Some(event_id_1));

// Second peek returns same element:
let Peek::NotReady(stack_key, envelope) = buffer.peek().await.unwrap() else {
panic!();
};
assert_eq!(envelope.event_id(), Some(event_id_1));

buffer.mark_seen(&stack_key);

// After mark_seen, event 2 is on top:
let Peek::NotReady(_, envelope) = buffer.peek().await.unwrap() else {
panic!();
};
assert_eq!(envelope.event_id(), Some(event_id_2));

let Peek::NotReady(stack_key, envelope) = buffer.peek().await.unwrap() else {
panic!();
};
assert_eq!(envelope.event_id(), Some(event_id_2));

buffer.mark_seen(&stack_key);

// After another mark_seen, cycle back to event 1:
let Peek::NotReady(_, envelope) = buffer.peek().await.unwrap() else {
panic!();
};
assert_eq!(envelope.event_id(), Some(event_id_1));
}
}
34 changes: 15 additions & 19 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! Types for buffering envelopes.

use std::future;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use relay_base_schema::project::ProjectKey;
use relay_config::Config;
Expand Down Expand Up @@ -83,9 +83,11 @@ pub struct EnvelopeBufferService {
buffer: PolymorphicEnvelopeBuffer,
project_cache: Addr<ProjectCache>,
has_capacity: Arc<AtomicBool>,
changes: bool,
sleep: Duration,
}

const DEFAULT_SLEEP: Duration = Duration::from_millis(100);

impl EnvelopeBufferService {
/// Creates a memory or disk based [`EnvelopeBufferService`], depending on the given config.
///
Expand All @@ -100,7 +102,7 @@ impl EnvelopeBufferService {
buffer: PolymorphicEnvelopeBuffer::from_config(config, memory_checker),
project_cache,
has_capacity: Arc::new(AtomicBool::new(true)),
changes: true,
sleep: Duration::ZERO,
})
}

Expand All @@ -113,23 +115,15 @@ impl EnvelopeBufferService {
}
}

/// Return immediately if changes were flagged, otherwise sleep forever.
///
/// NOTE: This function sleeps indefinitely if no changes were flagged.
/// Only use in combination with [`tokio::select!`].
async fn wait_for_changes(&mut self) {
if !self.changes {
let _: () = future::pending().await; // wait until cancelled
}
}

/// Tries to pop an envelope for a ready project.
///
/// Returns the amount of time we should wait until next pop
async fn try_pop(&mut self) -> Result<(), EnvelopeBufferError> {
relay_log::trace!("EnvelopeBufferService peek");
match self.buffer.peek().await? {
Peek::Empty => {
relay_log::trace!("EnvelopeBufferService empty");
self.changes = false;
self.sleep = Duration::MAX; // wait for reset by `handle_message`.
}
Peek::Ready(_) => {
relay_log::trace!("EnvelopeBufferService pop");
Expand All @@ -139,9 +133,9 @@ impl EnvelopeBufferService {
.await?
.expect("Element disappeared despite exclusive excess");
self.project_cache.send(DequeuedEnvelope(envelope));
self.changes = true;
self.sleep = Duration::ZERO; // try next pop immediately
}
Peek::NotReady(envelope) => {
Peek::NotReady(stack_key, envelope) => {
relay_log::trace!("EnvelopeBufferService request update");
let project_key = envelope.meta().public_key();
self.project_cache.send(UpdateProject(project_key));
Expand All @@ -152,7 +146,9 @@ impl EnvelopeBufferService {
self.project_cache.send(UpdateProject(sampling_key));
}
}
self.changes = false;
// deprioritize the stack to prevent head-of-line blocking
self.buffer.mark_seen(&stack_key);
self.sleep = DEFAULT_SLEEP;
}
}
Ok(())
Expand All @@ -179,7 +175,7 @@ impl EnvelopeBufferService {
self.buffer.mark_ready(&project_key, true);
}
};
self.changes = true;
self.sleep = Duration::ZERO;
}

async fn push(&mut self, envelope: Box<Envelope>) {
Expand Down Expand Up @@ -210,7 +206,7 @@ impl Service for EnvelopeBufferService {
// On the one hand, we might want to prioritize dequeing over enqueing
// so we do not exceed the buffer capacity by starving the dequeue.
// on the other hand, prioritizing old messages violates the LIFO design.
() = self.wait_for_changes() => {
() = tokio::time::sleep(self.sleep) => {
if let Err(e) = self.try_pop().await {
relay_log::error!(
error = &e as &dyn std::error::Error,
Expand Down

0 comments on commit 79e7605

Please sign in to comment.