diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs
index 43ea78e6ec..5a1159726c 100644
--- a/relay-server/src/service.rs
+++ b/relay-server/src/service.rs
@@ -242,13 +242,14 @@ impl ServiceState {
         )
         .spawn_handler(processor_rx);

-        let (project_cache_bounded_tx, project_cache_bounded_rx) = mpsc::channel(500);
+        let (envelopes_tx, envelopes_rx) = mpsc::channel(500);
         let envelope_buffer = EnvelopeBufferService::new(
             config.clone(),
             MemoryChecker::new(memory_stat.clone(), config.clone()),
             global_config_rx.clone(),
             buffer::Services {
-                project_cache: project_cache_bounded_tx,
+                envelopes_tx,
+                project_cache: project_cache.clone(),
                 outcome_aggregator: outcome_aggregator.clone(),
                 test_store: test_store.clone(),
             },
@@ -271,7 +272,7 @@ impl ServiceState {
             MemoryChecker::new(memory_stat.clone(), config.clone()),
             project_cache_services,
             global_config_rx,
-            project_cache_bounded_rx,
+            envelopes_rx,
             redis_pools
                 .as_ref()
                 .map(|pools| pools.project_configs.clone()),
diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs
index 96ea4fc952..4d4c12f9d0 100644
--- a/relay-server/src/services/buffer/mod.rs
+++ b/relay-server/src/services/buffer/mod.rs
@@ -8,8 +8,9 @@ use std::time::Duration;

 use relay_base_schema::project::ProjectKey;
 use relay_config::Config;
-use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, SendError, Service};
+use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service};
 use relay_system::{Controller, Shutdown};
+use tokio::sync::mpsc::Permit;
 use tokio::sync::{mpsc, watch};
 use tokio::time::{timeout, Instant};

@@ -20,7 +21,7 @@ use crate::services::outcome::DiscardReason;
 use crate::services::outcome::Outcome;
 use crate::services::outcome::TrackOutcome;
 use crate::services::processor::ProcessingGroup;
-use crate::services::project_cache::ProjectCache;
+use crate::services::project_cache::{DequeuedEnvelope, ProjectCache, UpdateProject};
 use crate::services::test_store::TestStore;
 use crate::statsd::RelayCounters;

@@ -93,8 +94,12 @@ impl ObservableEnvelopeBuffer {
 }

 /// Services that the buffer service communicates with.
+#[derive(Clone)]
 pub struct Services {
-    pub project_cache: mpsc::Sender<ProjectCache>,
+    /// Bounded channel used exclusively to handle backpressure when sending envelopes to the
+    /// project cache.
+    pub envelopes_tx: mpsc::Sender<DequeuedEnvelope>,
+    pub project_cache: Addr<ProjectCache>,
     pub outcome_aggregator: Addr<TrackOutcome>,
     pub test_store: Addr<TestStore>,
 }
@@ -147,7 +152,10 @@ impl EnvelopeBufferService {
     }

     /// Wait for the configured amount of time and make sure the project cache is ready to receive.
-    async fn ready_to_pop(&mut self, buffer: &PolymorphicEnvelopeBuffer) -> Result<(), SendError> {
+    async fn ready_to_pop(
+        &mut self,
+        buffer: &PolymorphicEnvelopeBuffer,
+    ) -> Option<Permit<DequeuedEnvelope>> {
         relay_statsd::metric!(
             counter(RelayCounters::BufferReadyToPop) += 1,
             status = "checking"
         );
@@ -169,16 +177,14 @@
             status = "slept"
         );

-        while self.services.project_cache.capacity() == 0 {
-            tokio::time::sleep(Duration::from_millis(1)).await;
-        }
+        let permit = self.services.envelopes_tx.reserve().await.ok();

         relay_statsd::metric!(
             counter(RelayCounters::BufferReadyToPop) += 1,
             status = "checked"
         );

-        Ok(())
+        permit
     }

     /// Waits until preconditions for unspooling are met.
@@ -202,11 +208,15 @@
     }

     /// Tries to pop an envelope for a ready project.
-    async fn try_pop(
-        &mut self,
+    async fn try_pop<'a>(
+        config: &Config,
         buffer: &mut PolymorphicEnvelopeBuffer,
-    ) -> Result<(), EnvelopeBufferError> {
+        services: Services,
+        envelopes_tx_permit: Permit<'a, DequeuedEnvelope>,
+    ) -> Result<Duration, EnvelopeBufferError> {
+        let mut sleep = Duration::ZERO;
         relay_log::trace!("EnvelopeBufferService: peeking the buffer");
+
         match buffer.peek().await? {
             Peek::Empty => {
                 relay_log::trace!("EnvelopeBufferService: peek returned empty");
@@ -214,14 +224,18 @@ impl EnvelopeBufferService {
                     counter(RelayCounters::BufferTryPop) += 1,
                     peek_result = "empty"
                 );
-                self.sleep = Duration::MAX; // wait for reset by `handle_message`.
+
+                sleep = Duration::MAX; // wait for reset by `handle_message`.
             }
-            Peek::Ready(envelope) | Peek::NotReady(.., envelope) if self.expired(envelope) => {
+            Peek::Ready(envelope) | Peek::NotReady(.., envelope)
+                if Self::expired(config, envelope) =>
+            {
                 let envelope = buffer
                     .pop()
                     .await?
                     .expect("Element disappeared despite exclusive excess");
-                self.drop_expired(envelope);
+
+                Self::drop_expired(envelope, services);
             }
             Peek::Ready(_) => {
                 relay_log::trace!("EnvelopeBufferService: popping envelope");
@@ -233,18 +247,9 @@ impl EnvelopeBufferService {
                     .pop()
                     .await?
                     .expect("Element disappeared despite exclusive excess");
-                if let Err(error) = self
-                    .services
-                    .project_cache
-                    .send(ProjectCache::HandleDequeuedEnvelope(envelope))
-                    .await
-                {
-                    relay_log::error!(
-                        error = &error as &dyn Error,
-                        "the envelope buffer couldn't send an envelope to the project cache",
-                    );
-                };
-                self.sleep = Duration::ZERO; // try next pop immediately
+                envelopes_tx_permit.send(DequeuedEnvelope(envelope));
+
+                sleep = Duration::ZERO; // try next pop immediately
             }
             Peek::NotReady(stack_key, next_project_fetch, envelope) => {
                 relay_log::trace!("EnvelopeBufferService: project(s) of envelope not ready");
@@ -260,21 +265,12 @@
                     relay_log::trace!("EnvelopeBufferService: requesting project(s) update");
                     let own_key = envelope.meta().public_key();

-                    // TODO: do we want to handle an error?
-                    let _ = self
-                        .services
-                        .project_cache
-                        .send(ProjectCache::UpdateProject(own_key))
-                        .await;
+                    services.project_cache.send(UpdateProject(own_key));

                     match envelope.sampling_key() {
                         None => {}
                         Some(sampling_key) if sampling_key == own_key => {} // already sent.
                        Some(sampling_key) => {
-                            let _ = self
-                                .services
-                                .project_cache
-                                .send(ProjectCache::UpdateProject(sampling_key))
-                                .await;
+                            services.project_cache.send(UpdateProject(sampling_key));
                         }
                     }
@@ -283,32 +279,28 @@
                     buffer.mark_seen(&stack_key, DEFAULT_SLEEP);
                 }

-                self.sleep = DEFAULT_SLEEP;
+                sleep = DEFAULT_SLEEP;
             }
         }

-        Ok(())
+        Ok(sleep)
     }

-    fn expired(&self, envelope: &Envelope) -> bool {
-        envelope.meta().start_time().elapsed() > self.config.spool_envelopes_max_age()
+    fn expired(config: &Config, envelope: &Envelope) -> bool {
+        envelope.meta().start_time().elapsed() > config.spool_envelopes_max_age()
     }

-    fn drop_expired(&self, envelope: Box<Envelope>) {
+    fn drop_expired(envelope: Box<Envelope>, services: Services) {
         let mut managed_envelope = ManagedEnvelope::new(
             envelope,
-            self.services.outcome_aggregator.clone(),
-            self.services.test_store.clone(),
+            services.outcome_aggregator.clone(),
+            services.test_store.clone(),
             ProcessingGroup::Ungrouped,
         );
         managed_envelope.reject(Outcome::Invalid(DiscardReason::Timestamp));
     }

-    async fn handle_message(
-        &mut self,
-        buffer: &mut PolymorphicEnvelopeBuffer,
-        message: EnvelopeBuffer,
-    ) {
+    async fn handle_message(buffer: &mut PolymorphicEnvelopeBuffer, message: EnvelopeBuffer) {
         match message {
             EnvelopeBuffer::Push(envelope) => {
                 // NOTE: This function assumes that a project state update for the relevant
@@ -316,7 +308,7 @@
                 // For better separation of concerns, this prefetch should be triggered from here
                 // once buffer V1 has been removed.
                 relay_log::trace!("EnvelopeBufferService: received push message");
-                self.push(buffer, envelope).await;
+                Self::push(buffer, envelope).await;
             }
             EnvelopeBuffer::NotReady(project_key, envelope) => {
                 relay_log::trace!(
@@ -324,7 +316,7 @@
                     &project_key
                 );
                 relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesReturned) += 1);
-                self.push(buffer, envelope).await;
+                Self::push(buffer, envelope).await;
                 buffer.mark_ready(&project_key, false);
             }
             EnvelopeBuffer::Ready(project_key) => {
@@ -335,14 +327,9 @@
                 buffer.mark_ready(&project_key, true);
             }
         };
-        self.sleep = Duration::ZERO;
     }

-    async fn handle_shutdown(
-        &mut self,
-        buffer: &mut PolymorphicEnvelopeBuffer,
-        message: Shutdown,
-    ) -> bool {
+    async fn handle_shutdown(buffer: &mut PolymorphicEnvelopeBuffer, message: Shutdown) -> bool {
         // We gracefully shut down only if the shutdown has a timeout.
         if let Some(shutdown_timeout) = message.timeout {
             relay_log::trace!("EnvelopeBufferService: shutting down gracefully");
@@ -364,7 +351,7 @@
         false
     }

-    async fn push(&mut self, buffer: &mut PolymorphicEnvelopeBuffer, envelope: Box<Envelope>) {
+    async fn push(buffer: &mut PolymorphicEnvelopeBuffer, envelope: Box<Envelope>) {
         if let Err(e) = buffer.push(envelope).await {
             relay_log::error!(
                 error = &e as &dyn std::error::Error,
@@ -386,6 +373,7 @@ impl Service for EnvelopeBufferService {
         let config = self.config.clone();
         let memory_checker = self.memory_checker.clone();
         let mut global_config_rx = self.global_config_rx.clone();
+        let services = self.services.clone();

         tokio::spawn(async move {
             let buffer = PolymorphicEnvelopeBuffer::from_config(&config, memory_checker).await;
@@ -409,36 +397,44 @@
                 iteration += 1;
                 relay_log::trace!("EnvelopeBufferService: loop iteration {iteration}");

+                let mut sleep = Duration::MAX;
                 tokio::select! {
                     // NOTE: we do not select a bias here.
                    // On the one hand, we might want to prioritize dequeuing over enqueuing
                    // so we do not exceed the buffer capacity by starving the dequeue.
                    // on the other hand, prioritizing old messages violates the LIFO design.
-                    Ok(_) = self.ready_to_pop(&buffer) => {
-                        if let Err(e) = self.try_pop(&mut buffer).await {
-                            relay_log::error!(
-                                error = &e as &dyn std::error::Error,
+                    Some(permit) = self.ready_to_pop(&buffer) => {
+                        match Self::try_pop(&config, &mut buffer, services.clone(), permit).await {
+                            Ok(new_sleep) => {
+                                sleep = new_sleep;
+                            }
+                            Err(error) => {
+                                relay_log::error!(
+                                    error = &error as &dyn std::error::Error,
                                     "failed to pop envelope"
                                 );
+                            }
                         }
                     }
                     Some(message) = rx.recv() => {
-                        self.handle_message(&mut buffer, message).await;
+                        Self::handle_message(&mut buffer, message).await;
+                        sleep = Duration::ZERO;
                     }
                     shutdown = shutdown.notified() => {
                         // In case the shutdown was handled, we break out of the loop signaling that
                         // there is no need to process anymore envelopes.
-                        if self.handle_shutdown(&mut buffer, shutdown).await {
+                        if Self::handle_shutdown(&mut buffer, shutdown).await {
                             break;
                         }
                     }
                     _ = global_config_rx.changed() => {
                         relay_log::trace!("EnvelopeBufferService: received global config");
-                        self.sleep = Duration::ZERO; // Try to pop
+                        sleep = Duration::ZERO; // Try to pop
                     }
                     else => break,
                 }

+                self.sleep = sleep;
                 self.update_observable_state(&mut buffer);
             }
@@ -464,7 +460,8 @@ mod tests {
     fn buffer_service() -> (
         EnvelopeBufferService,
         watch::Sender<global_config::Status>,
-        mpsc::Receiver<ProjectCache>,
+        mpsc::Receiver<DequeuedEnvelope>,
+        mpsc::UnboundedReceiver<ProjectCache>,
         mpsc::UnboundedReceiver<TrackOutcome>,
     ) {
         let config = Arc::new(
@@ -479,7 +476,8 @@
         );
         let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());
         let (global_tx, global_rx) = watch::channel(global_config::Status::Pending);
-        let (project_cache, project_cache_rx) = mpsc::channel(5);
+        let (envelopes_tx, envelopes_rx) = mpsc::channel(5);
+        let (project_cache, project_cache_rx) = Addr::custom();
         let (outcome_aggregator, outcome_aggregator_rx) = Addr::custom();
         (
             EnvelopeBufferService::new(
@@ -487,6 +485,7 @@
                 memory_checker,
                 global_rx,
                 Services {
+                    envelopes_tx,
                     project_cache,
                     outcome_aggregator,
                     test_store: Addr::dummy(),
@@ -494,6 +493,7 @@
             )
             .unwrap(),
             global_tx,
+            envelopes_rx,
             project_cache_rx,
             outcome_aggregator_rx,
         )
@@ -502,7 +502,7 @@
     #[tokio::test]
     async fn capacity_is_updated() {
         tokio::time::pause();
-        let (service, _global_rx, _project_cache_tx, _) = buffer_service();
+        let (service, _global_tx, _envelopes_rx, _project_cache_tx, _) = buffer_service();

         // Set capacity to false:
         service.has_capacity.store(false, Ordering::Relaxed);
@@ -525,7 +525,7 @@
     async fn pop_requires_global_config() {
         relay_log::init_test!();
         tokio::time::pause();
-        let (service, global_tx, project_cache_rx, _) = buffer_service();
+        let (service, global_tx, envelopes_rx, project_cache_rx, _) = buffer_service();

         let addr = service.start();
@@ -538,6 +538,7 @@
         tokio::time::sleep(Duration::from_millis(1000)).await;

         // Nothing was dequeued, global config not ready:
+        assert_eq!(envelopes_rx.len(), 0);
         assert_eq!(project_cache_rx.len(), 0);

         global_tx.send_replace(global_config::Status::Ready(Arc::new(
@@ -547,7 +548,8 @@
         tokio::time::sleep(Duration::from_millis(1000)).await;

         // Dequeued, global config ready:
-        assert_eq!(project_cache_rx.len(), 1);
+        assert_eq!(envelopes_rx.len(), 1);
+        assert_eq!(project_cache_rx.len(), 0);
     }

     #[tokio::test]
@@ -573,12 +575,14 @@
             GlobalConfig::default(),
         )));

-        let (project_cache, project_cache_rx) = mpsc::channel(20);
+        let (envelopes_tx, envelopes_rx) = mpsc::channel(5);
+        let (project_cache, project_cache_rx) = Addr::custom();
         let service = EnvelopeBufferService::new(
             config,
             memory_checker,
             global_rx,
             Services {
+                envelopes_tx,
                 project_cache,
                 outcome_aggregator: Addr::dummy(),
                 test_store: Addr::dummy(),
@@ -596,6 +600,7 @@
         tokio::time::sleep(Duration::from_millis(1000)).await;

         // Nothing was dequeued, memory not ready:
+        assert_eq!(envelopes_rx.len(), 0);
         assert_eq!(project_cache_rx.len(), 0);
     }

@@ -616,13 +621,15 @@
         );
         let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());
         let (global_tx, global_rx) = watch::channel(global_config::Status::Pending);
-        let (project_cache, project_cache_rx) = mpsc::channel(20);
+        let (envelopes_tx, envelopes_rx) = mpsc::channel(5);
+        let (project_cache, project_cache_rx) = Addr::custom();
         let (outcome_aggregator, mut outcome_aggregator_rx) = Addr::custom();
         let service = EnvelopeBufferService::new(
             config,
             memory_checker,
             global_rx,
             Services {
+                envelopes_tx,
                 project_cache,
                 outcome_aggregator,
                 test_store: Addr::dummy(),
@@ -646,7 +653,9 @@
         tokio::time::sleep(Duration::from_millis(100)).await;

-        assert!(project_cache_rx.is_empty());
+        assert_eq!(envelopes_rx.len(), 0);
+        assert_eq!(project_cache_rx.len(), 0);
+
         let outcome = outcome_aggregator_rx.try_recv().unwrap();
         assert_eq!(outcome.category, DataCategory::TransactionIndexed);
         assert_eq!(outcome.quantity, 1);
@@ -655,7 +664,7 @@
     }

     #[tokio::test]
     async fn test_update_project() {
         tokio::time::pause();
-        let (service, global_tx, mut project_cache_rx, _) = buffer_service();
+        let (service, global_tx, mut envelopes_rx, mut project_cache_rx, _) = buffer_service();

         let addr = service.start();
@@ -670,9 +679,8 @@
         tokio::time::sleep(Duration::from_secs(1)).await;

-        // We expect the project update request to be sent.
-        let Some(ProjectCache::HandleDequeuedEnvelope(envelope)) = project_cache_rx.recv().await
-        else {
+        // We expect the envelope to be forwarded because by default we mark the project as ready.
+        let Some(DequeuedEnvelope(envelope)) = envelopes_rx.recv().await else {
             panic!();
         };

@@ -680,6 +688,7 @@
         tokio::time::sleep(Duration::from_millis(100)).await;

+        // We expect the project update request to be sent.
         assert_eq!(project_cache_rx.len(), 1);
         let message = project_cache_rx.recv().await;
         assert!(matches!(
             message,
@@ -689,6 +698,7 @@
         tokio::time::sleep(Duration::from_secs(1)).await;

+        // We expect the project update request to be sent.
         assert_eq!(project_cache_rx.len(), 1);
         assert!(matches!(
             message,
diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs
index 4a04342578..6c507e5f18 100644
--- a/relay-server/src/services/project_cache.rs
+++ b/relay-server/src/services/project_cache.rs
@@ -295,7 +295,6 @@ pub enum ProjectCache {
     UpdateSpoolIndex(UpdateSpoolIndex),
     SpoolHealth(Sender<bool>),
     RefreshIndexCache(RefreshIndexCache),
-    HandleDequeuedEnvelope(Box<Envelope>),
     UpdateProject(ProjectKey),
 }

@@ -314,7 +313,6 @@ impl ProjectCache {
             Self::UpdateSpoolIndex(_) => "UpdateSpoolIndex",
             Self::SpoolHealth(_) => "SpoolHealth",
             Self::RefreshIndexCache(_) => "RefreshIndexCache",
-            Self::HandleDequeuedEnvelope(_) => "HandleDequeuedEnvelope",
             Self::UpdateProject(_) => "UpdateProject",
         }
     }
@@ -421,15 +419,6 @@ impl FromMessage for ProjectCache {
     }
 }

-impl FromMessage<DequeuedEnvelope> for ProjectCache {
-    type Response = relay_system::NoResponse;
-
-    fn from_message(message: DequeuedEnvelope, _: ()) -> Self {
-        let DequeuedEnvelope(envelope) = message;
-        Self::HandleDequeuedEnvelope(envelope)
-    }
-}
-
 impl FromMessage<UpdateProject> for ProjectCache {
     type Response = relay_system::NoResponse;

@@ -1308,25 +1297,26 @@ impl ProjectCacheBroker {
                     ProjectCache::RefreshIndexCache(message) => {
                         self.handle_refresh_index_cache(message)
                     }
-                    ProjectCache::HandleDequeuedEnvelope(message) => {
-                        let envelope_buffer = self
-                            .services
-                            .envelope_buffer
-                            .clone()
-                            .expect("Called HandleDequeuedEnvelope without an envelope buffer");
-
-                        if let Err(e) = self.handle_dequeued_envelope(message, envelope_buffer) {
-                            relay_log::error!(
-                                error = &e as &dyn std::error::Error,
-                                "Failed to handle popped envelope"
-                            );
-                        }
-                    }
                     ProjectCache::UpdateProject(project) => self.handle_update_project(project),
                 }
             }
         )
     }
+
+    fn handle_envelope(&mut self, dequeued_envelope: DequeuedEnvelope) {
+        let envelope_buffer = self
+            .services
+            .envelope_buffer
+            .clone()
+            .expect("Called HandleDequeuedEnvelope without an envelope buffer");
+
+        if let Err(e) = self.handle_dequeued_envelope(dequeued_envelope.0, envelope_buffer) {
+            relay_log::error!(
+                error = &e as &dyn std::error::Error,
+                "Failed to handle popped envelope"
+            );
+        }
+    }
 }

 /// Service implementing the [`ProjectCache`] interface.
@@ -1336,7 +1326,7 @@ pub struct ProjectCacheService {
     memory_checker: MemoryChecker,
     services: Services,
     global_config_rx: watch::Receiver<global_config::Status>,
-    project_cache_bounded_rx: mpsc::Receiver<ProjectCache>,
+    envelopes_rx: mpsc::Receiver<DequeuedEnvelope>,
     redis: Option<RedisPool>,
 }

@@ -1347,7 +1337,7 @@ impl ProjectCacheService {
         memory_checker: MemoryChecker,
         services: Services,
         global_config_rx: watch::Receiver<global_config::Status>,
-        project_cache_bounded_rx: mpsc::Receiver<ProjectCache>,
+        envelopes_rx: mpsc::Receiver<DequeuedEnvelope>,
         redis: Option<RedisPool>,
     ) -> Self {
         Self {
             memory_checker,
             services,
             global_config_rx,
-            project_cache_bounded_rx,
+            envelopes_rx,
             redis,
         }
     }
@@ -1370,7 +1360,7 @@ impl Service for ProjectCacheService {
             memory_checker,
             services,
             mut global_config_rx,
-            mut project_cache_bounded_rx,
+            mut envelopes_rx,
             redis,
         } = self;
         let project_cache = services.project_cache.clone();
@@ -1491,10 +1481,9 @@ impl Service for ProjectCacheService {
                             broker.handle_periodic_unspool()
                         })
                     }
-                    // TODO: this prioritization might stab us in the back.
-                    Some(message) = project_cache_bounded_rx.recv() => {
-                        metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "handle_message_from_bounded", {
-                            broker.handle_message(message)
+                    Some(message) = envelopes_rx.recv() => {
+                        metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "handle_envelope", {
+                            broker.handle_envelope(message)
                         })
                     }
                     Some(message) = rx.recv() => {
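Review note (not part of the diff): the change replaces the busy-wait on `project_cache.capacity()` with tokio's reserve-a-permit pattern on a bounded channel, so the buffer only pops an envelope once the project cache is guaranteed to accept it. The sketch below is a minimal, self-contained illustration of that pattern only; the names (`Dequeued`, `queue`) are hypothetical stand-ins and none of this is relay code.

```rust
use std::collections::VecDeque;
use std::time::Duration;

use tokio::sync::mpsc;

/// Hypothetical stand-in for `DequeuedEnvelope`.
#[derive(Debug)]
struct Dequeued(u32);

#[tokio::main]
async fn main() {
    // Small capacity so the consumer's pace throttles the producer.
    let (tx, mut rx) = mpsc::channel::<Dequeued>(2);

    let producer = tokio::spawn(async move {
        let mut queue: VecDeque<u32> = (0..10).collect();
        while !queue.is_empty() {
            // Wait until the receiver has a free slot; `Err` means it hung up.
            let Ok(permit) = tx.reserve().await else { break };
            // Pop only after the slot is guaranteed: popping first could leave us
            // holding an item we cannot deliver if the channel were full.
            let item = queue.pop_front().expect("checked non-empty");
            // Sending through a reserved permit neither fails nor awaits.
            permit.send(Dequeued(item));
        }
    });

    while let Some(Dequeued(n)) = rx.recv().await {
        // Simulate a slow consumer, like the project cache in the real service.
        tokio::time::sleep(Duration::from_millis(10)).await;
        println!("handled {n}");
    }

    producer.await.unwrap();
}
```

Reserving before popping is the relevant design point: `Permit::send` cannot fail or block, so the producer never holds a popped item it cannot deliver, which is the same property the buffer service relies on when it calls `envelopes_tx.reserve().await` before `buffer.pop()`.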