From ea07659ae68e3686c5a4532bf7fb967bc1b11857 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Tue, 17 Sep 2024 11:36:25 +0200 Subject: [PATCH] fix(buffer): Always drop stale envelopes (#4034) When we transformed the buffer into a service, I left the time-based eviction in the project cache. This is a bug, because it means that old envelopes are only dropped if we managed to get their project state(s). With this PR, stale envelopes are always evicted once they are `peek`ed. --- relay-server/src/service.rs | 8 +- relay-server/src/services/buffer/mod.rs | 144 ++++++++++++++++++--- relay-server/src/services/project_cache.rs | 10 -- 3 files changed, 132 insertions(+), 30 deletions(-) diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index f8fbdfe48c..289014f858 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use std::time::Duration; use crate::metrics::{MetricOutcomes, MetricStats}; -use crate::services::buffer::{EnvelopeBufferService, ObservableEnvelopeBuffer}; +use crate::services::buffer::{self, EnvelopeBufferService, ObservableEnvelopeBuffer}; use crate::services::stats::RelayStats; use anyhow::{Context, Result}; use axum::extract::FromRequestParts; @@ -245,7 +245,11 @@ impl ServiceState { config.clone(), MemoryChecker::new(memory_stat.clone(), config.clone()), global_config_rx.clone(), - project_cache.clone(), + buffer::Services { + project_cache: project_cache.clone(), + outcome_aggregator: outcome_aggregator.clone(), + test_store: test_store.clone(), + }, ) .map(|b| b.start_observable()); diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 36aa2d87a0..a21d7bd8c7 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -17,10 +17,16 @@ use tokio::time::timeout; use crate::envelope::Envelope; use crate::services::buffer::envelope_buffer::Peek; use crate::services::global_config; +use crate::services::outcome::DiscardReason; +use crate::services::outcome::Outcome; +use crate::services::outcome::TrackOutcome; +use crate::services::processor::ProcessingGroup; use crate::services::project_cache::DequeuedEnvelope; use crate::services::project_cache::ProjectCache; use crate::services::project_cache::UpdateProject; +use crate::services::test_store::TestStore; use crate::statsd::RelayCounters; +use crate::utils::ManagedEnvelope; use crate::utils::MemoryChecker; pub use envelope_buffer::EnvelopeBufferError; @@ -88,13 +94,20 @@ impl ObservableEnvelopeBuffer { } } +/// Services that the buffer service communicates with. +pub struct Services { + pub project_cache: Addr, + pub outcome_aggregator: Addr, + pub test_store: Addr, +} + /// Spool V2 service which buffers envelopes and forwards them to the project cache when a project /// becomes ready. pub struct EnvelopeBufferService { config: Arc, memory_checker: MemoryChecker, global_config_rx: watch::Receiver, - project_cache: Addr, + services: Services, has_capacity: Arc, sleep: Duration, } @@ -114,14 +127,13 @@ impl EnvelopeBufferService { config: Arc, memory_checker: MemoryChecker, global_config_rx: watch::Receiver, - project_cache: Addr, + services: Services, ) -> Option { config.spool_v2().then(|| Self { config, memory_checker, - global_config_rx, - project_cache, + services, has_capacity: Arc::new(AtomicBool::new(true)), sleep: Duration::ZERO, }) @@ -205,6 +217,13 @@ impl EnvelopeBufferService { ); self.sleep = Duration::MAX; // wait for reset by `handle_message`. } + Peek::Ready(envelope) | Peek::NotReady(.., envelope) if self.expired(envelope) => { + let envelope = buffer + .pop() + .await? + .expect("Element disappeared despite exclusive excess"); + self.drop_expired(envelope); + } Peek::Ready(_) => { relay_log::trace!("EnvelopeBufferService: popping envelope"); relay_statsd::metric!( @@ -215,8 +234,7 @@ impl EnvelopeBufferService { .pop() .await? .expect("Element disappeared despite exclusive excess"); - self.project_cache.send(DequeuedEnvelope(envelope)); - + self.services.project_cache.send(DequeuedEnvelope(envelope)); self.sleep = Duration::ZERO; // try next pop immediately } Peek::NotReady(stack_key, envelope) => { @@ -226,12 +244,14 @@ impl EnvelopeBufferService { peek_result = "not_ready" ); let project_key = envelope.meta().public_key(); - self.project_cache.send(UpdateProject(project_key)); + self.services.project_cache.send(UpdateProject(project_key)); match envelope.sampling_key() { None => {} Some(sampling_key) if sampling_key == project_key => {} // already sent. Some(sampling_key) => { - self.project_cache.send(UpdateProject(sampling_key)); + self.services + .project_cache + .send(UpdateProject(sampling_key)); } } // deprioritize the stack to prevent head-of-line blocking @@ -243,6 +263,20 @@ impl EnvelopeBufferService { Ok(()) } + fn expired(&self, envelope: &Envelope) -> bool { + envelope.meta().start_time().elapsed() > self.config.spool_envelopes_max_age() + } + + fn drop_expired(&self, envelope: Box) { + let mut managed_envelope = ManagedEnvelope::new( + envelope, + self.services.outcome_aggregator.clone(), + self.services.test_store.clone(), + ProcessingGroup::Ungrouped, + ); + managed_envelope.reject(Outcome::Invalid(DiscardReason::Timestamp)); + } + async fn handle_message( &mut self, buffer: &mut PolymorphicEnvelopeBuffer, @@ -388,9 +422,10 @@ impl Service for EnvelopeBufferService { #[cfg(test)] mod tests { - use std::time::Duration; + use std::time::{Duration, Instant}; use relay_dynamic_config::GlobalConfig; + use relay_quotas::DataCategory; use tokio::sync::mpsc; use uuid::Uuid; @@ -403,6 +438,7 @@ mod tests { EnvelopeBufferService, watch::Sender, mpsc::UnboundedReceiver, + mpsc::UnboundedReceiver, ) { let config = Arc::new( Config::from_json_value(serde_json::json!({ @@ -416,19 +452,30 @@ mod tests { ); let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone()); let (global_tx, global_rx) = watch::channel(global_config::Status::Pending); - let (project_cache_addr, project_cache_rx) = Addr::custom(); + let (project_cache, project_cache_rx) = Addr::custom(); + let (outcome_aggregator, outcome_aggregator_rx) = Addr::custom(); ( - EnvelopeBufferService::new(config, memory_checker, global_rx, project_cache_addr) - .unwrap(), + EnvelopeBufferService::new( + config, + memory_checker, + global_rx, + Services { + project_cache, + outcome_aggregator, + test_store: Addr::dummy(), + }, + ) + .unwrap(), global_tx, project_cache_rx, + outcome_aggregator_rx, ) } #[tokio::test] async fn capacity_is_updated() { tokio::time::pause(); - let (service, _global_rx, _project_cache_tx) = buffer_service(); + let (service, _global_rx, _project_cache_tx, _) = buffer_service(); // Set capacity to false: service.has_capacity.store(false, Ordering::Relaxed); @@ -450,7 +497,7 @@ mod tests { #[tokio::test] async fn pop_requires_global_config() { tokio::time::pause(); - let (service, global_tx, project_cache_rx) = buffer_service(); + let (service, global_tx, project_cache_rx, _) = buffer_service(); let addr = service.start(); @@ -498,10 +545,18 @@ mod tests { GlobalConfig::default(), ))); - let (project_cache_addr, project_cache_rx) = Addr::custom(); - let service = - EnvelopeBufferService::new(config, memory_checker, global_rx, project_cache_addr) - .unwrap(); + let (project_cache, project_cache_rx) = Addr::custom(); + let service = EnvelopeBufferService::new( + config, + memory_checker, + global_rx, + Services { + project_cache, + outcome_aggregator: Addr::dummy(), + test_store: Addr::dummy(), + }, + ) + .unwrap(); let addr = service.start(); // Send five messages: @@ -515,4 +570,57 @@ mod tests { // Nothing was dequeued, memory not ready: assert_eq!(project_cache_rx.len(), 0); } + + #[tokio::test] + async fn old_envelope_is_dropped() { + tokio::time::pause(); + + let config = Arc::new( + Config::from_json_value(serde_json::json!({ + "spool": { + "envelopes": { + "version": "experimental", + "max_envelope_delay_secs": 1, + } + } + })) + .unwrap(), + ); + let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone()); + let (global_tx, global_rx) = watch::channel(global_config::Status::Pending); + let (project_cache, project_cache_rx) = Addr::custom(); + let (outcome_aggregator, mut outcome_aggregator_rx) = Addr::custom(); + let service = EnvelopeBufferService::new( + config, + memory_checker, + global_rx, + Services { + project_cache, + outcome_aggregator, + test_store: Addr::dummy(), + }, + ) + .unwrap(); + + global_tx.send_replace(global_config::Status::Ready(Arc::new( + GlobalConfig::default(), + ))); + + let config = service.config.clone(); + let addr = service.start(); + + // Send five messages: + let mut envelope = new_envelope(false, "foo"); + envelope + .meta_mut() + .set_start_time(Instant::now() - 2 * config.spool_envelopes_max_age()); + addr.send(EnvelopeBuffer::Push(envelope)); + + tokio::time::sleep(Duration::from_millis(100)).await; + + assert!(project_cache_rx.is_empty()); + let outcome = outcome_aggregator_rx.try_recv().unwrap(); + assert_eq!(outcome.category, DataCategory::TransactionIndexed); + assert_eq!(outcome.quantity, 1); + } } diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index ef2f3f813f..3c6633207e 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -1092,16 +1092,6 @@ impl ProjectCacheBroker { envelope: Box, envelope_buffer: Addr, ) -> Result<(), EnvelopeBufferError> { - if envelope.meta().start_time().elapsed() > self.config.spool_envelopes_max_age() { - let mut managed_envelope = ManagedEnvelope::new( - envelope, - self.services.outcome_aggregator.clone(), - self.services.test_store.clone(), - ProcessingGroup::Ungrouped, - ); - managed_envelope.reject(Outcome::Invalid(DiscardReason::Timestamp)); - return Ok(()); - } let sampling_key = envelope.sampling_key(); let services = self.services.clone();