diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs
index 289014f858..cf46870ec2 100644
--- a/relay-server/src/service.rs
+++ b/relay-server/src/service.rs
@@ -1,5 +1,6 @@
 use std::convert::Infallible;
 use std::fmt;
+use std::sync::atomic::AtomicBool;
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -241,6 +242,9 @@ impl ServiceState {
         )
         .spawn_handler(processor_rx);
 
+        // We initialize a shared boolean that is used to manage backpressure between the
+        // EnvelopeBufferService and the ProjectCacheService.
+        let project_cache_ready = Arc::new(AtomicBool::new(true));
         let envelope_buffer = EnvelopeBufferService::new(
             config.clone(),
             MemoryChecker::new(memory_stat.clone(), config.clone()),
@@ -250,6 +254,7 @@ impl ServiceState {
                 outcome_aggregator: outcome_aggregator.clone(),
                 test_store: test_store.clone(),
             },
+            project_cache_ready.clone(),
         )
         .map(|b| b.start_observable());
 
@@ -272,6 +277,7 @@ impl ServiceState {
             redis_pools
                 .as_ref()
                 .map(|pools| pools.project_configs.clone()),
+            project_cache_ready,
         )
         .spawn_handler(project_cache_rx);
 
diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs
index a21d7bd8c7..a88f5b650d 100644
--- a/relay-server/src/services/buffer/mod.rs
+++ b/relay-server/src/services/buffer/mod.rs
@@ -110,6 +110,7 @@ pub struct EnvelopeBufferService {
     services: Services,
     has_capacity: Arc<AtomicBool>,
     sleep: Duration,
+    project_cache_ready: Arc<AtomicBool>,
 }
 
 /// The maximum amount of time between evaluations of dequeue conditions.
@@ -128,6 +129,7 @@ impl EnvelopeBufferService {
         memory_checker: MemoryChecker,
         global_config_rx: watch::Receiver<global_config::Status>,
         services: Services,
+        project_cache_ready: Arc<AtomicBool>,
     ) -> Option<Self> {
         config.spool_v2().then(|| Self {
             config,
@@ -136,6 +138,7 @@ impl EnvelopeBufferService {
             services,
             has_capacity: Arc::new(AtomicBool::new(true)),
             sleep: Duration::ZERO,
+            project_cache_ready,
         })
     }
 
@@ -169,6 +172,12 @@
             tokio::time::sleep(self.sleep).await;
         }
+        // In case the project cache is not ready, we defer popping so that incoming messages are
+        // handled first, and periodically check whether the cache has become ready again.
+        while !self.project_cache_ready.load(Ordering::Relaxed) {
+            tokio::time::sleep(Duration::from_millis(10)).await;
+        }
+
         relay_statsd::metric!(
             counter(RelayCounters::BufferReadyToPop) += 1,
             status = "slept"
         );
@@ -234,7 +243,11 @@
                     .pop()
                     .await?
                     .expect("Element disappeared despite exclusive excess");
+                // We assume that the project cache is now busy processing this envelope, so we
+                // flip the boolean flag, which will prioritize writes.
+                self.project_cache_ready.store(false, Ordering::SeqCst);
                 self.services.project_cache.send(DequeuedEnvelope(envelope));
+                self.sleep = Duration::ZERO; // try next pop immediately
             }
             Peek::NotReady(stack_key, envelope) => {
@@ -439,6 +452,7 @@ mod tests {
         watch::Sender<global_config::Status>,
         mpsc::UnboundedReceiver<ProjectCache>,
         mpsc::UnboundedReceiver<TrackOutcome>,
+        Arc<AtomicBool>,
     ) {
         let config = Arc::new(
             Config::from_json_value(serde_json::json!({
@@ -454,6 +468,7 @@
         let (global_tx, global_rx) = watch::channel(global_config::Status::Pending);
         let (project_cache, project_cache_rx) = Addr::custom();
         let (outcome_aggregator, outcome_aggregator_rx) = Addr::custom();
+        let project_cache_ready = Arc::new(AtomicBool::new(true));
         (
             EnvelopeBufferService::new(
                 config,
@@ -464,18 +479,20 @@
                     outcome_aggregator,
                     test_store: Addr::dummy(),
                 },
+                project_cache_ready.clone(),
             )
             .unwrap(),
             global_tx,
             project_cache_rx,
             outcome_aggregator_rx,
+            project_cache_ready,
         )
     }
 
     #[tokio::test]
     async fn capacity_is_updated() {
         tokio::time::pause();
-        let (service, _global_rx, _project_cache_tx, _) = buffer_service();
+        let (service, _global_rx, _project_cache_tx, _, _) = buffer_service();
 
         // Set capacity to false:
         service.has_capacity.store(false, Ordering::Relaxed);
@@ -497,7 +514,7 @@
     #[tokio::test]
     async fn pop_requires_global_config() {
         tokio::time::pause();
-        let (service, global_tx, project_cache_rx, _) = buffer_service();
+        let (service, global_tx, project_cache_rx, _, _) = buffer_service();
 
         let addr = service.start();
 
@@ -546,6 +563,7 @@
         )));
 
         let (project_cache, project_cache_rx) = Addr::custom();
+        let project_cache_ready = Arc::new(AtomicBool::new(true));
         let service = EnvelopeBufferService::new(
             config,
             memory_checker,
@@ -555,6 +573,7 @@
                 outcome_aggregator: Addr::dummy(),
                 test_store: Addr::dummy(),
             },
+            project_cache_ready,
         )
         .unwrap();
         let addr = service.start();
@@ -590,6 +609,7 @@
         let (global_tx, global_rx) = watch::channel(global_config::Status::Pending);
         let (project_cache, project_cache_rx) = Addr::custom();
         let (outcome_aggregator, mut outcome_aggregator_rx) = Addr::custom();
+        let project_cache_ready = Arc::new(AtomicBool::new(true));
         let service = EnvelopeBufferService::new(
             config,
             memory_checker,
@@ -599,6 +619,7 @@
                 outcome_aggregator,
                 test_store: Addr::dummy(),
             },
+            project_cache_ready,
         )
         .unwrap();
 
@@ -623,4 +644,36 @@
         assert_eq!(outcome.category, DataCategory::TransactionIndexed);
         assert_eq!(outcome.quantity, 1);
     }
+
+    #[tokio::test]
+    async fn output_is_throttled() {
+        tokio::time::pause();
+        let (service, global_tx, mut project_cache_rx, _, _) = buffer_service();
+        global_tx.send_replace(global_config::Status::Ready(Arc::new(
+            GlobalConfig::default(),
+        )));
+
+        let addr = service.start();
+
+        // Send five messages:
+        let envelope = new_envelope(false, "foo");
+        let project_key = envelope.meta().public_key();
+        for _ in 0..5 {
+            addr.send(EnvelopeBuffer::Push(envelope.clone()));
+        }
+        addr.send(EnvelopeBuffer::Ready(project_key));
+
+        tokio::time::sleep(Duration::from_millis(100)).await;
+
+        let mut messages = vec![];
+        project_cache_rx.recv_many(&mut messages, 100).await;
+
+        assert_eq!(
+            messages
+                .iter()
+                .filter(|message| matches!(message, ProjectCache::HandleDequeuedEnvelope(..)))
+                .count(),
+            1
+        );
+    }
 }
diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs
index 3c6633207e..2ce2920d06 100644
--- a/relay-server/src/services/project_cache.rs
+++ b/relay-server/src/services/project_cache.rs
@@ -1,5 +1,6 @@
 use std::collections::{BTreeMap, BTreeSet};
 use std::error::Error;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -292,7 +293,7 @@ pub enum ProjectCache {
     UpdateSpoolIndex(UpdateSpoolIndex),
     SpoolHealth(Sender<bool>),
     RefreshIndexCache(RefreshIndexCache),
-    HandleDequeuedEnvelope(Box<Envelope>, Sender<()>),
+    HandleDequeuedEnvelope(Box<Envelope>),
     UpdateProject(ProjectKey),
 }
 
@@ -311,7 +312,7 @@
             Self::UpdateSpoolIndex(_) => "UpdateSpoolIndex",
             Self::SpoolHealth(_) => "SpoolHealth",
             Self::RefreshIndexCache(_) => "RefreshIndexCache",
-            Self::HandleDequeuedEnvelope(_, _) => "HandleDequeuedEnvelope",
+            Self::HandleDequeuedEnvelope(_) => "HandleDequeuedEnvelope",
             Self::UpdateProject(_) => "UpdateProject",
         }
     }
@@ -419,11 +420,11 @@ impl FromMessage<DequeuedEnvelope> for ProjectCache {
-    type Response = relay_system::AsyncResponse<()>;
+    type Response = relay_system::NoResponse;
 
-    fn from_message(message: DequeuedEnvelope, sender: Sender<()>) -> Self {
+    fn from_message(message: DequeuedEnvelope, _: ()) -> Self {
         let DequeuedEnvelope(envelope) = message;
-        Self::HandleDequeuedEnvelope(envelope, sender)
+        Self::HandleDequeuedEnvelope(envelope)
     }
 }
 
@@ -610,6 +611,8 @@ struct ProjectCacheBroker {
     spool_v1: Option<SpoolV1>,
     /// Status of the global configuration, used to determine readiness for processing.
     global_config: GlobalConfigStatus,
+    /// Atomic boolean signaling whether the project cache is ready to accept a new envelope.
+    project_cache_ready: Arc<AtomicBool>,
 }
 
 #[derive(Debug)]
@@ -1305,7 +1308,7 @@
                 ProjectCache::RefreshIndexCache(message) => {
                     self.handle_refresh_index_cache(message)
                 }
-                ProjectCache::HandleDequeuedEnvelope(message, sender) => {
+                ProjectCache::HandleDequeuedEnvelope(message) => {
                     let envelope_buffer = self
                         .services
                         .envelope_buffer
@@ -1318,8 +1321,9 @@
                             "Failed to handle popped envelope"
                         );
                     }
-                    // Return response to signal readiness for next envelope:
-                    sender.send(())
+
+                    // We mark the project cache as ready to accept new traffic.
+                    self.project_cache_ready.store(true, Ordering::SeqCst);
                 }
                 ProjectCache::UpdateProject(project) => self.handle_update_project(project),
             }
@@ -1336,6 +1340,7 @@ pub struct ProjectCacheService {
     services: Services,
     global_config_rx: watch::Receiver<global_config::Status>,
     redis: Option<RedisPool>,
+    project_cache_ready: Arc<AtomicBool>,
 }
 
 impl ProjectCacheService {
@@ -1346,6 +1351,7 @@
         services: Services,
         global_config_rx: watch::Receiver<global_config::Status>,
         redis: Option<RedisPool>,
+        project_cache_ready: Arc<AtomicBool>,
     ) -> Self {
         Self {
             config,
@@ -1353,6 +1359,7 @@
             services,
             global_config_rx,
             redis,
+            project_cache_ready,
         }
     }
 }
@@ -1367,6 +1374,7 @@ impl Service for ProjectCacheService {
             services,
             mut global_config_rx,
             redis,
+            project_cache_ready,
         } = self;
         let project_cache = services.project_cache.clone();
         let outcome_aggregator = services.outcome_aggregator.clone();
@@ -1448,6 +1456,7 @@
                 spool_v1_unspool_handle: SleepHandle::idle(),
                 spool_v1,
                 global_config,
+                project_cache_ready,
             };
 
             loop {
@@ -1642,6 +1651,7 @@ mod tests {
                buffer_unspool_backoff: RetryBackoff::new(Duration::from_millis(100)),
            }),
            global_config: GlobalConfigStatus::Pending,
+           project_cache_ready: Arc::new(AtomicBool::new(true)),
        },
        buffer,
    )
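
The backpressure handshake introduced by this patch reduces to a single shared AtomicBool flipped by two tasks: the buffer only pops while the flag is true, and the project cache sets it back to true once it has handled the dequeued envelope. The following standalone sketch is not part of the patch; it assumes the tokio crate with its full feature set, and the task and variable names (producer, consumer, envelope counter) are illustrative only.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Shared flag: `true` means the consumer is ready for the next unit of work.
    let project_cache_ready = Arc::new(AtomicBool::new(true));
    let (tx, mut rx) = mpsc::unbounded_channel::<u32>();

    // Producer ("buffer"): only sends when the flag is set, then marks the consumer busy.
    let ready = Arc::clone(&project_cache_ready);
    let producer = tokio::spawn(async move {
        for envelope in 0..5u32 {
            while !ready.load(Ordering::Relaxed) {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
            ready.store(false, Ordering::SeqCst);
            tx.send(envelope).expect("consumer alive");
        }
        // Dropping `tx` here closes the channel and ends the consumer loop.
    });

    // Consumer ("project cache"): handles one envelope, then signals readiness again.
    let ready = Arc::clone(&project_cache_ready);
    let consumer = tokio::spawn(async move {
        while let Some(envelope) = rx.recv().await {
            println!("handling envelope {envelope}");
            ready.store(true, Ordering::SeqCst);
        }
    });

    producer.await.unwrap();
    consumer.await.unwrap();
}

The sketch mirrors the orderings used in the patch (SeqCst stores, Relaxed load): the flag only gates pacing, so a stale read at worst delays the next pop by one sleep interval rather than affecting correctness.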