From 911da87b2f1f8899a2c99f058195cf6ea05fccca Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 18 Sep 2024 11:20:25 +0200 Subject: [PATCH] Revert "feat(spooler): Implement backpressure mechanism based on atomics (#4040)" This reverts commit 0198a232f20e55eb06bf68a14d1dae9c3a948e09. --- relay-server/src/service.rs | 6 --- relay-server/src/services/buffer/mod.rs | 57 +--------------------- relay-server/src/services/project_cache.rs | 26 +++------- 3 files changed, 10 insertions(+), 79 deletions(-) diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index cf46870ec2..289014f858 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -1,6 +1,5 @@ use std::convert::Infallible; use std::fmt; -use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::time::Duration; @@ -242,9 +241,6 @@ impl ServiceState { ) .spawn_handler(processor_rx); - // We initialize a shared boolean that is used to manage backpressure between the - // EnvelopeBufferService and the ProjectCacheService. - let project_cache_ready = Arc::new(AtomicBool::new(true)); let envelope_buffer = EnvelopeBufferService::new( config.clone(), MemoryChecker::new(memory_stat.clone(), config.clone()), @@ -254,7 +250,6 @@ impl ServiceState { outcome_aggregator: outcome_aggregator.clone(), test_store: test_store.clone(), }, - project_cache_ready.clone(), ) .map(|b| b.start_observable()); @@ -277,7 +272,6 @@ impl ServiceState { redis_pools .as_ref() .map(|pools| pools.project_configs.clone()), - project_cache_ready, ) .spawn_handler(project_cache_rx); diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index a88f5b650d..a21d7bd8c7 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -110,7 +110,6 @@ pub struct EnvelopeBufferService { services: Services, has_capacity: Arc, sleep: Duration, - project_cache_ready: Arc, } /// The maximum amount of time between evaluations of dequeue conditions. @@ -129,7 +128,6 @@ impl EnvelopeBufferService { memory_checker: MemoryChecker, global_config_rx: watch::Receiver, services: Services, - project_cache_ready: Arc, ) -> Option { config.spool_v2().then(|| Self { config, @@ -138,7 +136,6 @@ impl EnvelopeBufferService { services, has_capacity: Arc::new(AtomicBool::new(true)), sleep: Duration::ZERO, - project_cache_ready, }) } @@ -172,12 +169,6 @@ impl EnvelopeBufferService { tokio::time::sleep(self.sleep).await; } - // In case the project cache is not ready, we defer popping to first try and handle incoming - // messages and only come back to this in case within the timeout no data was received. - while !self.project_cache_ready.load(Ordering::Relaxed) { - tokio::time::sleep(Duration::from_millis(10)).await; - } - relay_statsd::metric!( counter(RelayCounters::BufferReadyToPop) += 1, status = "slept" @@ -243,11 +234,7 @@ impl EnvelopeBufferService { .pop() .await? .expect("Element disappeared despite exclusive excess"); - // We assume that the project cache is now busy to process this envelope, so we flip - // the boolean flag, which will prioritize writes. - self.project_cache_ready.store(false, Ordering::SeqCst); self.services.project_cache.send(DequeuedEnvelope(envelope)); - self.sleep = Duration::ZERO; // try next pop immediately } Peek::NotReady(stack_key, envelope) => { @@ -452,7 +439,6 @@ mod tests { watch::Sender, mpsc::UnboundedReceiver, mpsc::UnboundedReceiver, - Arc, ) { let config = Arc::new( Config::from_json_value(serde_json::json!({ @@ -468,7 +454,6 @@ mod tests { let (global_tx, global_rx) = watch::channel(global_config::Status::Pending); let (project_cache, project_cache_rx) = Addr::custom(); let (outcome_aggregator, outcome_aggregator_rx) = Addr::custom(); - let project_cache_ready = Arc::new(AtomicBool::new(true)); ( EnvelopeBufferService::new( config, @@ -479,20 +464,18 @@ mod tests { outcome_aggregator, test_store: Addr::dummy(), }, - project_cache_ready.clone(), ) .unwrap(), global_tx, project_cache_rx, outcome_aggregator_rx, - project_cache_ready, ) } #[tokio::test] async fn capacity_is_updated() { tokio::time::pause(); - let (service, _global_rx, _project_cache_tx, _, _) = buffer_service(); + let (service, _global_rx, _project_cache_tx, _) = buffer_service(); // Set capacity to false: service.has_capacity.store(false, Ordering::Relaxed); @@ -514,7 +497,7 @@ mod tests { #[tokio::test] async fn pop_requires_global_config() { tokio::time::pause(); - let (service, global_tx, project_cache_rx, _, _) = buffer_service(); + let (service, global_tx, project_cache_rx, _) = buffer_service(); let addr = service.start(); @@ -563,7 +546,6 @@ mod tests { ))); let (project_cache, project_cache_rx) = Addr::custom(); - let project_cache_ready = Arc::new(AtomicBool::new(true)); let service = EnvelopeBufferService::new( config, memory_checker, @@ -573,7 +555,6 @@ mod tests { outcome_aggregator: Addr::dummy(), test_store: Addr::dummy(), }, - project_cache_ready, ) .unwrap(); let addr = service.start(); @@ -609,7 +590,6 @@ mod tests { let (global_tx, global_rx) = watch::channel(global_config::Status::Pending); let (project_cache, project_cache_rx) = Addr::custom(); let (outcome_aggregator, mut outcome_aggregator_rx) = Addr::custom(); - let project_cache_ready = Arc::new(AtomicBool::new(true)); let service = EnvelopeBufferService::new( config, memory_checker, @@ -619,7 +599,6 @@ mod tests { outcome_aggregator, test_store: Addr::dummy(), }, - project_cache_ready, ) .unwrap(); @@ -644,36 +623,4 @@ mod tests { assert_eq!(outcome.category, DataCategory::TransactionIndexed); assert_eq!(outcome.quantity, 1); } - - #[tokio::test] - async fn output_is_throttled() { - tokio::time::pause(); - let (service, global_tx, mut project_cache_rx, _, _) = buffer_service(); - global_tx.send_replace(global_config::Status::Ready(Arc::new( - GlobalConfig::default(), - ))); - - let addr = service.start(); - - // Send five messages: - let envelope = new_envelope(false, "foo"); - let project_key = envelope.meta().public_key(); - for _ in 0..5 { - addr.send(EnvelopeBuffer::Push(envelope.clone())); - } - addr.send(EnvelopeBuffer::Ready(project_key)); - - tokio::time::sleep(Duration::from_millis(100)).await; - - let mut messages = vec![]; - project_cache_rx.recv_many(&mut messages, 100).await; - - assert_eq!( - messages - .iter() - .filter(|message| matches!(message, ProjectCache::HandleDequeuedEnvelope(..))) - .count(), - 1 - ); - } } diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index 2ce2920d06..3c6633207e 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -1,6 +1,5 @@ use std::collections::{BTreeMap, BTreeSet}; use std::error::Error; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -293,7 +292,7 @@ pub enum ProjectCache { UpdateSpoolIndex(UpdateSpoolIndex), SpoolHealth(Sender), RefreshIndexCache(RefreshIndexCache), - HandleDequeuedEnvelope(Box), + HandleDequeuedEnvelope(Box, Sender<()>), UpdateProject(ProjectKey), } @@ -312,7 +311,7 @@ impl ProjectCache { Self::UpdateSpoolIndex(_) => "UpdateSpoolIndex", Self::SpoolHealth(_) => "SpoolHealth", Self::RefreshIndexCache(_) => "RefreshIndexCache", - Self::HandleDequeuedEnvelope(_) => "HandleDequeuedEnvelope", + Self::HandleDequeuedEnvelope(_, _) => "HandleDequeuedEnvelope", Self::UpdateProject(_) => "UpdateProject", } } @@ -420,11 +419,11 @@ impl FromMessage for ProjectCache { } impl FromMessage for ProjectCache { - type Response = relay_system::NoResponse; + type Response = relay_system::AsyncResponse<()>; - fn from_message(message: DequeuedEnvelope, _: ()) -> Self { + fn from_message(message: DequeuedEnvelope, sender: Sender<()>) -> Self { let DequeuedEnvelope(envelope) = message; - Self::HandleDequeuedEnvelope(envelope) + Self::HandleDequeuedEnvelope(envelope, sender) } } @@ -611,8 +610,6 @@ struct ProjectCacheBroker { spool_v1: Option, /// Status of the global configuration, used to determine readiness for processing. global_config: GlobalConfigStatus, - /// Atomic boolean signaling whether the project cache is ready to accept a new envelope. - project_cache_ready: Arc, } #[derive(Debug)] @@ -1308,7 +1305,7 @@ impl ProjectCacheBroker { ProjectCache::RefreshIndexCache(message) => { self.handle_refresh_index_cache(message) } - ProjectCache::HandleDequeuedEnvelope(message) => { + ProjectCache::HandleDequeuedEnvelope(message, sender) => { let envelope_buffer = self .services .envelope_buffer @@ -1321,9 +1318,8 @@ impl ProjectCacheBroker { "Failed to handle popped envelope" ); } - - // We mark the project cache as ready to accept new traffic. - self.project_cache_ready.store(true, Ordering::SeqCst); + // Return response to signal readiness for next envelope: + sender.send(()) } ProjectCache::UpdateProject(project) => self.handle_update_project(project), } @@ -1340,7 +1336,6 @@ pub struct ProjectCacheService { services: Services, global_config_rx: watch::Receiver, redis: Option, - project_cache_ready: Arc, } impl ProjectCacheService { @@ -1351,7 +1346,6 @@ impl ProjectCacheService { services: Services, global_config_rx: watch::Receiver, redis: Option, - project_cache_ready: Arc, ) -> Self { Self { config, @@ -1359,7 +1353,6 @@ impl ProjectCacheService { services, global_config_rx, redis, - project_cache_ready, } } } @@ -1374,7 +1367,6 @@ impl Service for ProjectCacheService { services, mut global_config_rx, redis, - project_cache_ready, } = self; let project_cache = services.project_cache.clone(); let outcome_aggregator = services.outcome_aggregator.clone(); @@ -1456,7 +1448,6 @@ impl Service for ProjectCacheService { spool_v1_unspool_handle: SleepHandle::idle(), spool_v1, global_config, - project_cache_ready, }; loop { @@ -1651,7 +1642,6 @@ mod tests { buffer_unspool_backoff: RetryBackoff::new(Duration::from_millis(100)), }), global_config: GlobalConfigStatus::Pending, - project_cache_ready: Arc::new(AtomicBool::new(true)), }, buffer, )