Skip to content

Commit

Permalink
feat(buffer): Disable back pressure
Browse files Browse the repository at this point in the history
  • Loading branch information
jjbayer committed Sep 17, 2024
1 parent b498fb5 commit 05864f6
Showing 1 changed file with 33 additions and 49 deletions.
82 changes: 33 additions & 49 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use relay_base_schema::project::ProjectKey;
use relay_config::Config;
use relay_system::SendError;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service};
use relay_system::{Controller, Request, Shutdown};
use relay_system::{Controller, Shutdown};
use tokio::sync::watch;
use tokio::time::timeout;

Expand Down Expand Up @@ -97,7 +97,6 @@ pub struct EnvelopeBufferService {
project_cache: Addr<ProjectCache>,
has_capacity: Arc<AtomicBool>,
sleep: Duration,
project_cache_ready: Option<Request<()>>,
}

/// The maximum amount of time between evaluations of dequeue conditions.
Expand Down Expand Up @@ -125,7 +124,6 @@ impl EnvelopeBufferService {
project_cache,
has_capacity: Arc::new(AtomicBool::new(true)),
sleep: Duration::ZERO,
project_cache_ready: None,
})
}

Expand Down Expand Up @@ -164,19 +162,6 @@ impl EnvelopeBufferService {
status = "slept"
);

if let Some(project_cache_ready) = self.project_cache_ready.as_mut() {
relay_statsd::metric!(
counter(RelayCounters::BufferReadyToPop) += 1,
status = "waiting_for_project_cache"
);
project_cache_ready.await?;
relay_statsd::metric!(
counter(RelayCounters::BufferReadyToPop) += 1,
status = "waited_for_project_cache"
);
self.project_cache_ready = None;
}

relay_statsd::metric!(
counter(RelayCounters::BufferReadyToPop) += 1,
status = "checked"
Expand Down Expand Up @@ -230,9 +215,8 @@ impl EnvelopeBufferService {
.pop()
.await?
.expect("Element disappeared despite exclusive excess");
self.project_cache.send(DequeuedEnvelope(envelope));

self.project_cache_ready
.replace(self.project_cache.send(DequeuedEnvelope(envelope)));
self.sleep = Duration::ZERO; // try next pop immediately
}
Peek::NotReady(stack_key, envelope) => {
Expand Down Expand Up @@ -532,35 +516,35 @@ mod tests {
assert_eq!(project_cache_rx.len(), 0);
}

#[tokio::test]
async fn output_is_throttled() {
tokio::time::pause();
let (service, global_tx, mut project_cache_rx) = buffer_service();
global_tx.send_replace(global_config::Status::Ready(Arc::new(
GlobalConfig::default(),
)));

let addr = service.start();

// Send five messages:
let envelope = new_envelope(false, "foo");
let project_key = envelope.meta().public_key();
for _ in 0..5 {
addr.send(EnvelopeBuffer::Push(envelope.clone()));
}
addr.send(EnvelopeBuffer::Ready(project_key));

tokio::time::sleep(Duration::from_millis(100)).await;

let mut messages = vec![];
project_cache_rx.recv_many(&mut messages, 100).await;

assert_eq!(
messages
.iter()
.filter(|message| matches!(message, ProjectCache::HandleDequeuedEnvelope(..)))
.count(),
1
);
}
// #[tokio::test]
// async fn output_is_throttled() {
// tokio::time::pause();
// let (service, global_tx, mut project_cache_rx) = buffer_service();
// global_tx.send_replace(global_config::Status::Ready(Arc::new(
// GlobalConfig::default(),
// )));

// let addr = service.start();

// // Send five messages:
// let envelope = new_envelope(false, "foo");
// let project_key = envelope.meta().public_key();
// for _ in 0..5 {
// addr.send(EnvelopeBuffer::Push(envelope.clone()));
// }
// addr.send(EnvelopeBuffer::Ready(project_key));

// tokio::time::sleep(Duration::from_millis(100)).await;

// let mut messages = vec![];
// project_cache_rx.recv_many(&mut messages, 100).await;

// assert_eq!(
// messages
// .iter()
// .filter(|message| matches!(message, ProjectCache::HandleDequeuedEnvelope(..)))
// .count(),
// 1
// );
// }
}

0 comments on commit 05864f6

Please sign in to comment.