diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs
index a214b23837..02b411114c 100644
--- a/relay-server/src/service.rs
+++ b/relay-server/src/service.rs
@@ -5,17 +5,6 @@
 use std::time::Duration;
 use crate::metrics::{MetricOutcomes, MetricStats};
 use crate::services::buffer::{EnvelopeBufferService, ObservableEnvelopeBuffer};
-use crate::services::stats::RelayStats;
-use anyhow::{Context, Result};
-use axum::extract::FromRequestParts;
-use axum::http::request::Parts;
-use rayon::ThreadPool;
-use relay_cogs::Cogs;
-use relay_config::{Config, RedisConnection, RedisPoolConfigs};
-use relay_redis::{RedisConfigOptions, RedisError, RedisPool, RedisPools};
-use relay_system::{channel, Addr, Service};
-use tokio::runtime::Runtime;
-
 use crate::services::cogs::{CogsService, CogsServiceRecorder};
 use crate::services::global_config::{GlobalConfigManager, GlobalConfigService};
 use crate::services::health_check::{HealthCheck, HealthCheckService};
@@ -25,11 +14,22 @@ use crate::services::outcome_aggregator::OutcomeAggregator;
 use crate::services::processor::{self, EnvelopeProcessor, EnvelopeProcessorService};
 use crate::services::project_cache::{ProjectCache, ProjectCacheService, Services};
 use crate::services::relays::{RelayCache, RelayCacheService};
+use crate::services::stats::RelayStats;
 #[cfg(feature = "processing")]
 use crate::services::store::StoreService;
 use crate::services::test_store::{TestStore, TestStoreService};
 use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
 use crate::utils::{MemoryChecker, MemoryStat};
+use anyhow::{Context, Result};
+use axum::extract::FromRequestParts;
+use axum::http::request::Parts;
+use rayon::ThreadPool;
+use relay_cogs::Cogs;
+use relay_config::{Config, RedisConnection, RedisPoolConfigs};
+use relay_redis::{RedisConfigOptions, RedisError, RedisPool, RedisPools};
+use relay_system::{channel, Addr, Service};
+use tokio::runtime::Runtime;
+use tokio::sync::watch;
 
 /// Indicates the type of failure of the server.
 #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, thiserror::Error)]
@@ -240,10 +240,16 @@ impl ServiceState {
         )
         .spawn_handler(processor_rx);
 
+        // We create a watch channel that is used to monitor availability of the project cache
+        // service for processing new envelopes. This is used as a simple backpressure mechanism
+        // between services.
+        let (envelope_processing_availability, _) = watch::channel(true);
+
         let envelope_buffer = EnvelopeBufferService::new(
            config.clone(),
            MemoryChecker::new(memory_stat.clone(), config.clone()),
            project_cache.clone(),
+           envelope_processing_availability.subscribe(),
         )
         .map(|b| b.start_observable());
 
@@ -266,6 +272,7 @@ impl ServiceState {
             redis_pools
                 .as_ref()
                 .map(|pools| pools.project_configs.clone()),
+            envelope_processing_availability,
         )
         .spawn_handler(project_cache_rx);
 
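The wiring above hinges on `tokio::sync::watch`: the `ProjectCacheService` keeps the `Sender<bool>` half, and each consumer gets its own `Receiver<bool>` via `subscribe()`. The following is a minimal, self-contained sketch of that channel's semantics; the names are illustrative, not Relay code:

```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // The sender half stays with the service that reports availability
    // (the project cache); receivers go to services that must back off.
    let (availability_tx, _initial_rx) = watch::channel(true);

    // `subscribe()` mints additional receivers after construction, which is
    // why the diff can discard the initial receiver with `let (tx, _) = ...`.
    let mut availability_rx = availability_tx.subscribe();

    // A watch channel stores only the most recent value; `send` overwrites it.
    availability_tx.send(false).unwrap();
    assert!(!*availability_rx.borrow());

    availability_tx.send(true).unwrap();
    // `changed()` resolves once a value newer than the last-seen one exists.
    availability_rx.changed().await.unwrap();
    assert!(*availability_rx.borrow());
}
```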
diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs
index ff60db921d..46dc3acc83 100644
--- a/relay-server/src/services/buffer/mod.rs
+++ b/relay-server/src/services/buffer/mod.rs
@@ -1,15 +1,15 @@
 //! Types for buffering envelopes.
 
-use std::sync::atomic::AtomicBool;
-use std::sync::atomic::Ordering;
-use std::sync::Arc;
-use std::time::Duration;
-
 use relay_base_schema::project::ProjectKey;
 use relay_config::Config;
-use relay_system::Request;
 use relay_system::SendError;
 use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service};
+use std::future;
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::sync::watch;
 
 use crate::envelope::Envelope;
 use crate::services::buffer::envelope_buffer::Peek;
@@ -91,11 +91,11 @@ pub struct EnvelopeBufferService {
     memory_checker: MemoryChecker,
     project_cache: Addr<ProjectCache>,
     has_capacity: Arc<AtomicBool>,
-    sleep: Duration,
-    project_cache_ready: Option<Request<()>>,
+    pop_delay: Duration,
+    project_cache_envelope_processing_availability: watch::Receiver<bool>,
 }
 
-const DEFAULT_SLEEP: Duration = Duration::from_millis(100);
+const DEFAULT_POP_DELAY: Duration = Duration::from_millis(100);
 
 impl EnvelopeBufferService {
     /// Creates a memory or disk based [`EnvelopeBufferService`], depending on the given config.
@@ -106,14 +106,15 @@
         config: Arc<Config>,
         memory_checker: MemoryChecker,
         project_cache: Addr<ProjectCache>,
+        project_cache_envelope_processing_availability: watch::Receiver<bool>,
     ) -> Option<Self> {
         config.spool_v2().then(|| Self {
             config,
             memory_checker,
             project_cache,
             has_capacity: Arc::new(AtomicBool::new(true)),
-            sleep: Duration::ZERO,
-            project_cache_ready: None,
+            pop_delay: Duration::ZERO,
+            project_cache_envelope_processing_availability,
         })
     }
 
@@ -128,11 +129,19 @@
 
     /// Wait for the configured amount of time and make sure the project cache is ready to receive.
     async fn ready_to_pop(&mut self) -> Result<(), SendError> {
-        tokio::time::sleep(self.sleep).await;
-        if let Some(project_cache_ready) = self.project_cache_ready.take() {
-            project_cache_ready.await?;
+        tokio::time::sleep(self.pop_delay).await;
+
+        if let Ok(availability) = self
+            .project_cache_envelope_processing_availability
+            .wait_for(|availability| *availability)
+            .await
+        {
+            if *availability {
+                return Ok(());
+            }
         }
-        Ok(())
+
+        future::pending().await
     }
 
     /// Tries to pop an envelope for a ready project.
@@ -144,7 +153,8 @@
         match buffer.peek().await? {
             Peek::Empty => {
                 relay_log::trace!("EnvelopeBufferService empty");
-                self.sleep = Duration::MAX; // wait for reset by `handle_message`.
+                // Wait for reset by `handle_message`.
+                self.pop_delay = Duration::MAX;
             }
             Peek::Ready(_) => {
                 relay_log::trace!("EnvelopeBufferService pop");
@@ -153,9 +163,9 @@
                     .pop()
                     .await?
                     .expect("Element disappeared despite exclusive excess");
-                self.project_cache_ready
-                    .replace(self.project_cache.send(DequeuedEnvelope(envelope)));
-                self.sleep = Duration::ZERO; // try next pop immediately
+                self.project_cache.send(DequeuedEnvelope(envelope));
+                // Try next pop immediately.
+                self.pop_delay = Duration::ZERO;
             }
             Peek::NotReady(stack_key, envelope) => {
                 relay_log::trace!("EnvelopeBufferService request update");
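`ready_to_pop` now blocks on `watch::Receiver::wait_for`, which resolves only once the watched value satisfies the predicate (so the inner `if *availability` re-check is defensive rather than required), and parks the task forever via `future::pending()` when the sender side has gone away. A stand-alone sketch of the same pattern, under illustrative names:

```rust
use std::future;
use tokio::sync::watch;
use tokio::time::{sleep, Duration};

// Illustrative stand-in for `ready_to_pop`: resolve only when the flag is
// `true`; if the sender side is dropped, park forever instead of spinning.
async fn wait_until_available(rx: &mut watch::Receiver<bool>) {
    match rx.wait_for(|available| *available).await {
        // `wait_for` only returns `Ok` once the predicate holds.
        Ok(_) => (),
        // All senders dropped: no one will ever flip the flag again.
        Err(_) => future::pending().await,
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(false);

    tokio::spawn(async move {
        sleep(Duration::from_millis(50)).await;
        let _ = tx.send(true);
    });

    wait_until_available(&mut rx).await;
    println!("project cache signalled availability");
}
```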
@@ -168,9 +178,9 @@
                     self.project_cache.send(UpdateProject(sampling_key));
                 }
             }
-            // deprioritize the stack to prevent head-of-line blocking
+            // Deprioritize the stack to prevent head-of-line blocking.
             buffer.mark_seen(&stack_key);
-            self.sleep = DEFAULT_SLEEP;
+            self.pop_delay = DEFAULT_POP_DELAY;
         }
     }
 
@@ -202,7 +212,7 @@
                 buffer.mark_ready(&project_key, true);
             }
         };
-        self.sleep = Duration::ZERO;
+        self.pop_delay = Duration::ZERO;
     }
 
     /// Pushes an [`Envelope`] to the [`PolymorphicEnvelopeBuffer`].
@@ -235,6 +245,8 @@ impl Service for EnvelopeBufferService {
         let config = self.config.clone();
         let memory_checker = self.memory_checker.clone();
         tokio::spawn(async move {
+            let mut ticker = tokio::time::interval(Duration::from_millis(100));
+
             let buffer = PolymorphicEnvelopeBuffer::from_config(&config, memory_checker).await;
 
             let mut buffer = match buffer {
@@ -249,15 +261,14 @@
             };
 
             buffer.initialize().await;
 
-            let mut ticker = tokio::time::interval(Duration::from_millis(100));
-
             relay_log::info!("EnvelopeBufferService start");
             loop {
                 relay_log::trace!("EnvelopeBufferService loop");
                 tokio::select! {
                     // NOTE: we do not select a bias here.
-                    // On the one hand, we might want to prioritize dequeing over enqueing
+                    //
+                    // On the one hand, we might want to prioritize dequeuing over enqueuing
                     // so we do not exceed the buffer capacity by starving the dequeue.
                     // on the other hand, prioritizing old messages violates the LIFO design.
                     Ok(()) = self.ready_to_pop() => {
@@ -308,7 +319,13 @@
             .unwrap(),
         );
         let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());
-        let service = EnvelopeBufferService::new(config, memory_checker, Addr::dummy()).unwrap();
+        let service = EnvelopeBufferService::new(
+            config,
+            memory_checker,
+            Addr::dummy(),
+            watch::channel(true).0.subscribe(),
+        )
+        .unwrap();
 
         // Set capacity to false:
         service.has_capacity.store(false, Ordering::Relaxed);
@@ -342,24 +359,47 @@
         );
         let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());
         let (project_cache_addr, mut project_cache_rx) = Addr::custom();
-        let service =
-            EnvelopeBufferService::new(config, memory_checker, project_cache_addr).unwrap();
+        let (envelope_processing_availability, _) = watch::channel(true);
+        let service = EnvelopeBufferService::new(
+            config,
+            memory_checker,
+            project_cache_addr,
+            envelope_processing_availability.subscribe(),
+        )
+        .unwrap();
 
         let addr = service.start();
 
-        // Send five messages:
+        // Send one message with availability set to true.
         let envelope = new_envelope(false, "foo");
         let project_key = envelope.meta().public_key();
+        addr.send(EnvelopeBuffer::Push(envelope.clone()));
+
+        // We wait for the message to be processed.
+        tokio::time::sleep(Duration::from_millis(100)).await;
+
+        // We simulate the availability set to false.
+        envelope_processing_availability.send(false).unwrap();
+
+        // We send five more messages.
         for _ in 0..5 {
             addr.send(EnvelopeBuffer::Push(envelope.clone()));
         }
         addr.send(EnvelopeBuffer::Ready(project_key));
 
-        tokio::time::sleep(Duration::from_millis(1000)).await;
-
-        // Project cache received only one envelope:
-        assert_eq!(project_cache_rx.len(), 1); // without throttling, this would be 5.
-        assert!(project_cache_rx.try_recv().is_ok());
+        // We expect the project cache to have received only one envelope since the other five
+        // have been blocked on the check.
+        assert_eq!(project_cache_rx.len(), 1);
+        assert!(project_cache_rx.recv().await.is_some());
         assert_eq!(project_cache_rx.len(), 0);
+
+        // We set the availability back to true.
+        envelope_processing_availability.send(true).unwrap();
+
+        // We wait for all the envelopes to be processed.
+        tokio::time::sleep(Duration::from_millis(100)).await;
+
+        // We expect to have received all five of them.
+        assert_eq!(project_cache_rx.len(), 5);
     }
 }
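Taken together, the buffer side amounts to a select loop that always accepts pushes but gates pops on the downstream flag. Below is a stripped-down analogue using plain channels; `run_buffer` is a hypothetical stand-in, not the Relay service, and the `Vec` stands in for the LIFO `PolymorphicEnvelopeBuffer`:

```rust
use tokio::sync::{mpsc, watch};
use tokio::time::{interval, Duration};

// Pushes are always accepted; pops only happen while the downstream
// consumer reports `true`, mirroring `ready_to_pop` above.
async fn run_buffer(
    mut inbox: mpsc::UnboundedReceiver<u32>,
    mut downstream_ready: watch::Receiver<bool>,
    outbox: mpsc::UnboundedSender<u32>,
) {
    let mut queue: Vec<u32> = Vec::new();
    let mut ticker = interval(Duration::from_millis(100));

    loop {
        tokio::select! {
            // Gate the dequeue path on the availability flag.
            Ok(_) = downstream_ready.wait_for(|ready| *ready), if !queue.is_empty() => {
                let item = queue.pop().expect("checked non-empty"); // LIFO pop
                if outbox.send(item).is_err() {
                    return; // downstream gone
                }
            }
            Some(item) = inbox.recv() => queue.push(item),
            _ = ticker.tick() => { /* periodic housekeeping, e.g. capacity checks */ }
        }
    }
}

#[tokio::main]
async fn main() {
    let (in_tx, in_rx) = mpsc::unbounded_channel();
    let (ready_tx, ready_rx) = watch::channel(false);
    let (out_tx, mut out_rx) = mpsc::unbounded_channel();

    tokio::spawn(run_buffer(in_rx, ready_rx, out_tx));

    for i in 0..3 {
        in_tx.send(i).unwrap();
    }
    // Nothing is delivered until the consumer flips the flag.
    ready_tx.send(true).unwrap();
    for _ in 0..3 {
        println!("received {}", out_rx.recv().await.unwrap());
    }
}
```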
diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs
index 507833ce78..d5903bfd1c 100644
--- a/relay-server/src/services/project_cache.rs
+++ b/relay-server/src/services/project_cache.rs
@@ -19,9 +19,9 @@ use relay_quotas::RateLimits;
 use relay_redis::RedisPool;
 use relay_statsd::metric;
 use relay_system::{Addr, FromMessage, Interface, Sender, Service};
-use tokio::sync::mpsc;
 #[cfg(feature = "processing")]
 use tokio::sync::Semaphore;
+use tokio::sync::{mpsc, watch};
 use tokio::time::Instant;
 
 use crate::services::global_config::{self, GlobalConfigManager, Subscribe};
@@ -292,7 +292,7 @@
     UpdateSpoolIndex(UpdateSpoolIndex),
     SpoolHealth(Sender<bool>),
     RefreshIndexCache(RefreshIndexCache),
-    HandleDequeuedEnvelope(Box<Envelope>, Sender<()>),
+    HandleDequeuedEnvelope(Box<Envelope>),
     UpdateProject(ProjectKey),
 }
 
@@ -311,7 +311,7 @@
             Self::UpdateSpoolIndex(_) => "UpdateSpoolIndex",
             Self::SpoolHealth(_) => "SpoolHealth",
             Self::RefreshIndexCache(_) => "RefreshIndexCache",
-            Self::HandleDequeuedEnvelope(_, _) => "HandleDequeuedEnvelope",
+            Self::HandleDequeuedEnvelope(_) => "HandleDequeuedEnvelope",
             Self::UpdateProject(_) => "UpdateProject",
         }
     }
@@ -419,11 +419,11 @@
 }
 
 impl FromMessage<DequeuedEnvelope> for ProjectCache {
-    type Response = relay_system::AsyncResponse<()>;
+    type Response = relay_system::NoResponse;
 
-    fn from_message(message: DequeuedEnvelope, sender: Sender<()>) -> Self {
+    fn from_message(message: DequeuedEnvelope, _: ()) -> Self {
         let DequeuedEnvelope(envelope) = message;
-        Self::HandleDequeuedEnvelope(envelope, sender)
+        Self::HandleDequeuedEnvelope(envelope)
     }
 }
 
@@ -1316,7 +1316,7 @@
                 ProjectCache::RefreshIndexCache(message) => {
                     self.handle_refresh_index_cache(message)
                 }
-                ProjectCache::HandleDequeuedEnvelope(message, sender) => {
+                ProjectCache::HandleDequeuedEnvelope(message) => {
                     let envelope_buffer = self
                         .services
                         .envelope_buffer
@@ -1329,8 +1329,6 @@
                             "Failed to handle popped envelope"
                         );
                     }
-                    // Return response to signal readiness for next envelope:
-                    sender.send(())
                 }
                 ProjectCache::UpdateProject(project) => self.handle_update_project(project),
             }
@@ -1346,6 +1344,8 @@ pub struct ProjectCacheService {
     memory_checker: MemoryChecker,
     services: Services,
     redis: Option<RedisPool>,
+    /// Watch channel used to expose availability to process new envelopes in the project cache.
+    envelope_processing_availability: watch::Sender<bool>,
 }
 
 impl ProjectCacheService {
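The `FromMessage` change flips `DequeuedEnvelope` from a request/response message (`AsyncResponse<()>`, where the buffer awaited the returned sender before popping again) to fire-and-forget (`NoResponse`). The two shapes can be modelled with plain channels rather than `relay_system`; the `Message` type below is hypothetical:

```rust
use tokio::sync::{mpsc, oneshot};

enum Message {
    // Old shape: the message carries a reply handle the sender can await.
    WithAck(String, oneshot::Sender<()>),
    // New shape: no reply handle; completion is signalled out of band
    // (in the diff, through the watch channel).
    FireAndForget(String),
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Message>(8);

    let worker = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            match msg {
                Message::WithAck(payload, ack) => {
                    println!("handled {payload}");
                    let _ = ack.send(()); // per-message readiness signal
                }
                Message::FireAndForget(payload) => {
                    println!("handled {payload}");
                }
            }
        }
    });

    let (ack_tx, ack_rx) = oneshot::channel();
    tx.send(Message::WithAck("envelope 1".into(), ack_tx))
        .await
        .unwrap();
    ack_rx.await.unwrap(); // throttle by awaiting each ack

    tx.send(Message::FireAndForget("envelope 2".into()))
        .await
        .unwrap();

    drop(tx); // close the channel so the worker loop ends
    worker.await.unwrap();
}
```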
@@ -1355,12 +1355,14 @@
         memory_checker: MemoryChecker,
         services: Services,
         redis: Option<RedisPool>,
+        envelope_processing_availability: watch::Sender<bool>,
     ) -> Self {
         Self {
             config,
             memory_checker,
             services,
             redis,
+            envelope_processing_availability,
         }
     }
 }
@@ -1374,10 +1376,12 @@ impl Service for ProjectCacheService {
             memory_checker,
             services,
             redis,
+            ref envelope_processing_availability,
         } = self;
         let project_cache = services.project_cache.clone();
         let outcome_aggregator = services.outcome_aggregator.clone();
         let test_store = services.test_store.clone();
+        let envelope_processing_availability = envelope_processing_availability.clone();
 
         tokio::spawn(async move {
             let mut ticker = tokio::time::interval(config.cache_eviction_interval());
@@ -1466,6 +1470,10 @@
             };
 
             loop {
+                // We notify all watchers that the project cache service is now busy, so that other
+                // services can stop pushing data to the incoming channel.
+                let _ = envelope_processing_availability.send(false);
+
                 tokio::select! {
                     biased;
 
@@ -1510,6 +1518,9 @@
                 }
             }
 
+            // We notify all watchers that they can resume sending messages to the project cache.
+            let _ = envelope_processing_availability.send(true);
+
             relay_log::info!("project cache stopped");
         });
     }
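On the producer side, `watch::Sender::send` simply overwrites the stored value and wakes waiters, so publishing on every loop iteration is cheap, and `let _ =` swallows the error returned when no receiver is left. The sketch below shows such a flag being toggled around a busy section; it is illustrative only, and its placement differs from the hunk above, which flips the flag to `false` at the top of the loop and back to `true` once more on shutdown:

```rust
use tokio::sync::watch;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (availability, mut watcher) = watch::channel(true);

    let producer = tokio::spawn(async move {
        for _ in 0..3 {
            let _ = availability.send(false); // busy
            sleep(Duration::from_millis(10)).await; // handle one message
            let _ = availability.send(true); // idle again
        }
    });

    // Consumers simply wait for `true` before forwarding more work.
    watcher.wait_for(|available| *available).await.unwrap();
    println!("safe to send more envelopes");
    producer.await.unwrap();
}
```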