From e77c328080fe9354ca37850c9f5eb45e401c2486 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Tue, 17 Sep 2024 11:44:05 +0200 Subject: [PATCH 1/8] feat(spooler): Implement backpressure mechanism based on atomics --- relay-server/src/service.rs | 6 ++ relay-server/src/services/buffer/mod.rs | 73 +++++++++++++++++++--- relay-server/src/services/project_cache.rs | 26 +++++--- 3 files changed, 90 insertions(+), 15 deletions(-) diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index f8fbdfe48c..f6b2d4c4a8 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -1,5 +1,6 @@ use std::convert::Infallible; use std::fmt; +use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::time::Duration; @@ -241,11 +242,15 @@ impl ServiceState { ) .spawn_handler(processor_rx); + // We initialize a shared boolean that is used to manage backpressure between the + // EnvelopeBufferService and the ProjectCacheService. + let project_cache_ready = Arc::new(AtomicBool::new(true)); let envelope_buffer = EnvelopeBufferService::new( config.clone(), MemoryChecker::new(memory_stat.clone(), config.clone()), global_config_rx.clone(), project_cache.clone(), + project_cache_ready.clone(), ) .map(|b| b.start_observable()); @@ -268,6 +273,7 @@ impl ServiceState { redis_pools .as_ref() .map(|pools| pools.project_configs.clone()), + project_cache_ready, ) .spawn_handler(project_cache_rx); diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 36aa2d87a0..d8534b0ad6 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -1,6 +1,7 @@ //! Types for buffering envelopes. use std::error::Error; +use std::future; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -97,6 +98,7 @@ pub struct EnvelopeBufferService { project_cache: Addr, has_capacity: Arc, sleep: Duration, + project_cache_ready: Arc, } /// The maximum amount of time between evaluations of dequeue conditions. @@ -115,6 +117,7 @@ impl EnvelopeBufferService { memory_checker: MemoryChecker, global_config_rx: watch::Receiver, project_cache: Addr, + project_cache_ready: Arc, ) -> Option { config.spool_v2().then(|| Self { config, @@ -124,6 +127,7 @@ impl EnvelopeBufferService { project_cache, has_capacity: Arc::new(AtomicBool::new(true)), sleep: Duration::ZERO, + project_cache_ready, }) } @@ -157,6 +161,11 @@ impl EnvelopeBufferService { tokio::time::sleep(self.sleep).await; } + // In case the project cache is not ready, we don't want to attempt popping. + if !self.project_cache_ready.load(Ordering::Relaxed) { + return future::pending().await; + } + relay_statsd::metric!( counter(RelayCounters::BufferReadyToPop) += 1, status = "slept" @@ -215,6 +224,9 @@ impl EnvelopeBufferService { .pop() .await? .expect("Element disappeared despite exclusive excess"); + // We assume that the project cache is now busy to process this envelope, so we flip + // the boolean flag, which will prioritize writes. + self.project_cache_ready.store(false, Ordering::SeqCst); self.project_cache.send(DequeuedEnvelope(envelope)); self.sleep = Duration::ZERO; // try next pop immediately @@ -403,6 +415,7 @@ mod tests { EnvelopeBufferService, watch::Sender, mpsc::UnboundedReceiver, + Arc, ) { let config = Arc::new( Config::from_json_value(serde_json::json!({ @@ -417,18 +430,26 @@ mod tests { let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone()); let (global_tx, global_rx) = watch::channel(global_config::Status::Pending); let (project_cache_addr, project_cache_rx) = Addr::custom(); + let project_cache_ready = Arc::new(AtomicBool::new(true)); ( - EnvelopeBufferService::new(config, memory_checker, global_rx, project_cache_addr) - .unwrap(), + EnvelopeBufferService::new( + config, + memory_checker, + global_rx, + project_cache_addr, + project_cache_ready.clone(), + ) + .unwrap(), global_tx, project_cache_rx, + project_cache_ready, ) } #[tokio::test] async fn capacity_is_updated() { tokio::time::pause(); - let (service, _global_rx, _project_cache_tx) = buffer_service(); + let (service, _global_rx, _project_cache_tx, _project_cache_ready) = buffer_service(); // Set capacity to false: service.has_capacity.store(false, Ordering::Relaxed); @@ -450,7 +471,7 @@ mod tests { #[tokio::test] async fn pop_requires_global_config() { tokio::time::pause(); - let (service, global_tx, project_cache_rx) = buffer_service(); + let (service, global_tx, project_cache_rx, _project_cache_ready) = buffer_service(); let addr = service.start(); @@ -499,9 +520,15 @@ mod tests { ))); let (project_cache_addr, project_cache_rx) = Addr::custom(); - let service = - EnvelopeBufferService::new(config, memory_checker, global_rx, project_cache_addr) - .unwrap(); + let project_cache_ready = Arc::new(AtomicBool::new(true)); + let service = EnvelopeBufferService::new( + config, + memory_checker, + global_rx, + project_cache_addr, + project_cache_ready, + ) + .unwrap(); let addr = service.start(); // Send five messages: @@ -515,4 +542,36 @@ mod tests { // Nothing was dequeued, memory not ready: assert_eq!(project_cache_rx.len(), 0); } + + #[tokio::test] + async fn output_is_throttled() { + tokio::time::pause(); + let (service, global_tx, mut project_cache_rx, _project_cache_ready) = buffer_service(); + global_tx.send_replace(global_config::Status::Ready(Arc::new( + GlobalConfig::default(), + ))); + + let addr = service.start(); + + // Send five messages: + let envelope = new_envelope(false, "foo"); + let project_key = envelope.meta().public_key(); + for _ in 0..5 { + addr.send(EnvelopeBuffer::Push(envelope.clone())); + } + addr.send(EnvelopeBuffer::Ready(project_key)); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let mut messages = vec![]; + project_cache_rx.recv_many(&mut messages, 100).await; + + assert_eq!( + messages + .iter() + .filter(|message| matches!(message, ProjectCache::HandleDequeuedEnvelope(..))) + .count(), + 1 + ); + } } diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index ef2f3f813f..fd9fd21465 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -1,5 +1,6 @@ use std::collections::{BTreeMap, BTreeSet}; use std::error::Error; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -292,7 +293,7 @@ pub enum ProjectCache { UpdateSpoolIndex(UpdateSpoolIndex), SpoolHealth(Sender), RefreshIndexCache(RefreshIndexCache), - HandleDequeuedEnvelope(Box, Sender<()>), + HandleDequeuedEnvelope(Box), UpdateProject(ProjectKey), } @@ -311,7 +312,7 @@ impl ProjectCache { Self::UpdateSpoolIndex(_) => "UpdateSpoolIndex", Self::SpoolHealth(_) => "SpoolHealth", Self::RefreshIndexCache(_) => "RefreshIndexCache", - Self::HandleDequeuedEnvelope(_, _) => "HandleDequeuedEnvelope", + Self::HandleDequeuedEnvelope(_) => "HandleDequeuedEnvelope", Self::UpdateProject(_) => "UpdateProject", } } @@ -419,11 +420,11 @@ impl FromMessage for ProjectCache { } impl FromMessage for ProjectCache { - type Response = relay_system::AsyncResponse<()>; + type Response = relay_system::NoResponse; - fn from_message(message: DequeuedEnvelope, sender: Sender<()>) -> Self { + fn from_message(message: DequeuedEnvelope, _: ()) -> Self { let DequeuedEnvelope(envelope) = message; - Self::HandleDequeuedEnvelope(envelope, sender) + Self::HandleDequeuedEnvelope(envelope) } } @@ -610,6 +611,8 @@ struct ProjectCacheBroker { spool_v1: Option, /// Status of the global configuration, used to determine readiness for processing. global_config: GlobalConfigStatus, + /// Atomic boolean signaling whether the project cache is ready to accept a new envelope. + project_cache_ready: Arc, } #[derive(Debug)] @@ -1315,7 +1318,7 @@ impl ProjectCacheBroker { ProjectCache::RefreshIndexCache(message) => { self.handle_refresh_index_cache(message) } - ProjectCache::HandleDequeuedEnvelope(message, sender) => { + ProjectCache::HandleDequeuedEnvelope(message) => { let envelope_buffer = self .services .envelope_buffer @@ -1328,8 +1331,9 @@ impl ProjectCacheBroker { "Failed to handle popped envelope" ); } - // Return response to signal readiness for next envelope: - sender.send(()) + + // We mark the project cache as ready to accept new traffic. + self.project_cache_ready.store(true, Ordering::SeqCst); } ProjectCache::UpdateProject(project) => self.handle_update_project(project), } @@ -1346,6 +1350,7 @@ pub struct ProjectCacheService { services: Services, global_config_rx: watch::Receiver, redis: Option, + project_cache_ready: Arc, } impl ProjectCacheService { @@ -1356,6 +1361,7 @@ impl ProjectCacheService { services: Services, global_config_rx: watch::Receiver, redis: Option, + project_cache_ready: Arc, ) -> Self { Self { config, @@ -1363,6 +1369,7 @@ impl ProjectCacheService { services, global_config_rx, redis, + project_cache_ready, } } } @@ -1377,6 +1384,7 @@ impl Service for ProjectCacheService { services, mut global_config_rx, redis, + project_cache_ready, } = self; let project_cache = services.project_cache.clone(); let outcome_aggregator = services.outcome_aggregator.clone(); @@ -1458,6 +1466,7 @@ impl Service for ProjectCacheService { spool_v1_unspool_handle: SleepHandle::idle(), spool_v1, global_config, + project_cache_ready, }; loop { @@ -1652,6 +1661,7 @@ mod tests { buffer_unspool_backoff: RetryBackoff::new(Duration::from_millis(100)), }), global_config: GlobalConfigStatus::Pending, + project_cache_ready: Arc::new(AtomicBool::new(true)), }, buffer, ) From dd1ab83d5eb944240fa33d7daae0bf65e38031b2 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Tue, 17 Sep 2024 12:12:42 +0200 Subject: [PATCH 2/8] Fix --- relay-server/src/services/buffer/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index bf734938b6..e227295f71 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -173,9 +173,10 @@ impl EnvelopeBufferService { tokio::time::sleep(self.sleep).await; } - // In case the project cache is not ready, we don't want to attempt popping. + // In case the project cache is not ready, we defer popping to first try and handle incoming + // messages and only come back to this in case within the timeout no data was received. if !self.project_cache_ready.load(Ordering::Relaxed) { - return future::pending().await; + tokio::time::sleep(DEFAULT_SLEEP).await; } relay_statsd::metric!( From 58c6af478af700bb14aae77d1583064b852a206b Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Tue, 17 Sep 2024 12:17:03 +0200 Subject: [PATCH 3/8] Fix --- relay-server/src/services/buffer/mod.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index e227295f71..5afc884d12 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -1,7 +1,6 @@ //! Types for buffering envelopes. use std::error::Error; -use std::future; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -175,8 +174,8 @@ impl EnvelopeBufferService { // In case the project cache is not ready, we defer popping to first try and handle incoming // messages and only come back to this in case within the timeout no data was received. - if !self.project_cache_ready.load(Ordering::Relaxed) { - tokio::time::sleep(DEFAULT_SLEEP).await; + while !self.project_cache_ready.load(Ordering::Relaxed) { + tokio::time::sleep(Duration::ZERO).await; } relay_statsd::metric!( From ed1749f7ca2cecb4e3f8a64dde5280fa7c313190 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Tue, 17 Sep 2024 12:25:31 +0200 Subject: [PATCH 4/8] Use yield now --- relay-server/src/services/buffer/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 5afc884d12..1434a1d28c 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -175,7 +175,7 @@ impl EnvelopeBufferService { // In case the project cache is not ready, we defer popping to first try and handle incoming // messages and only come back to this in case within the timeout no data was received. while !self.project_cache_ready.load(Ordering::Relaxed) { - tokio::time::sleep(Duration::ZERO).await; + tokio::task::yield_now().await; } relay_statsd::metric!( From eef179a04e02390de036d70e7e2a62be35236aaa Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Tue, 17 Sep 2024 12:47:23 +0200 Subject: [PATCH 5/8] Use yield now --- relay-server/src/services/buffer/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 1434a1d28c..5756193e1f 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -175,7 +175,7 @@ impl EnvelopeBufferService { // In case the project cache is not ready, we defer popping to first try and handle incoming // messages and only come back to this in case within the timeout no data was received. while !self.project_cache_ready.load(Ordering::Relaxed) { - tokio::task::yield_now().await; + tokio::time::sleep(DEFAULT_SLEEP).await; } relay_statsd::metric!( From 3dd880e55d439ce5c1faa9e51d98bd88c0dbd605 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Tue, 17 Sep 2024 12:50:45 +0200 Subject: [PATCH 6/8] Fix --- relay-server/src/services/buffer/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 5756193e1f..5afc884d12 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -175,7 +175,7 @@ impl EnvelopeBufferService { // In case the project cache is not ready, we defer popping to first try and handle incoming // messages and only come back to this in case within the timeout no data was received. while !self.project_cache_ready.load(Ordering::Relaxed) { - tokio::time::sleep(DEFAULT_SLEEP).await; + tokio::time::sleep(Duration::ZERO).await; } relay_statsd::metric!( From 7b7d8d02b1ad78bd3a408e47f268e4204363d289 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Tue, 17 Sep 2024 13:32:01 +0200 Subject: [PATCH 7/8] Fix --- relay-server/src/services/buffer/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 5afc884d12..f330f9a953 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -175,7 +175,7 @@ impl EnvelopeBufferService { // In case the project cache is not ready, we defer popping to first try and handle incoming // messages and only come back to this in case within the timeout no data was received. while !self.project_cache_ready.load(Ordering::Relaxed) { - tokio::time::sleep(Duration::ZERO).await; + tokio::time::sleep(Duration::from_nanos(1)).await; } relay_statsd::metric!( From 33415ddf0401ad8026490bab078036a1e9ecb60c Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Tue, 17 Sep 2024 13:34:31 +0200 Subject: [PATCH 8/8] Fix --- relay-server/src/services/buffer/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index f330f9a953..a88f5b650d 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -175,7 +175,7 @@ impl EnvelopeBufferService { // In case the project cache is not ready, we defer popping to first try and handle incoming // messages and only come back to this in case within the timeout no data was received. while !self.project_cache_ready.load(Ordering::Relaxed) { - tokio::time::sleep(Duration::from_nanos(1)).await; + tokio::time::sleep(Duration::from_millis(10)).await; } relay_statsd::metric!(