From 1a21ad56dd1bf9bedcc0a16d044357aa97844b0f Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Fri, 20 Sep 2024 10:43:36 +0200 Subject: [PATCH] Add test --- relay-server/src/services/buffer/mod.rs | 32 +++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 4d4c12f9d0..6a3977dde0 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -705,4 +705,36 @@ mod tests { Some(ProjectCache::UpdateProject(key)) if key == project_key )) } + + #[tokio::test] + async fn output_is_throttled() { + tokio::time::pause(); + let (service, global_tx, mut envelopes_rx, _project_cache_rx, _) = buffer_service(); + global_tx.send_replace(global_config::Status::Ready(Arc::new( + GlobalConfig::default(), + ))); + + let addr = service.start(); + + // Send 10 messages, with a bounded queue size of 5. + let envelope = new_envelope(false, "foo"); + let project_key = envelope.meta().public_key(); + for _ in 0..10 { + addr.send(EnvelopeBuffer::Push(envelope.clone())); + } + addr.send(EnvelopeBuffer::Ready(project_key)); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let mut messages = vec![]; + envelopes_rx.recv_many(&mut messages, 100).await; + + assert_eq!( + messages + .iter() + .filter(|message| matches!(message, DequeuedEnvelope(..))) + .count(), + 5 + ); + } }