Skip to content

Commit

Permalink
sleep
Browse files Browse the repository at this point in the history
  • Loading branch information
jjbayer committed Aug 30, 2024
1 parent 3c5e16c commit 86638ac
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 25 deletions.
3 changes: 0 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@
- Add `EnvelopeStore` trait and implement `DiskUsage` for tracking disk usage. ([#3925](https://github.com/getsentry/relay/pull/3925))
- Increase replay recording limit to two hours. ([#3961](https://github.com/getsentry/relay/pull/3961))
- Make EnvelopeBuffer a Service. ([#3965](https://github.com/getsentry/relay/pull/3965))

**Internal**:

- No longer send COGS data to dedicated Kafka topic. ([#3953](https://github.com/getsentry/relay/pull/3953))

## 24.8.0
Expand Down
38 changes: 16 additions & 22 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! Types for buffering envelopes.

use std::future;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use relay_base_schema::project::ProjectKey;
use relay_config::Config;
Expand Down Expand Up @@ -83,9 +83,11 @@ pub struct EnvelopeBufferService {
buffer: PolymorphicEnvelopeBuffer,
project_cache: Addr<ProjectCache>,
has_capacity: Arc<AtomicBool>,
changes: bool,
// sleep: Duration,
}

const DEFAULT_SLEEP: Duration = Duration::from_millis(100);

impl EnvelopeBufferService {
/// Creates a memory or disk based [`EnvelopeBufferService`], depending on the given config.
///
Expand All @@ -100,7 +102,6 @@ impl EnvelopeBufferService {
buffer: PolymorphicEnvelopeBuffer::from_config(config, memory_checker),
project_cache,
has_capacity: Arc::new(AtomicBool::new(true)),
changes: true,
})
}

Expand All @@ -113,23 +114,16 @@ impl EnvelopeBufferService {
}
}

/// Return immediately if changes were flagged, otherwise sleep forever.
///
/// NOTE: This function sleeps indefinitely if no changes were flagged.
/// Only use in combination with [`tokio::select!`].
async fn wait_for_changes(&mut self) {
if !self.changes {
let _: () = future::pending().await; // wait until cancelled
}
}

/// Tries to pop an envelope for a ready project.
async fn try_pop(&mut self) -> Result<(), EnvelopeBufferError> {
///
/// Returns the amount of time we should wait until next pop
async fn try_pop(&mut self) -> Result<Duration, EnvelopeBufferError> {
relay_log::trace!("EnvelopeBufferService peek");
let next_sleep;
match self.buffer.peek().await? {
Peek::Empty => {
relay_log::trace!("EnvelopeBufferService empty");
self.changes = false;
next_sleep = DEFAULT_SLEEP
}
Peek::Ready(_) => {
relay_log::trace!("EnvelopeBufferService pop");
Expand All @@ -139,7 +133,7 @@ impl EnvelopeBufferService {
.await?
.expect("Element disappeared despite exclusive excess");
self.project_cache.send(DequeuedEnvelope(envelope));
self.changes = true;
next_sleep = Duration::ZERO; // try next pop immediately
}
Peek::NotReady(stack_key, envelope) => {
relay_log::trace!("EnvelopeBufferService request update");
Expand All @@ -154,10 +148,10 @@ impl EnvelopeBufferService {
}
// deprioritize the stack to prevent head-of-line blocking
self.buffer.mark_seen(&stack_key);
self.changes = false;
next_sleep = DEFAULT_SLEEP;
}
}
Ok(())
Ok(next_sleep)
}

async fn handle_message(&mut self, message: EnvelopeBuffer) {
Expand All @@ -181,7 +175,6 @@ impl EnvelopeBufferService {
self.buffer.mark_ready(&project_key, true);
}
};
self.changes = true;
}

async fn push(&mut self, envelope: Box<Envelope>) {
Expand All @@ -205,20 +198,21 @@ impl Service for EnvelopeBufferService {
fn spawn_handler(mut self, mut rx: Receiver<Self::Interface>) {
tokio::spawn(async move {
relay_log::info!("EnvelopeBufferService start");
let mut sleep = Duration::ZERO;
loop {
relay_log::trace!("EnvelopeBufferService loop");
tokio::select! {
// NOTE: we do not select a bias here.
// On the one hand, we might want to prioritize dequeing over enqueing
// so we do not exceed the buffer capacity by starving the dequeue.
// on the other hand, prioritizing old messages violates the LIFO design.
() = self.wait_for_changes() => {
if let Err(e) = self.try_pop().await {
() = tokio::time::sleep(sleep) => {
sleep = self.try_pop().await.map_err(|e| {
relay_log::error!(
error = &e as &dyn std::error::Error,
"failed to pop envelope"
);
}
}).unwrap_or(DEFAULT_SLEEP);
}
Some(message) = rx.recv() => {
self.handle_message(message).await;
Expand Down

0 comments on commit 86638ac

Please sign in to comment.