Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(buffer): Cycle through unready projects #3968

Merged
merged 3 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@
- Add `EnvelopeStore` trait and implement `DiskUsage` for tracking disk usage. ([#3925](https://github.com/getsentry/relay/pull/3925))
- Increase replay recording limit to two hours. ([#3961](https://github.com/getsentry/relay/pull/3961))
- Make EnvelopeBuffer a Service. ([#3965](https://github.com/getsentry/relay/pull/3965))

**Internal**:

- No longer send COGS data to dedicated Kafka topic. ([#3953](https://github.com/getsentry/relay/pull/3953))

## 24.8.0
Expand Down
100 changes: 60 additions & 40 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,18 @@ impl PolymorphicEnvelopeBuffer {
}
}

/// Marks a stack as seen.
///
/// Non-ready stacks are deprioritized when they are marked as seen, such that
/// the next call to `.peek()` will look at a different stack. This prevents
/// head-of-line blocking.
pub fn mark_seen(&mut self, stack_key: &StackKey) {
match self {
Self::Sqlite(buffer) => buffer.mark_seen(stack_key),
Self::InMemory(buffer) => buffer.mark_seen(stack_key),
}
}

/// Returns `true` whether the buffer has capacity to accept new [`Envelope`]s.
pub fn has_capacity(&self) -> bool {
match self {
Expand Down Expand Up @@ -179,7 +191,7 @@ where
pub async fn peek(&mut self) -> Result<Peek, EnvelopeBufferError> {
let Some((
QueueItem {
key: _,
key: stack_key,
value: stack,
},
Priority { readiness, .. },
Expand All @@ -193,7 +205,7 @@ where
Ok(match (stack.peek().await?, ready) {
(None, _) => Peek::Empty,
(Some(envelope), true) => Peek::Ready(envelope),
(Some(envelope), false) => Peek::NotReady(envelope),
(Some(envelope), false) => Peek::NotReady(*stack_key, envelope),
})
}

Expand Down Expand Up @@ -256,6 +268,17 @@ where
changed
}

/// Marks a stack as seen.
///
/// Non-ready stacks are deprioritized when they are marked as seen, such that
/// the next call to `.peek()` will look at a different stack. This prevents
/// head-of-line blocking.
pub fn mark_seen(&mut self, stack_key: &StackKey) {
self.priority_queue.change_priority_by(stack_key, |stack| {
stack.last_peek = Instant::now();
});
}

fn push_stack(&mut self, envelope: Box<Envelope>) {
let received_at = envelope.meta().start_time();
let stack_key = StackKey::from_envelope(&envelope);
Expand Down Expand Up @@ -302,7 +325,7 @@ where
pub enum Peek<'a> {
Empty,
Ready(&'a Envelope),
NotReady(&'a Envelope),
NotReady(StackKey, &'a Envelope),
}

#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
Expand Down Expand Up @@ -457,7 +480,7 @@ mod tests {
fn envelope(&self) -> Option<&Envelope> {
match self {
Peek::Empty => None,
Peek::Ready(envelope) | Peek::NotReady(envelope) => Some(envelope),
Peek::Ready(envelope) | Peek::NotReady(_, envelope) => Some(envelope),
}
}
}
Expand Down Expand Up @@ -793,7 +816,6 @@ mod tests {
assert_eq!(buffer.priority_queue.len(), 2);
}

#[ignore = "last_peek disabled for now"]
#[tokio::test]
async fn test_last_peek_internal_order() {
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(mock_memory_checker());
Expand All @@ -804,44 +826,42 @@ mod tests {

let project_key_2 = ProjectKey::parse("b56ae32be2584e0bbd7a4cbb95971fed").unwrap();
let event_id_2 = EventId::new();
let mut envelope2 = new_envelope(project_key_2, None, Some(event_id_2));
envelope2.set_start_time(envelope1.meta().start_time());
let envelope2 = new_envelope(project_key_2, None, Some(event_id_2));

buffer.push(envelope1).await.unwrap();
buffer.push(envelope2).await.unwrap();

assert_eq!(
buffer
.peek()
.await
.unwrap()
.envelope()
.unwrap()
.event_id()
.unwrap(),
event_id_1
);
assert_eq!(
buffer
.peek()
.await
.unwrap()
.envelope()
.unwrap()
.event_id()
.unwrap(),
event_id_2
);
assert_eq!(
buffer
.peek()
.await
.unwrap()
.envelope()
.unwrap()
.event_id()
.unwrap(),
event_id_1
);
// event_id_1 is first element:
let Peek::NotReady(_, envelope) = buffer.peek().await.unwrap() else {
panic!();
};
assert_eq!(envelope.event_id(), Some(event_id_1));

// Second peek returns same element:
let Peek::NotReady(stack_key, envelope) = buffer.peek().await.unwrap() else {
panic!();
};
assert_eq!(envelope.event_id(), Some(event_id_1));

buffer.mark_seen(&stack_key);

// After mark_seen, event 2 is on top:
let Peek::NotReady(_, envelope) = buffer.peek().await.unwrap() else {
panic!();
};
assert_eq!(envelope.event_id(), Some(event_id_2));

let Peek::NotReady(stack_key, envelope) = buffer.peek().await.unwrap() else {
panic!();
};
assert_eq!(envelope.event_id(), Some(event_id_2));

buffer.mark_seen(&stack_key);

// After another mark_seen, cycle back to event 1:
let Peek::NotReady(_, envelope) = buffer.peek().await.unwrap() else {
panic!();
};
assert_eq!(envelope.event_id(), Some(event_id_1));
}
}
34 changes: 15 additions & 19 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! Types for buffering envelopes.

use std::future;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use relay_base_schema::project::ProjectKey;
use relay_config::Config;
Expand Down Expand Up @@ -83,9 +83,11 @@ pub struct EnvelopeBufferService {
buffer: PolymorphicEnvelopeBuffer,
project_cache: Addr<ProjectCache>,
has_capacity: Arc<AtomicBool>,
changes: bool,
sleep: Duration,
}

const DEFAULT_SLEEP: Duration = Duration::from_millis(100);

impl EnvelopeBufferService {
/// Creates a memory or disk based [`EnvelopeBufferService`], depending on the given config.
///
Expand All @@ -100,7 +102,7 @@ impl EnvelopeBufferService {
buffer: PolymorphicEnvelopeBuffer::from_config(config, memory_checker),
project_cache,
has_capacity: Arc::new(AtomicBool::new(true)),
changes: true,
sleep: Duration::ZERO,
})
}

Expand All @@ -113,23 +115,15 @@ impl EnvelopeBufferService {
}
}

/// Return immediately if changes were flagged, otherwise sleep forever.
///
/// NOTE: This function sleeps indefinitely if no changes were flagged.
/// Only use in combination with [`tokio::select!`].
async fn wait_for_changes(&mut self) {
if !self.changes {
let _: () = future::pending().await; // wait until cancelled
}
}

/// Tries to pop an envelope for a ready project.
///
/// Returns the amount of time we should wait until next pop
async fn try_pop(&mut self) -> Result<(), EnvelopeBufferError> {
relay_log::trace!("EnvelopeBufferService peek");
match self.buffer.peek().await? {
Peek::Empty => {
relay_log::trace!("EnvelopeBufferService empty");
self.changes = false;
self.sleep = Duration::MAX; // wait for reset by `handle_message`.
}
Peek::Ready(_) => {
relay_log::trace!("EnvelopeBufferService pop");
Expand All @@ -139,9 +133,9 @@ impl EnvelopeBufferService {
.await?
.expect("Element disappeared despite exclusive excess");
self.project_cache.send(DequeuedEnvelope(envelope));
self.changes = true;
self.sleep = Duration::ZERO; // try next pop immediately
}
Peek::NotReady(envelope) => {
Peek::NotReady(stack_key, envelope) => {
relay_log::trace!("EnvelopeBufferService request update");
let project_key = envelope.meta().public_key();
self.project_cache.send(UpdateProject(project_key));
Expand All @@ -152,7 +146,9 @@ impl EnvelopeBufferService {
self.project_cache.send(UpdateProject(sampling_key));
}
}
self.changes = false;
// deprioritize the stack to prevent head-of-line blocking
self.buffer.mark_seen(&stack_key);
self.sleep = DEFAULT_SLEEP;
}
}
Ok(())
Expand All @@ -179,7 +175,7 @@ impl EnvelopeBufferService {
self.buffer.mark_ready(&project_key, true);
}
};
self.changes = true;
self.sleep = Duration::ZERO;
}

async fn push(&mut self, envelope: Box<Envelope>) {
Expand Down Expand Up @@ -210,7 +206,7 @@ impl Service for EnvelopeBufferService {
// On the one hand, we might want to prioritize dequeing over enqueing
// so we do not exceed the buffer capacity by starving the dequeue.
// on the other hand, prioritizing old messages violates the LIFO design.
() = self.wait_for_changes() => {
() = tokio::time::sleep(self.sleep) => {
if let Err(e) = self.try_pop().await {
relay_log::error!(
error = &e as &dyn std::error::Error,
Expand Down
Loading