From 05864f60c86a6bd43481555f1037d56c72df6385 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Tue, 17 Sep 2024 09:39:16 +0200 Subject: [PATCH] feat(buffer): Disable back pressure --- relay-server/src/services/buffer/mod.rs | 82 ++++++++++--------------- 1 file changed, 33 insertions(+), 49 deletions(-) diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 92c31cd5eb..311145acf9 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -10,7 +10,7 @@ use relay_base_schema::project::ProjectKey; use relay_config::Config; use relay_system::SendError; use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service}; -use relay_system::{Controller, Request, Shutdown}; +use relay_system::{Controller, Shutdown}; use tokio::sync::watch; use tokio::time::timeout; @@ -97,7 +97,6 @@ pub struct EnvelopeBufferService { project_cache: Addr, has_capacity: Arc, sleep: Duration, - project_cache_ready: Option>, } /// The maximum amount of time between evaluations of dequeue conditions. @@ -125,7 +124,6 @@ impl EnvelopeBufferService { project_cache, has_capacity: Arc::new(AtomicBool::new(true)), sleep: Duration::ZERO, - project_cache_ready: None, }) } @@ -164,19 +162,6 @@ impl EnvelopeBufferService { status = "slept" ); - if let Some(project_cache_ready) = self.project_cache_ready.as_mut() { - relay_statsd::metric!( - counter(RelayCounters::BufferReadyToPop) += 1, - status = "waiting_for_project_cache" - ); - project_cache_ready.await?; - relay_statsd::metric!( - counter(RelayCounters::BufferReadyToPop) += 1, - status = "waited_for_project_cache" - ); - self.project_cache_ready = None; - } - relay_statsd::metric!( counter(RelayCounters::BufferReadyToPop) += 1, status = "checked" @@ -230,9 +215,8 @@ impl EnvelopeBufferService { .pop() .await? .expect("Element disappeared despite exclusive excess"); + self.project_cache.send(DequeuedEnvelope(envelope)); - self.project_cache_ready - .replace(self.project_cache.send(DequeuedEnvelope(envelope))); self.sleep = Duration::ZERO; // try next pop immediately } Peek::NotReady(stack_key, envelope) => { @@ -532,35 +516,35 @@ mod tests { assert_eq!(project_cache_rx.len(), 0); } - #[tokio::test] - async fn output_is_throttled() { - tokio::time::pause(); - let (service, global_tx, mut project_cache_rx) = buffer_service(); - global_tx.send_replace(global_config::Status::Ready(Arc::new( - GlobalConfig::default(), - ))); - - let addr = service.start(); - - // Send five messages: - let envelope = new_envelope(false, "foo"); - let project_key = envelope.meta().public_key(); - for _ in 0..5 { - addr.send(EnvelopeBuffer::Push(envelope.clone())); - } - addr.send(EnvelopeBuffer::Ready(project_key)); - - tokio::time::sleep(Duration::from_millis(100)).await; - - let mut messages = vec![]; - project_cache_rx.recv_many(&mut messages, 100).await; - - assert_eq!( - messages - .iter() - .filter(|message| matches!(message, ProjectCache::HandleDequeuedEnvelope(..))) - .count(), - 1 - ); - } + // #[tokio::test] + // async fn output_is_throttled() { + // tokio::time::pause(); + // let (service, global_tx, mut project_cache_rx) = buffer_service(); + // global_tx.send_replace(global_config::Status::Ready(Arc::new( + // GlobalConfig::default(), + // ))); + + // let addr = service.start(); + + // // Send five messages: + // let envelope = new_envelope(false, "foo"); + // let project_key = envelope.meta().public_key(); + // for _ in 0..5 { + // addr.send(EnvelopeBuffer::Push(envelope.clone())); + // } + // addr.send(EnvelopeBuffer::Ready(project_key)); + + // tokio::time::sleep(Duration::from_millis(100)).await; + + // let mut messages = vec![]; + // project_cache_rx.recv_many(&mut messages, 100).await; + + // assert_eq!( + // messages + // .iter() + // .filter(|message| matches!(message, ProjectCache::HandleDequeuedEnvelope(..))) + // .count(), + // 1 + // ); + // } }