diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 92c31cd5eb..36aa2d87a0 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -10,7 +10,7 @@ use relay_base_schema::project::ProjectKey; use relay_config::Config; use relay_system::SendError; use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service}; -use relay_system::{Controller, Request, Shutdown}; +use relay_system::{Controller, Shutdown}; use tokio::sync::watch; use tokio::time::timeout; @@ -97,7 +97,6 @@ pub struct EnvelopeBufferService { project_cache: Addr, has_capacity: Arc, sleep: Duration, - project_cache_ready: Option>, } /// The maximum amount of time between evaluations of dequeue conditions. @@ -125,7 +124,6 @@ impl EnvelopeBufferService { project_cache, has_capacity: Arc::new(AtomicBool::new(true)), sleep: Duration::ZERO, - project_cache_ready: None, }) } @@ -164,19 +162,6 @@ impl EnvelopeBufferService { status = "slept" ); - if let Some(project_cache_ready) = self.project_cache_ready.as_mut() { - relay_statsd::metric!( - counter(RelayCounters::BufferReadyToPop) += 1, - status = "waiting_for_project_cache" - ); - project_cache_ready.await?; - relay_statsd::metric!( - counter(RelayCounters::BufferReadyToPop) += 1, - status = "waited_for_project_cache" - ); - self.project_cache_ready = None; - } - relay_statsd::metric!( counter(RelayCounters::BufferReadyToPop) += 1, status = "checked" @@ -230,9 +215,8 @@ impl EnvelopeBufferService { .pop() .await? .expect("Element disappeared despite exclusive excess"); + self.project_cache.send(DequeuedEnvelope(envelope)); - self.project_cache_ready - .replace(self.project_cache.send(DequeuedEnvelope(envelope))); self.sleep = Duration::ZERO; // try next pop immediately } Peek::NotReady(stack_key, envelope) => { @@ -531,36 +515,4 @@ mod tests { // Nothing was dequeued, memory not ready: assert_eq!(project_cache_rx.len(), 0); } - - #[tokio::test] - async fn output_is_throttled() { - tokio::time::pause(); - let (service, global_tx, mut project_cache_rx) = buffer_service(); - global_tx.send_replace(global_config::Status::Ready(Arc::new( - GlobalConfig::default(), - ))); - - let addr = service.start(); - - // Send five messages: - let envelope = new_envelope(false, "foo"); - let project_key = envelope.meta().public_key(); - for _ in 0..5 { - addr.send(EnvelopeBuffer::Push(envelope.clone())); - } - addr.send(EnvelopeBuffer::Ready(project_key)); - - tokio::time::sleep(Duration::from_millis(100)).await; - - let mut messages = vec![]; - project_cache_rx.recv_many(&mut messages, 100).await; - - assert_eq!( - messages - .iter() - .filter(|message| matches!(message, ProjectCache::HandleDequeuedEnvelope(..))) - .count(), - 1 - ); - } }