From e44a20fbd50af4431a1a04d552f18e30f246fcf5 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Fri, 20 Sep 2024 09:15:57 +0200 Subject: [PATCH 1/9] feat(spooler): Implement backpressure in spooler via bounded queues --- relay-server/src/service.rs | 26 ++++---- relay-server/src/services/buffer/mod.rs | 75 +++++++++++++--------- relay-server/src/services/project_cache.rs | 24 ++++--- 3 files changed, 74 insertions(+), 51 deletions(-) diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 289014f858..3b191a3dab 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -5,17 +5,6 @@ use std::time::Duration; use crate::metrics::{MetricOutcomes, MetricStats}; use crate::services::buffer::{self, EnvelopeBufferService, ObservableEnvelopeBuffer}; -use crate::services::stats::RelayStats; -use anyhow::{Context, Result}; -use axum::extract::FromRequestParts; -use axum::http::request::Parts; -use rayon::ThreadPool; -use relay_cogs::Cogs; -use relay_config::{Config, RedisConnection, RedisPoolConfigs}; -use relay_redis::{RedisConfigOptions, RedisError, RedisPool, RedisPools}; -use relay_system::{channel, Addr, Service}; -use tokio::runtime::Runtime; - use crate::services::cogs::{CogsService, CogsServiceRecorder}; use crate::services::global_config::{GlobalConfigManager, GlobalConfigService}; use crate::services::health_check::{HealthCheck, HealthCheckService}; @@ -25,11 +14,22 @@ use crate::services::outcome_aggregator::OutcomeAggregator; use crate::services::processor::{self, EnvelopeProcessor, EnvelopeProcessorService}; use crate::services::project_cache::{ProjectCache, ProjectCacheService, Services}; use crate::services::relays::{RelayCache, RelayCacheService}; +use crate::services::stats::RelayStats; #[cfg(feature = "processing")] use crate::services::store::StoreService; use crate::services::test_store::{TestStore, TestStoreService}; use crate::services::upstream::{UpstreamRelay, UpstreamRelayService}; use crate::utils::{MemoryChecker, MemoryStat}; +use anyhow::{Context, Result}; +use axum::extract::FromRequestParts; +use axum::http::request::Parts; +use rayon::ThreadPool; +use relay_cogs::Cogs; +use relay_config::{Config, RedisConnection, RedisPoolConfigs}; +use relay_redis::{RedisConfigOptions, RedisError, RedisPool, RedisPools}; +use relay_system::{channel, Addr, Service}; +use tokio::runtime::Runtime; +use tokio::sync::mpsc; /// Indicates the type of failure of the server. #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, thiserror::Error)] @@ -241,12 +241,13 @@ impl ServiceState { ) .spawn_handler(processor_rx); + let (project_cache_bounded_tx, project_cache_bounded_rx) = mpsc::channel(500); let envelope_buffer = EnvelopeBufferService::new( config.clone(), MemoryChecker::new(memory_stat.clone(), config.clone()), global_config_rx.clone(), buffer::Services { - project_cache: project_cache.clone(), + project_cache: project_cache_bounded_tx, outcome_aggregator: outcome_aggregator.clone(), test_store: test_store.clone(), }, @@ -269,6 +270,7 @@ impl ServiceState { MemoryChecker::new(memory_stat.clone(), config.clone()), project_cache_services, global_config_rx, + project_cache_bounded_rx, redis_pools .as_ref() .map(|pools| pools.project_configs.clone()), diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index cfdccda21d..040673842b 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -8,10 +8,9 @@ use std::time::Duration; use relay_base_schema::project::ProjectKey; use relay_config::Config; -use relay_system::SendError; -use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service}; +use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, SendError, Service}; use relay_system::{Controller, Shutdown}; -use tokio::sync::watch; +use tokio::sync::{mpsc, watch}; use tokio::time::{timeout, Instant}; use crate::envelope::Envelope; @@ -21,9 +20,8 @@ use crate::services::outcome::DiscardReason; use crate::services::outcome::Outcome; use crate::services::outcome::TrackOutcome; use crate::services::processor::ProcessingGroup; -use crate::services::project_cache::DequeuedEnvelope; use crate::services::project_cache::ProjectCache; -use crate::services::project_cache::UpdateProject; + use crate::services::test_store::TestStore; use crate::statsd::RelayCounters; use crate::utils::ManagedEnvelope; @@ -96,7 +94,7 @@ impl ObservableEnvelopeBuffer { /// Services that the buffer service communicates with. pub struct Services { - pub project_cache: Addr, + pub project_cache: mpsc::Sender, pub outcome_aggregator: Addr, pub test_store: Addr, } @@ -149,10 +147,7 @@ impl EnvelopeBufferService { } /// Wait for the configured amount of time and make sure the project cache is ready to receive. - async fn ready_to_pop( - &mut self, - buffer: &mut PolymorphicEnvelopeBuffer, - ) -> Result<(), SendError> { + async fn ready_to_pop(&mut self, buffer: &PolymorphicEnvelopeBuffer) -> Result<(), SendError> { relay_statsd::metric!( counter(RelayCounters::BufferReadyToPop) += 1, status = "checking" @@ -169,14 +164,13 @@ impl EnvelopeBufferService { tokio::time::sleep(self.sleep).await; } - relay_statsd::metric!( - counter(RelayCounters::BufferReadyToPop) += 1, - status = "slept" - ); + while self.services.project_cache.capacity() == 0 { + tokio::time::sleep(Duration::from_millis(1)).await; + } relay_statsd::metric!( counter(RelayCounters::BufferReadyToPop) += 1, - status = "checked" + status = "acquired" ); Ok(()) @@ -234,7 +228,17 @@ impl EnvelopeBufferService { .pop() .await? .expect("Element disappeared despite exclusive excess"); - self.services.project_cache.send(DequeuedEnvelope(envelope)); + if let Err(error) = self + .services + .project_cache + .send(ProjectCache::HandleDequeuedEnvelope(envelope)) + .await + { + relay_log::error!( + error = &error as &dyn Error, + "the envelope buffer couldn't send an envelope to the project cache", + ); + }; self.sleep = Duration::ZERO; // try next pop immediately } Peek::NotReady(stack_key, next_project_fetch, envelope) => { @@ -249,15 +253,23 @@ impl EnvelopeBufferService { // avoid flooding the project cache with `UpdateProject` messages. if Instant::now() >= next_project_fetch { relay_log::trace!("EnvelopeBufferService: requesting project(s) update"); - let project_key = envelope.meta().public_key(); - self.services.project_cache.send(UpdateProject(project_key)); + let own_key = envelope.meta().public_key(); + + // TODO: do we want to handle an error? + let _ = self + .services + .project_cache + .send(ProjectCache::UpdateProject(own_key)) + .await; match envelope.sampling_key() { None => {} - Some(sampling_key) if sampling_key == project_key => {} // already sent. + Some(sampling_key) if sampling_key == own_key => {} // already sent. Some(sampling_key) => { - self.services + let _ = self + .services .project_cache - .send(UpdateProject(sampling_key)); + .send(ProjectCache::UpdateProject(sampling_key)) + .await; } } @@ -397,7 +409,7 @@ impl Service for EnvelopeBufferService { // On the one hand, we might want to prioritize dequeuing over enqueuing // so we do not exceed the buffer capacity by starving the dequeue. // on the other hand, prioritizing old messages violates the LIFO design. - Ok(()) = self.ready_to_pop(&mut buffer) => { + Ok(_) = self.ready_to_pop(&buffer) => { if let Err(e) = self.try_pop(&mut buffer).await { relay_log::error!( error = &e as &dyn std::error::Error, @@ -447,7 +459,7 @@ mod tests { fn buffer_service() -> ( EnvelopeBufferService, watch::Sender, - mpsc::UnboundedReceiver, + mpsc::Receiver, mpsc::UnboundedReceiver, ) { let config = Arc::new( @@ -462,7 +474,7 @@ mod tests { ); let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone()); let (global_tx, global_rx) = watch::channel(global_config::Status::Pending); - let (project_cache, project_cache_rx) = Addr::custom(); + let (project_cache, project_cache_rx) = mpsc::channel(5); let (outcome_aggregator, outcome_aggregator_rx) = Addr::custom(); ( EnvelopeBufferService::new( @@ -470,7 +482,7 @@ mod tests { memory_checker, global_rx, Services { - project_cache, + project_cache: project_cache, outcome_aggregator, test_store: Addr::dummy(), }, @@ -506,8 +518,9 @@ mod tests { #[tokio::test] async fn pop_requires_global_config() { + relay_log::init_test!(); tokio::time::pause(); - let (service, global_tx, project_cache_rx, _) = buffer_service(); + let (service, global_tx, mut project_cache_rx, _) = buffer_service(); let addr = service.start(); @@ -555,13 +568,13 @@ mod tests { GlobalConfig::default(), ))); - let (project_cache, project_cache_rx) = Addr::custom(); + let (project_cache, project_cache_rx) = mpsc::channel(20); let service = EnvelopeBufferService::new( config, memory_checker, global_rx, Services { - project_cache, + project_cache: project_cache, outcome_aggregator: Addr::dummy(), test_store: Addr::dummy(), }, @@ -598,14 +611,14 @@ mod tests { ); let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone()); let (global_tx, global_rx) = watch::channel(global_config::Status::Pending); - let (project_cache, project_cache_rx) = Addr::custom(); + let (project_cache, project_cache_rx) = mpsc::channel(20); let (outcome_aggregator, mut outcome_aggregator_rx) = Addr::custom(); let service = EnvelopeBufferService::new( config, memory_checker, global_rx, Services { - project_cache, + project_cache: project_cache, outcome_aggregator, test_store: Addr::dummy(), }, @@ -653,7 +666,7 @@ mod tests { tokio::time::sleep(Duration::from_secs(1)).await; // We expect the project update request to be sent. - let Some(ProjectCache::HandleDequeuedEnvelope(envelope, _)) = project_cache_rx.recv().await + let Some(ProjectCache::HandleDequeuedEnvelope(envelope)) = project_cache_rx.recv().await else { panic!(); }; diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index be595f340a..4a04342578 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -295,7 +295,7 @@ pub enum ProjectCache { UpdateSpoolIndex(UpdateSpoolIndex), SpoolHealth(Sender), RefreshIndexCache(RefreshIndexCache), - HandleDequeuedEnvelope(Box, Sender<()>), + HandleDequeuedEnvelope(Box), UpdateProject(ProjectKey), } @@ -314,7 +314,7 @@ impl ProjectCache { Self::UpdateSpoolIndex(_) => "UpdateSpoolIndex", Self::SpoolHealth(_) => "SpoolHealth", Self::RefreshIndexCache(_) => "RefreshIndexCache", - Self::HandleDequeuedEnvelope(_, _) => "HandleDequeuedEnvelope", + Self::HandleDequeuedEnvelope(_) => "HandleDequeuedEnvelope", Self::UpdateProject(_) => "UpdateProject", } } @@ -422,11 +422,11 @@ impl FromMessage for ProjectCache { } impl FromMessage for ProjectCache { - type Response = relay_system::AsyncResponse<()>; + type Response = relay_system::NoResponse; - fn from_message(message: DequeuedEnvelope, sender: Sender<()>) -> Self { + fn from_message(message: DequeuedEnvelope, _: ()) -> Self { let DequeuedEnvelope(envelope) = message; - Self::HandleDequeuedEnvelope(envelope, sender) + Self::HandleDequeuedEnvelope(envelope) } } @@ -1308,7 +1308,7 @@ impl ProjectCacheBroker { ProjectCache::RefreshIndexCache(message) => { self.handle_refresh_index_cache(message) } - ProjectCache::HandleDequeuedEnvelope(message, sender) => { + ProjectCache::HandleDequeuedEnvelope(message) => { let envelope_buffer = self .services .envelope_buffer @@ -1321,8 +1321,6 @@ impl ProjectCacheBroker { "Failed to handle popped envelope" ); } - // Return response to signal readiness for next envelope: - sender.send(()) } ProjectCache::UpdateProject(project) => self.handle_update_project(project), } @@ -1338,6 +1336,7 @@ pub struct ProjectCacheService { memory_checker: MemoryChecker, services: Services, global_config_rx: watch::Receiver, + project_cache_bounded_rx: mpsc::Receiver, redis: Option, } @@ -1348,6 +1347,7 @@ impl ProjectCacheService { memory_checker: MemoryChecker, services: Services, global_config_rx: watch::Receiver, + project_cache_bounded_rx: mpsc::Receiver, redis: Option, ) -> Self { Self { @@ -1355,6 +1355,7 @@ impl ProjectCacheService { memory_checker, services, global_config_rx, + project_cache_bounded_rx, redis, } } @@ -1369,6 +1370,7 @@ impl Service for ProjectCacheService { memory_checker, services, mut global_config_rx, + mut project_cache_bounded_rx, redis, } = self; let project_cache = services.project_cache.clone(); @@ -1489,6 +1491,12 @@ impl Service for ProjectCacheService { broker.handle_periodic_unspool() }) } + // TODO: this prioritization might stab us in the back. + Some(message) = project_cache_bounded_rx.recv() => { + metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "handle_message_from_bounded", { + broker.handle_message(message) + }) + } Some(message) = rx.recv() => { metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "handle_message", { broker.handle_message(message) From 67b556d70875b05206305c628c4473fe27cb6f92 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Fri, 20 Sep 2024 09:24:26 +0200 Subject: [PATCH 2/9] Update --- relay-server/src/service.rs | 21 +++++++++++---------- relay-server/src/services/buffer/mod.rs | 7 ++++++- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 3b191a3dab..43ea78e6ec 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -3,6 +3,17 @@ use std::fmt; use std::sync::Arc; use std::time::Duration; +use anyhow::{Context, Result}; +use axum::extract::FromRequestParts; +use axum::http::request::Parts; +use rayon::ThreadPool; +use relay_cogs::Cogs; +use relay_config::{Config, RedisConnection, RedisPoolConfigs}; +use relay_redis::{RedisConfigOptions, RedisError, RedisPool, RedisPools}; +use relay_system::{channel, Addr, Service}; +use tokio::runtime::Runtime; +use tokio::sync::mpsc; + use crate::metrics::{MetricOutcomes, MetricStats}; use crate::services::buffer::{self, EnvelopeBufferService, ObservableEnvelopeBuffer}; use crate::services::cogs::{CogsService, CogsServiceRecorder}; @@ -20,16 +31,6 @@ use crate::services::store::StoreService; use crate::services::test_store::{TestStore, TestStoreService}; use crate::services::upstream::{UpstreamRelay, UpstreamRelayService}; use crate::utils::{MemoryChecker, MemoryStat}; -use anyhow::{Context, Result}; -use axum::extract::FromRequestParts; -use axum::http::request::Parts; -use rayon::ThreadPool; -use relay_cogs::Cogs; -use relay_config::{Config, RedisConnection, RedisPoolConfigs}; -use relay_redis::{RedisConfigOptions, RedisError, RedisPool, RedisPools}; -use relay_system::{channel, Addr, Service}; -use tokio::runtime::Runtime; -use tokio::sync::mpsc; /// Indicates the type of failure of the server. #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, thiserror::Error)] diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 040673842b..8c33ef7bd2 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -164,13 +164,18 @@ impl EnvelopeBufferService { tokio::time::sleep(self.sleep).await; } + relay_statsd::metric!( + counter(RelayCounters::BufferReadyToPop) += 1, + status = "slept" + ); + while self.services.project_cache.capacity() == 0 { tokio::time::sleep(Duration::from_millis(1)).await; } relay_statsd::metric!( counter(RelayCounters::BufferReadyToPop) += 1, - status = "acquired" + status = "checked" ); Ok(()) From 1a371344d83cd0e59fb22c6f841c529031572f3b Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Fri, 20 Sep 2024 09:27:37 +0200 Subject: [PATCH 3/9] Update --- relay-server/src/services/buffer/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 8c33ef7bd2..96ea4fc952 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -487,7 +487,7 @@ mod tests { memory_checker, global_rx, Services { - project_cache: project_cache, + project_cache, outcome_aggregator, test_store: Addr::dummy(), }, @@ -525,7 +525,7 @@ mod tests { async fn pop_requires_global_config() { relay_log::init_test!(); tokio::time::pause(); - let (service, global_tx, mut project_cache_rx, _) = buffer_service(); + let (service, global_tx, project_cache_rx, _) = buffer_service(); let addr = service.start(); @@ -579,7 +579,7 @@ mod tests { memory_checker, global_rx, Services { - project_cache: project_cache, + project_cache, outcome_aggregator: Addr::dummy(), test_store: Addr::dummy(), }, @@ -623,7 +623,7 @@ mod tests { memory_checker, global_rx, Services { - project_cache: project_cache, + project_cache, outcome_aggregator, test_store: Addr::dummy(), }, From 1859dafd5f4a7fbcb56d878d3245a586f06fdb74 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Fri, 20 Sep 2024 10:41:27 +0200 Subject: [PATCH 4/9] Fix --- relay-server/src/service.rs | 7 +- relay-server/src/services/buffer/mod.rs | 164 +++++++++++---------- relay-server/src/services/project_cache.rs | 55 +++---- 3 files changed, 113 insertions(+), 113 deletions(-) diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 43ea78e6ec..5a1159726c 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -242,13 +242,14 @@ impl ServiceState { ) .spawn_handler(processor_rx); - let (project_cache_bounded_tx, project_cache_bounded_rx) = mpsc::channel(500); + let (envelopes_tx, envelopes_rx) = mpsc::channel(500); let envelope_buffer = EnvelopeBufferService::new( config.clone(), MemoryChecker::new(memory_stat.clone(), config.clone()), global_config_rx.clone(), buffer::Services { - project_cache: project_cache_bounded_tx, + envelopes_tx, + project_cache: project_cache.clone(), outcome_aggregator: outcome_aggregator.clone(), test_store: test_store.clone(), }, @@ -271,7 +272,7 @@ impl ServiceState { MemoryChecker::new(memory_stat.clone(), config.clone()), project_cache_services, global_config_rx, - project_cache_bounded_rx, + envelopes_rx, redis_pools .as_ref() .map(|pools| pools.project_configs.clone()), diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 96ea4fc952..4d4c12f9d0 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -8,8 +8,9 @@ use std::time::Duration; use relay_base_schema::project::ProjectKey; use relay_config::Config; -use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, SendError, Service}; +use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service}; use relay_system::{Controller, Shutdown}; +use tokio::sync::mpsc::Permit; use tokio::sync::{mpsc, watch}; use tokio::time::{timeout, Instant}; @@ -20,7 +21,7 @@ use crate::services::outcome::DiscardReason; use crate::services::outcome::Outcome; use crate::services::outcome::TrackOutcome; use crate::services::processor::ProcessingGroup; -use crate::services::project_cache::ProjectCache; +use crate::services::project_cache::{DequeuedEnvelope, ProjectCache, UpdateProject}; use crate::services::test_store::TestStore; use crate::statsd::RelayCounters; @@ -93,8 +94,12 @@ impl ObservableEnvelopeBuffer { } /// Services that the buffer service communicates with. +#[derive(Clone)] pub struct Services { - pub project_cache: mpsc::Sender, + /// Bounded channel used exclusively to handle backpressure when sending envelopes to the + /// project cache. + pub envelopes_tx: mpsc::Sender, + pub project_cache: Addr, pub outcome_aggregator: Addr, pub test_store: Addr, } @@ -147,7 +152,10 @@ impl EnvelopeBufferService { } /// Wait for the configured amount of time and make sure the project cache is ready to receive. - async fn ready_to_pop(&mut self, buffer: &PolymorphicEnvelopeBuffer) -> Result<(), SendError> { + async fn ready_to_pop( + &mut self, + buffer: &PolymorphicEnvelopeBuffer, + ) -> Option> { relay_statsd::metric!( counter(RelayCounters::BufferReadyToPop) += 1, status = "checking" @@ -169,16 +177,14 @@ impl EnvelopeBufferService { status = "slept" ); - while self.services.project_cache.capacity() == 0 { - tokio::time::sleep(Duration::from_millis(1)).await; - } + let permit = self.services.envelopes_tx.reserve().await.ok(); relay_statsd::metric!( counter(RelayCounters::BufferReadyToPop) += 1, status = "checked" ); - Ok(()) + permit } /// Waits until preconditions for unspooling are met. @@ -202,11 +208,15 @@ impl EnvelopeBufferService { } /// Tries to pop an envelope for a ready project. - async fn try_pop( - &mut self, + async fn try_pop<'a>( + config: &Config, buffer: &mut PolymorphicEnvelopeBuffer, - ) -> Result<(), EnvelopeBufferError> { + services: Services, + envelopes_tx_permit: Permit<'a, DequeuedEnvelope>, + ) -> Result { + let mut sleep = Duration::ZERO; relay_log::trace!("EnvelopeBufferService: peeking the buffer"); + match buffer.peek().await? { Peek::Empty => { relay_log::trace!("EnvelopeBufferService: peek returned empty"); @@ -214,14 +224,18 @@ impl EnvelopeBufferService { counter(RelayCounters::BufferTryPop) += 1, peek_result = "empty" ); - self.sleep = Duration::MAX; // wait for reset by `handle_message`. + + sleep = Duration::MAX; // wait for reset by `handle_message`. } - Peek::Ready(envelope) | Peek::NotReady(.., envelope) if self.expired(envelope) => { + Peek::Ready(envelope) | Peek::NotReady(.., envelope) + if Self::expired(config, envelope) => + { let envelope = buffer .pop() .await? .expect("Element disappeared despite exclusive excess"); - self.drop_expired(envelope); + + Self::drop_expired(envelope, services); } Peek::Ready(_) => { relay_log::trace!("EnvelopeBufferService: popping envelope"); @@ -233,18 +247,9 @@ impl EnvelopeBufferService { .pop() .await? .expect("Element disappeared despite exclusive excess"); - if let Err(error) = self - .services - .project_cache - .send(ProjectCache::HandleDequeuedEnvelope(envelope)) - .await - { - relay_log::error!( - error = &error as &dyn Error, - "the envelope buffer couldn't send an envelope to the project cache", - ); - }; - self.sleep = Duration::ZERO; // try next pop immediately + envelopes_tx_permit.send(DequeuedEnvelope(envelope)); + + sleep = Duration::ZERO; // try next pop immediately } Peek::NotReady(stack_key, next_project_fetch, envelope) => { relay_log::trace!("EnvelopeBufferService: project(s) of envelope not ready"); @@ -260,21 +265,12 @@ impl EnvelopeBufferService { relay_log::trace!("EnvelopeBufferService: requesting project(s) update"); let own_key = envelope.meta().public_key(); - // TODO: do we want to handle an error? - let _ = self - .services - .project_cache - .send(ProjectCache::UpdateProject(own_key)) - .await; + services.project_cache.send(UpdateProject(own_key)); match envelope.sampling_key() { None => {} Some(sampling_key) if sampling_key == own_key => {} // already sent. Some(sampling_key) => { - let _ = self - .services - .project_cache - .send(ProjectCache::UpdateProject(sampling_key)) - .await; + services.project_cache.send(UpdateProject(sampling_key)); } } @@ -283,32 +279,28 @@ impl EnvelopeBufferService { buffer.mark_seen(&stack_key, DEFAULT_SLEEP); } - self.sleep = DEFAULT_SLEEP; + sleep = DEFAULT_SLEEP; } } - Ok(()) + Ok(sleep) } - fn expired(&self, envelope: &Envelope) -> bool { - envelope.meta().start_time().elapsed() > self.config.spool_envelopes_max_age() + fn expired(config: &Config, envelope: &Envelope) -> bool { + envelope.meta().start_time().elapsed() > config.spool_envelopes_max_age() } - fn drop_expired(&self, envelope: Box) { + fn drop_expired(envelope: Box, services: Services) { let mut managed_envelope = ManagedEnvelope::new( envelope, - self.services.outcome_aggregator.clone(), - self.services.test_store.clone(), + services.outcome_aggregator.clone(), + services.test_store.clone(), ProcessingGroup::Ungrouped, ); managed_envelope.reject(Outcome::Invalid(DiscardReason::Timestamp)); } - async fn handle_message( - &mut self, - buffer: &mut PolymorphicEnvelopeBuffer, - message: EnvelopeBuffer, - ) { + async fn handle_message(buffer: &mut PolymorphicEnvelopeBuffer, message: EnvelopeBuffer) { match message { EnvelopeBuffer::Push(envelope) => { // NOTE: This function assumes that a project state update for the relevant @@ -316,7 +308,7 @@ impl EnvelopeBufferService { // For better separation of concerns, this prefetch should be triggered from here // once buffer V1 has been removed. relay_log::trace!("EnvelopeBufferService: received push message"); - self.push(buffer, envelope).await; + Self::push(buffer, envelope).await; } EnvelopeBuffer::NotReady(project_key, envelope) => { relay_log::trace!( @@ -324,7 +316,7 @@ impl EnvelopeBufferService { &project_key ); relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesReturned) += 1); - self.push(buffer, envelope).await; + Self::push(buffer, envelope).await; buffer.mark_ready(&project_key, false); } EnvelopeBuffer::Ready(project_key) => { @@ -335,14 +327,9 @@ impl EnvelopeBufferService { buffer.mark_ready(&project_key, true); } }; - self.sleep = Duration::ZERO; } - async fn handle_shutdown( - &mut self, - buffer: &mut PolymorphicEnvelopeBuffer, - message: Shutdown, - ) -> bool { + async fn handle_shutdown(buffer: &mut PolymorphicEnvelopeBuffer, message: Shutdown) -> bool { // We gracefully shut down only if the shutdown has a timeout. if let Some(shutdown_timeout) = message.timeout { relay_log::trace!("EnvelopeBufferService: shutting down gracefully"); @@ -364,7 +351,7 @@ impl EnvelopeBufferService { false } - async fn push(&mut self, buffer: &mut PolymorphicEnvelopeBuffer, envelope: Box) { + async fn push(buffer: &mut PolymorphicEnvelopeBuffer, envelope: Box) { if let Err(e) = buffer.push(envelope).await { relay_log::error!( error = &e as &dyn std::error::Error, @@ -386,6 +373,7 @@ impl Service for EnvelopeBufferService { let config = self.config.clone(); let memory_checker = self.memory_checker.clone(); let mut global_config_rx = self.global_config_rx.clone(); + let services = self.services.clone(); tokio::spawn(async move { let buffer = PolymorphicEnvelopeBuffer::from_config(&config, memory_checker).await; @@ -409,36 +397,44 @@ impl Service for EnvelopeBufferService { iteration += 1; relay_log::trace!("EnvelopeBufferService: loop iteration {iteration}"); + let mut sleep = Duration::MAX; tokio::select! { // NOTE: we do not select a bias here. // On the one hand, we might want to prioritize dequeuing over enqueuing // so we do not exceed the buffer capacity by starving the dequeue. // on the other hand, prioritizing old messages violates the LIFO design. - Ok(_) = self.ready_to_pop(&buffer) => { - if let Err(e) = self.try_pop(&mut buffer).await { - relay_log::error!( - error = &e as &dyn std::error::Error, + Some(permit) = self.ready_to_pop(&buffer) => { + match Self::try_pop(&config, &mut buffer, services.clone(), permit).await { + Ok(new_sleep) => { + sleep = new_sleep; + } + Err(error) => { + relay_log::error!( + error = &error as &dyn std::error::Error, "failed to pop envelope" ); + } } } Some(message) = rx.recv() => { - self.handle_message(&mut buffer, message).await; + Self::handle_message(&mut buffer, message).await; + sleep = Duration::ZERO; } shutdown = shutdown.notified() => { // In case the shutdown was handled, we break out of the loop signaling that // there is no need to process anymore envelopes. - if self.handle_shutdown(&mut buffer, shutdown).await { + if Self::handle_shutdown(&mut buffer, shutdown).await { break; } } _ = global_config_rx.changed() => { relay_log::trace!("EnvelopeBufferService: received global config"); - self.sleep = Duration::ZERO; // Try to pop + sleep = Duration::ZERO; // Try to pop } else => break, } + self.sleep = sleep; self.update_observable_state(&mut buffer); } @@ -464,7 +460,8 @@ mod tests { fn buffer_service() -> ( EnvelopeBufferService, watch::Sender, - mpsc::Receiver, + mpsc::Receiver, + mpsc::UnboundedReceiver, mpsc::UnboundedReceiver, ) { let config = Arc::new( @@ -479,7 +476,8 @@ mod tests { ); let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone()); let (global_tx, global_rx) = watch::channel(global_config::Status::Pending); - let (project_cache, project_cache_rx) = mpsc::channel(5); + let (envelopes_tx, envelopes_rx) = mpsc::channel(5); + let (project_cache, project_cache_rx) = Addr::custom(); let (outcome_aggregator, outcome_aggregator_rx) = Addr::custom(); ( EnvelopeBufferService::new( @@ -487,6 +485,7 @@ mod tests { memory_checker, global_rx, Services { + envelopes_tx, project_cache, outcome_aggregator, test_store: Addr::dummy(), @@ -494,6 +493,7 @@ mod tests { ) .unwrap(), global_tx, + envelopes_rx, project_cache_rx, outcome_aggregator_rx, ) @@ -502,7 +502,7 @@ mod tests { #[tokio::test] async fn capacity_is_updated() { tokio::time::pause(); - let (service, _global_rx, _project_cache_tx, _) = buffer_service(); + let (service, _global_tx, _envelopes_rx, _project_cache_tx, _) = buffer_service(); // Set capacity to false: service.has_capacity.store(false, Ordering::Relaxed); @@ -525,7 +525,7 @@ mod tests { async fn pop_requires_global_config() { relay_log::init_test!(); tokio::time::pause(); - let (service, global_tx, project_cache_rx, _) = buffer_service(); + let (service, global_tx, envelopes_rx, project_cache_rx, _) = buffer_service(); let addr = service.start(); @@ -538,6 +538,7 @@ mod tests { tokio::time::sleep(Duration::from_millis(1000)).await; // Nothing was dequeued, global config not ready: + assert_eq!(envelopes_rx.len(), 0); assert_eq!(project_cache_rx.len(), 0); global_tx.send_replace(global_config::Status::Ready(Arc::new( @@ -547,7 +548,8 @@ mod tests { tokio::time::sleep(Duration::from_millis(1000)).await; // Dequeued, global config ready: - assert_eq!(project_cache_rx.len(), 1); + assert_eq!(envelopes_rx.len(), 1); + assert_eq!(project_cache_rx.len(), 0); } #[tokio::test] @@ -573,12 +575,14 @@ mod tests { GlobalConfig::default(), ))); - let (project_cache, project_cache_rx) = mpsc::channel(20); + let (envelopes_tx, envelopes_rx) = mpsc::channel(5); + let (project_cache, project_cache_rx) = Addr::custom(); let service = EnvelopeBufferService::new( config, memory_checker, global_rx, Services { + envelopes_tx, project_cache, outcome_aggregator: Addr::dummy(), test_store: Addr::dummy(), @@ -596,6 +600,7 @@ mod tests { tokio::time::sleep(Duration::from_millis(1000)).await; // Nothing was dequeued, memory not ready: + assert_eq!(envelopes_rx.len(), 0); assert_eq!(project_cache_rx.len(), 0); } @@ -616,13 +621,15 @@ mod tests { ); let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone()); let (global_tx, global_rx) = watch::channel(global_config::Status::Pending); - let (project_cache, project_cache_rx) = mpsc::channel(20); + let (envelopes_tx, envelopes_rx) = mpsc::channel(5); + let (project_cache, project_cache_rx) = Addr::custom(); let (outcome_aggregator, mut outcome_aggregator_rx) = Addr::custom(); let service = EnvelopeBufferService::new( config, memory_checker, global_rx, Services { + envelopes_tx, project_cache, outcome_aggregator, test_store: Addr::dummy(), @@ -646,7 +653,9 @@ mod tests { tokio::time::sleep(Duration::from_millis(100)).await; - assert!(project_cache_rx.is_empty()); + assert_eq!(envelopes_rx.len(), 0); + assert_eq!(project_cache_rx.len(), 0); + let outcome = outcome_aggregator_rx.try_recv().unwrap(); assert_eq!(outcome.category, DataCategory::TransactionIndexed); assert_eq!(outcome.quantity, 1); @@ -655,7 +664,7 @@ mod tests { #[tokio::test] async fn test_update_project() { tokio::time::pause(); - let (service, global_tx, mut project_cache_rx, _) = buffer_service(); + let (service, global_tx, mut envelopes_rx, mut project_cache_rx, _) = buffer_service(); let addr = service.start(); @@ -670,9 +679,8 @@ mod tests { tokio::time::sleep(Duration::from_secs(1)).await; - // We expect the project update request to be sent. - let Some(ProjectCache::HandleDequeuedEnvelope(envelope)) = project_cache_rx.recv().await - else { + // We expect the envelope to be forwarded because by default we mark the project as ready. + let Some(DequeuedEnvelope(envelope)) = envelopes_rx.recv().await else { panic!(); }; @@ -680,6 +688,7 @@ mod tests { tokio::time::sleep(Duration::from_millis(100)).await; + // We expect the project update request to be sent. assert_eq!(project_cache_rx.len(), 1); let message = project_cache_rx.recv().await; assert!(matches!( @@ -689,6 +698,7 @@ mod tests { tokio::time::sleep(Duration::from_secs(1)).await; + // We expect the project update request to be sent. assert_eq!(project_cache_rx.len(), 1); assert!(matches!( message, diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index 4a04342578..6c507e5f18 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -295,7 +295,6 @@ pub enum ProjectCache { UpdateSpoolIndex(UpdateSpoolIndex), SpoolHealth(Sender), RefreshIndexCache(RefreshIndexCache), - HandleDequeuedEnvelope(Box), UpdateProject(ProjectKey), } @@ -314,7 +313,6 @@ impl ProjectCache { Self::UpdateSpoolIndex(_) => "UpdateSpoolIndex", Self::SpoolHealth(_) => "SpoolHealth", Self::RefreshIndexCache(_) => "RefreshIndexCache", - Self::HandleDequeuedEnvelope(_) => "HandleDequeuedEnvelope", Self::UpdateProject(_) => "UpdateProject", } } @@ -421,15 +419,6 @@ impl FromMessage for ProjectCache { } } -impl FromMessage for ProjectCache { - type Response = relay_system::NoResponse; - - fn from_message(message: DequeuedEnvelope, _: ()) -> Self { - let DequeuedEnvelope(envelope) = message; - Self::HandleDequeuedEnvelope(envelope) - } -} - impl FromMessage for ProjectCache { type Response = relay_system::NoResponse; @@ -1308,25 +1297,26 @@ impl ProjectCacheBroker { ProjectCache::RefreshIndexCache(message) => { self.handle_refresh_index_cache(message) } - ProjectCache::HandleDequeuedEnvelope(message) => { - let envelope_buffer = self - .services - .envelope_buffer - .clone() - .expect("Called HandleDequeuedEnvelope without an envelope buffer"); - - if let Err(e) = self.handle_dequeued_envelope(message, envelope_buffer) { - relay_log::error!( - error = &e as &dyn std::error::Error, - "Failed to handle popped envelope" - ); - } - } ProjectCache::UpdateProject(project) => self.handle_update_project(project), } } ) } + + fn handle_envelope(&mut self, dequeued_envelope: DequeuedEnvelope) { + let envelope_buffer = self + .services + .envelope_buffer + .clone() + .expect("Called HandleDequeuedEnvelope without an envelope buffer"); + + if let Err(e) = self.handle_dequeued_envelope(dequeued_envelope.0, envelope_buffer) { + relay_log::error!( + error = &e as &dyn std::error::Error, + "Failed to handle popped envelope" + ); + } + } } /// Service implementing the [`ProjectCache`] interface. @@ -1336,7 +1326,7 @@ pub struct ProjectCacheService { memory_checker: MemoryChecker, services: Services, global_config_rx: watch::Receiver, - project_cache_bounded_rx: mpsc::Receiver, + envelopes_rx: mpsc::Receiver, redis: Option, } @@ -1347,7 +1337,7 @@ impl ProjectCacheService { memory_checker: MemoryChecker, services: Services, global_config_rx: watch::Receiver, - project_cache_bounded_rx: mpsc::Receiver, + envelopes_rx: mpsc::Receiver, redis: Option, ) -> Self { Self { @@ -1355,7 +1345,7 @@ impl ProjectCacheService { memory_checker, services, global_config_rx, - project_cache_bounded_rx, + envelopes_rx, redis, } } @@ -1370,7 +1360,7 @@ impl Service for ProjectCacheService { memory_checker, services, mut global_config_rx, - mut project_cache_bounded_rx, + mut envelopes_rx, redis, } = self; let project_cache = services.project_cache.clone(); @@ -1491,10 +1481,9 @@ impl Service for ProjectCacheService { broker.handle_periodic_unspool() }) } - // TODO: this prioritization might stab us in the back. - Some(message) = project_cache_bounded_rx.recv() => { - metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "handle_message_from_bounded", { - broker.handle_message(message) + Some(message) = envelopes_rx.recv() => { + metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "handle_envelope", { + broker.handle_envelope(message) }) } Some(message) = rx.recv() => { From 1a21ad56dd1bf9bedcc0a16d044357aa97844b0f Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Fri, 20 Sep 2024 10:43:36 +0200 Subject: [PATCH 5/9] Add test --- relay-server/src/services/buffer/mod.rs | 32 +++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 4d4c12f9d0..6a3977dde0 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -705,4 +705,36 @@ mod tests { Some(ProjectCache::UpdateProject(key)) if key == project_key )) } + + #[tokio::test] + async fn output_is_throttled() { + tokio::time::pause(); + let (service, global_tx, mut envelopes_rx, _project_cache_rx, _) = buffer_service(); + global_tx.send_replace(global_config::Status::Ready(Arc::new( + GlobalConfig::default(), + ))); + + let addr = service.start(); + + // Send 10 messages, with a bounded queue size of 5. + let envelope = new_envelope(false, "foo"); + let project_key = envelope.meta().public_key(); + for _ in 0..10 { + addr.send(EnvelopeBuffer::Push(envelope.clone())); + } + addr.send(EnvelopeBuffer::Ready(project_key)); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let mut messages = vec![]; + envelopes_rx.recv_many(&mut messages, 100).await; + + assert_eq!( + messages + .iter() + .filter(|message| matches!(message, DequeuedEnvelope(..))) + .count(), + 5 + ); + } } From ec09ca141f8c3eea5e11d0fe0af3adb87efa8237 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Fri, 20 Sep 2024 10:45:17 +0200 Subject: [PATCH 6/9] Fix --- relay-server/src/services/project_cache.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index 6c507e5f18..d2b02197f9 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -1326,6 +1326,7 @@ pub struct ProjectCacheService { memory_checker: MemoryChecker, services: Services, global_config_rx: watch::Receiver, + /// Bounded channel used exclusively to receive envelopes from the envelope buffer. envelopes_rx: mpsc::Receiver, redis: Option, } From 16ba743bbafb1a9592fddb24b1fbdf2f62a4e89a Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Fri, 20 Sep 2024 10:48:30 +0200 Subject: [PATCH 7/9] Fix --- relay-server/src/services/buffer/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 6a3977dde0..52cce9c922 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -236,6 +236,8 @@ impl EnvelopeBufferService { .expect("Element disappeared despite exclusive excess"); Self::drop_expired(envelope, services); + + sleep = Duration::ZERO; // try next pop immediately } Peek::Ready(_) => { relay_log::trace!("EnvelopeBufferService: popping envelope"); From ed9cc6395a16465889df14c7dde5321ab8cb2be0 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Fri, 20 Sep 2024 10:51:32 +0200 Subject: [PATCH 8/9] Fix --- relay-server/src/services/buffer/mod.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 52cce9c922..667dea2b06 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -214,10 +214,9 @@ impl EnvelopeBufferService { services: Services, envelopes_tx_permit: Permit<'a, DequeuedEnvelope>, ) -> Result { - let mut sleep = Duration::ZERO; relay_log::trace!("EnvelopeBufferService: peeking the buffer"); - match buffer.peek().await? { + let sleep = match buffer.peek().await? { Peek::Empty => { relay_log::trace!("EnvelopeBufferService: peek returned empty"); relay_statsd::metric!( @@ -225,7 +224,7 @@ impl EnvelopeBufferService { peek_result = "empty" ); - sleep = Duration::MAX; // wait for reset by `handle_message`. + Duration::MAX // wait for reset by `handle_message`. } Peek::Ready(envelope) | Peek::NotReady(.., envelope) if Self::expired(config, envelope) => @@ -237,7 +236,7 @@ impl EnvelopeBufferService { Self::drop_expired(envelope, services); - sleep = Duration::ZERO; // try next pop immediately + Duration::ZERO // try next pop immediately } Peek::Ready(_) => { relay_log::trace!("EnvelopeBufferService: popping envelope"); @@ -251,7 +250,7 @@ impl EnvelopeBufferService { .expect("Element disappeared despite exclusive excess"); envelopes_tx_permit.send(DequeuedEnvelope(envelope)); - sleep = Duration::ZERO; // try next pop immediately + Duration::ZERO // try next pop immediately } Peek::NotReady(stack_key, next_project_fetch, envelope) => { relay_log::trace!("EnvelopeBufferService: project(s) of envelope not ready"); @@ -281,9 +280,9 @@ impl EnvelopeBufferService { buffer.mark_seen(&stack_key, DEFAULT_SLEEP); } - sleep = DEFAULT_SLEEP; + DEFAULT_SLEEP // wait and prioritize handling new messages. } - } + }; Ok(sleep) } From 8df6bbdd68dd8443250efcd7ff421f91059f8872 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Fri, 20 Sep 2024 13:39:22 +0200 Subject: [PATCH 9/9] Fix --- relay-config/src/config.rs | 14 ++++++++++++++ relay-server/src/service.rs | 2 +- relay-server/src/services/buffer/mod.rs | 22 ++++++++++++++++++---- relay-server/src/services/project_cache.rs | 1 + 4 files changed, 34 insertions(+), 5 deletions(-) diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 73b486199d..6bdaea3635 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -912,6 +912,11 @@ fn spool_disk_usage_refresh_frequency_ms() -> u64 { 100 } +/// Default bounded buffer size for handling backpressure. +fn spool_max_backpressure_envelopes() -> usize { + 500 +} + /// Persistent buffering configuration for incoming envelopes. #[derive(Debug, Serialize, Deserialize)] pub struct EnvelopeSpool { @@ -955,6 +960,9 @@ pub struct EnvelopeSpool { /// internal page stats. #[serde(default = "spool_disk_usage_refresh_frequency_ms")] disk_usage_refresh_frequency_ms: u64, + /// The amount of envelopes that can be put in the bounded buffer. + #[serde(default = "spool_max_backpressure_envelopes")] + max_backpressure_envelopes: usize, /// Version of the spooler. #[serde(default)] version: EnvelopeSpoolVersion, @@ -991,6 +999,7 @@ impl Default for EnvelopeSpool { max_batches: spool_envelopes_stack_max_batches(), max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(), disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(), + max_backpressure_envelopes: spool_max_backpressure_envelopes(), version: EnvelopeSpoolVersion::default(), } } @@ -2212,6 +2221,11 @@ impl Config { Duration::from_millis(self.values.spool.envelopes.disk_usage_refresh_frequency_ms) } + /// Returns the maximum number of envelopes that can be put in the bounded buffer. + pub fn spool_max_backpressure_envelopes(&self) -> usize { + self.values.spool.envelopes.max_backpressure_envelopes + } + /// Returns the maximum size of an event payload in bytes. pub fn max_event_size(&self) -> usize { self.values.limits.max_event_size.as_bytes() diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 5a1159726c..28364dde65 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -242,7 +242,7 @@ impl ServiceState { ) .spawn_handler(processor_rx); - let (envelopes_tx, envelopes_rx) = mpsc::channel(500); + let (envelopes_tx, envelopes_rx) = mpsc::channel(config.spool_max_backpressure_envelopes()); let envelope_buffer = EnvelopeBufferService::new( config.clone(), MemoryChecker::new(memory_stat.clone(), config.clone()), diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 667dea2b06..4b8f380439 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -211,7 +211,7 @@ impl EnvelopeBufferService { async fn try_pop<'a>( config: &Config, buffer: &mut PolymorphicEnvelopeBuffer, - services: Services, + services: &Services, envelopes_tx_permit: Permit<'a, DequeuedEnvelope>, ) -> Result { relay_log::trace!("EnvelopeBufferService: peeking the buffer"); @@ -291,7 +291,7 @@ impl EnvelopeBufferService { envelope.meta().start_time().elapsed() > config.spool_envelopes_max_age() } - fn drop_expired(envelope: Box, services: Services) { + fn drop_expired(envelope: Box, services: &Services) { let mut managed_envelope = ManagedEnvelope::new( envelope, services.outcome_aggregator.clone(), @@ -375,6 +375,7 @@ impl Service for EnvelopeBufferService { let memory_checker = self.memory_checker.clone(); let mut global_config_rx = self.global_config_rx.clone(); let services = self.services.clone(); + tokio::spawn(async move { let buffer = PolymorphicEnvelopeBuffer::from_config(&config, memory_checker).await; @@ -405,7 +406,7 @@ impl Service for EnvelopeBufferService { // so we do not exceed the buffer capacity by starving the dequeue. // on the other hand, prioritizing old messages violates the LIFO design. Some(permit) = self.ready_to_pop(&buffer) => { - match Self::try_pop(&config, &mut buffer, services.clone(), permit).await { + match Self::try_pop(&config, &mut buffer, &services, permit).await { Ok(new_sleep) => { sleep = new_sleep; } @@ -430,7 +431,7 @@ impl Service for EnvelopeBufferService { } _ = global_config_rx.changed() => { relay_log::trace!("EnvelopeBufferService: received global config"); - sleep = Duration::ZERO; // Try to pop + sleep = Duration::ZERO; } else => break, } @@ -737,5 +738,18 @@ mod tests { .count(), 5 ); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let mut messages = vec![]; + envelopes_rx.recv_many(&mut messages, 100).await; + + assert_eq!( + messages + .iter() + .filter(|message| matches!(message, DequeuedEnvelope(..))) + .count(), + 5 + ); } } diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index d2b02197f9..690671a434 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -255,6 +255,7 @@ pub struct SpoolHealth; pub struct RefreshIndexCache(pub HashSet); /// Handle an envelope that was popped from the envelope buffer. +#[derive(Debug)] pub struct DequeuedEnvelope(pub Box); /// A request to update a project, typically sent by the envelope buffer.