Skip to content

Commit

Permalink
feat(buffer): Disable back pressure (#4039)
Browse files Browse the repository at this point in the history
In the experimental new buffer implementation, do not wait for project
cache to push an unspooled message to it.

This risks flooding the service queue, but we have a second back
pressure in place that stops dequeueing envelopes when the memory
threshold has been reached.
  • Loading branch information
jjbayer committed Sep 17, 2024
1 parent b498fb5 commit 84ed805
Showing 1 changed file with 2 additions and 50 deletions.
52 changes: 2 additions & 50 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use relay_base_schema::project::ProjectKey;
use relay_config::Config;
use relay_system::SendError;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service};
use relay_system::{Controller, Request, Shutdown};
use relay_system::{Controller, Shutdown};
use tokio::sync::watch;
use tokio::time::timeout;

Expand Down Expand Up @@ -97,7 +97,6 @@ pub struct EnvelopeBufferService {
project_cache: Addr<ProjectCache>,
has_capacity: Arc<AtomicBool>,
sleep: Duration,
project_cache_ready: Option<Request<()>>,
}

/// The maximum amount of time between evaluations of dequeue conditions.
Expand Down Expand Up @@ -125,7 +124,6 @@ impl EnvelopeBufferService {
project_cache,
has_capacity: Arc::new(AtomicBool::new(true)),
sleep: Duration::ZERO,
project_cache_ready: None,
})
}

Expand Down Expand Up @@ -164,19 +162,6 @@ impl EnvelopeBufferService {
status = "slept"
);

if let Some(project_cache_ready) = self.project_cache_ready.as_mut() {
relay_statsd::metric!(
counter(RelayCounters::BufferReadyToPop) += 1,
status = "waiting_for_project_cache"
);
project_cache_ready.await?;
relay_statsd::metric!(
counter(RelayCounters::BufferReadyToPop) += 1,
status = "waited_for_project_cache"
);
self.project_cache_ready = None;
}

relay_statsd::metric!(
counter(RelayCounters::BufferReadyToPop) += 1,
status = "checked"
Expand Down Expand Up @@ -230,9 +215,8 @@ impl EnvelopeBufferService {
.pop()
.await?
.expect("Element disappeared despite exclusive excess");
self.project_cache.send(DequeuedEnvelope(envelope));

self.project_cache_ready
.replace(self.project_cache.send(DequeuedEnvelope(envelope)));
self.sleep = Duration::ZERO; // try next pop immediately
}
Peek::NotReady(stack_key, envelope) => {
Expand Down Expand Up @@ -531,36 +515,4 @@ mod tests {
// Nothing was dequeued, memory not ready:
assert_eq!(project_cache_rx.len(), 0);
}

#[tokio::test]
async fn output_is_throttled() {
tokio::time::pause();
let (service, global_tx, mut project_cache_rx) = buffer_service();
global_tx.send_replace(global_config::Status::Ready(Arc::new(
GlobalConfig::default(),
)));

let addr = service.start();

// Send five messages:
let envelope = new_envelope(false, "foo");
let project_key = envelope.meta().public_key();
for _ in 0..5 {
addr.send(EnvelopeBuffer::Push(envelope.clone()));
}
addr.send(EnvelopeBuffer::Ready(project_key));

tokio::time::sleep(Duration::from_millis(100)).await;

let mut messages = vec![];
project_cache_rx.recv_many(&mut messages, 100).await;

assert_eq!(
messages
.iter()
.filter(|message| matches!(message, ProjectCache::HandleDequeuedEnvelope(..)))
.count(),
1
);
}
}

0 comments on commit 84ed805

Please sign in to comment.