diff --git a/relay_monitors/fn.process_check_in.html b/relay_monitors/fn.process_check_in.html index 305e024a78..3204c4bc81 100644 --- a/relay_monitors/fn.process_check_in.html +++ b/relay_monitors/fn.process_check_in.html @@ -1,5 +1,5 @@ process_check_in in relay_monitors - Rust

Function relay_monitors::process_check_in

source ·
pub fn process_check_in(
     payload: &[u8],
-    project_id: ProjectId,
+    project_id: ProjectId,
 ) -> Result<ProcessedCheckInResult, ProcessCheckInError>
Expand description

Normalizes a monitor check-in payload.

\ No newline at end of file diff --git a/relay_spans/fn.otel_to_sentry_span.html b/relay_spans/fn.otel_to_sentry_span.html index a32a048e07..ae016236b0 100644 --- a/relay_spans/fn.otel_to_sentry_span.html +++ b/relay_spans/fn.otel_to_sentry_span.html @@ -1,2 +1,2 @@ -otel_to_sentry_span in relay_spans - Rust

Function relay_spans::otel_to_sentry_span

source ·
pub fn otel_to_sentry_span(otel_span: Span) -> Span
Expand description

Transform an OtelSpan to a Sentry span.

+otel_to_sentry_span in relay_spans - Rust

Function relay_spans::otel_to_sentry_span

source ·
pub fn otel_to_sentry_span(otel_span: Span) -> Span
Expand description

Transform an OtelSpan to a Sentry span.

\ No newline at end of file diff --git a/src/relay_server/service.rs.html b/src/relay_server/service.rs.html index d49dfeb4f9..707bb269c7 100644 --- a/src/relay_server/service.rs.html +++ b/src/relay_server/service.rs.html @@ -435,13 +435,17 @@ 435 436 437 +438 +439 +440 +441
use std::convert::Infallible;
 use std::fmt;
 use std::sync::Arc;
 use std::time::Duration;
 
 use crate::metrics::{MetricOutcomes, MetricStats};
-use crate::services::buffer::{EnvelopeBufferService, ObservableEnvelopeBuffer};
+use crate::services::buffer::{self, EnvelopeBufferService, ObservableEnvelopeBuffer};
 use crate::services::stats::RelayStats;
 use anyhow::{Context, Result};
 use axum::extract::FromRequestParts;
@@ -682,7 +686,11 @@
             config.clone(),
             MemoryChecker::new(memory_stat.clone(), config.clone()),
             global_config_rx.clone(),
-            project_cache.clone(),
+            buffer::Services {
+                project_cache: project_cache.clone(),
+                outcome_aggregator: outcome_aggregator.clone(),
+                test_store: test_store.clone(),
+            },
         )
         .map(|b| b.start_observable());
 
diff --git a/src/relay_server/services/buffer/mod.rs.html b/src/relay_server/services/buffer/mod.rs.html
index a2b38ad2e7..de32f476bb 100644
--- a/src/relay_server/services/buffer/mod.rs.html
+++ b/src/relay_server/services/buffer/mod.rs.html
@@ -516,6 +516,114 @@
 516
 517
 518
+519
+520
+521
+522
+523
+524
+525
+526
+527
+528
+529
+530
+531
+532
+533
+534
+535
+536
+537
+538
+539
+540
+541
+542
+543
+544
+545
+546
+547
+548
+549
+550
+551
+552
+553
+554
+555
+556
+557
+558
+559
+560
+561
+562
+563
+564
+565
+566
+567
+568
+569
+570
+571
+572
+573
+574
+575
+576
+577
+578
+579
+580
+581
+582
+583
+584
+585
+586
+587
+588
+589
+590
+591
+592
+593
+594
+595
+596
+597
+598
+599
+600
+601
+602
+603
+604
+605
+606
+607
+608
+609
+610
+611
+612
+613
+614
+615
+616
+617
+618
+619
+620
+621
+622
+623
+624
+625
+626
 
//! Types for buffering envelopes.
 
 use std::error::Error;
@@ -535,10 +643,16 @@
 use crate::envelope::Envelope;
 use crate::services::buffer::envelope_buffer::Peek;
 use crate::services::global_config;
+use crate::services::outcome::DiscardReason;
+use crate::services::outcome::Outcome;
+use crate::services::outcome::TrackOutcome;
+use crate::services::processor::ProcessingGroup;
 use crate::services::project_cache::DequeuedEnvelope;
 use crate::services::project_cache::ProjectCache;
 use crate::services::project_cache::UpdateProject;
+use crate::services::test_store::TestStore;
 use crate::statsd::RelayCounters;
+use crate::utils::ManagedEnvelope;
 use crate::utils::MemoryChecker;
 
 pub use envelope_buffer::EnvelopeBufferError;
@@ -606,13 +720,20 @@
     }
 }
 
+/// Services that the buffer service communicates with.
+pub struct Services {
+    pub project_cache: Addr<ProjectCache>,
+    pub outcome_aggregator: Addr<TrackOutcome>,
+    pub test_store: Addr<TestStore>,
+}
+
 /// Spool V2 service which buffers envelopes and forwards them to the project cache when a project
 /// becomes ready.
 pub struct EnvelopeBufferService {
     config: Arc<Config>,
     memory_checker: MemoryChecker,
     global_config_rx: watch::Receiver<global_config::Status>,
-    project_cache: Addr<ProjectCache>,
+    services: Services,
     has_capacity: Arc<AtomicBool>,
     sleep: Duration,
 }
@@ -632,14 +753,13 @@
         config: Arc<Config>,
         memory_checker: MemoryChecker,
         global_config_rx: watch::Receiver<global_config::Status>,
-        project_cache: Addr<ProjectCache>,
+        services: Services,
     ) -> Option<Self> {
         config.spool_v2().then(|| Self {
             config,
             memory_checker,
-
             global_config_rx,
-            project_cache,
+            services,
             has_capacity: Arc::new(AtomicBool::new(true)),
             sleep: Duration::ZERO,
         })
@@ -723,6 +843,13 @@
                 );
                 self.sleep = Duration::MAX; // wait for reset by `handle_message`.
             }
+            Peek::Ready(envelope) | Peek::NotReady(.., envelope) if self.expired(envelope) => {
+                let envelope = buffer
+                    .pop()
+                    .await?
+                    .expect("Element disappeared despite exclusive excess");
+                self.drop_expired(envelope);
+            }
             Peek::Ready(_) => {
                 relay_log::trace!("EnvelopeBufferService: popping envelope");
                 relay_statsd::metric!(
@@ -733,8 +860,7 @@
                     .pop()
                     .await?
                     .expect("Element disappeared despite exclusive excess");
-                self.project_cache.send(DequeuedEnvelope(envelope));
-
+                self.services.project_cache.send(DequeuedEnvelope(envelope));
                 self.sleep = Duration::ZERO; // try next pop immediately
             }
             Peek::NotReady(stack_key, envelope) => {
@@ -744,12 +870,14 @@
                     peek_result = "not_ready"
                 );
                 let project_key = envelope.meta().public_key();
-                self.project_cache.send(UpdateProject(project_key));
+                self.services.project_cache.send(UpdateProject(project_key));
                 match envelope.sampling_key() {
                     None => {}
                     Some(sampling_key) if sampling_key == project_key => {} // already sent.
                     Some(sampling_key) => {
-                        self.project_cache.send(UpdateProject(sampling_key));
+                        self.services
+                            .project_cache
+                            .send(UpdateProject(sampling_key));
                     }
                 }
                 // deprioritize the stack to prevent head-of-line blocking
@@ -761,6 +889,20 @@
         Ok(())
     }
 
+    fn expired(&self, envelope: &Envelope) -> bool {
+        envelope.meta().start_time().elapsed() > self.config.spool_envelopes_max_age()
+    }
+
+    fn drop_expired(&self, envelope: Box<Envelope>) {
+        let mut managed_envelope = ManagedEnvelope::new(
+            envelope,
+            self.services.outcome_aggregator.clone(),
+            self.services.test_store.clone(),
+            ProcessingGroup::Ungrouped,
+        );
+        managed_envelope.reject(Outcome::Invalid(DiscardReason::Timestamp));
+    }
+
     async fn handle_message(
         &mut self,
         buffer: &mut PolymorphicEnvelopeBuffer,
@@ -906,9 +1048,10 @@
 
 #[cfg(test)]
 mod tests {
-    use std::time::Duration;
+    use std::time::{Duration, Instant};
 
     use relay_dynamic_config::GlobalConfig;
+    use relay_quotas::DataCategory;
     use tokio::sync::mpsc;
     use uuid::Uuid;
 
@@ -921,6 +1064,7 @@
         EnvelopeBufferService,
         watch::Sender<global_config::Status>,
         mpsc::UnboundedReceiver<ProjectCache>,
+        mpsc::UnboundedReceiver<TrackOutcome>,
     ) {
         let config = Arc::new(
             Config::from_json_value(serde_json::json!({
@@ -934,19 +1078,30 @@
         );
         let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());
         let (global_tx, global_rx) = watch::channel(global_config::Status::Pending);
-        let (project_cache_addr, project_cache_rx) = Addr::custom();
+        let (project_cache, project_cache_rx) = Addr::custom();
+        let (outcome_aggregator, outcome_aggregator_rx) = Addr::custom();
         (
-            EnvelopeBufferService::new(config, memory_checker, global_rx, project_cache_addr)
-                .unwrap(),
+            EnvelopeBufferService::new(
+                config,
+                memory_checker,
+                global_rx,
+                Services {
+                    project_cache,
+                    outcome_aggregator,
+                    test_store: Addr::dummy(),
+                },
+            )
+            .unwrap(),
             global_tx,
             project_cache_rx,
+            outcome_aggregator_rx,
         )
     }
 
     #[tokio::test]
     async fn capacity_is_updated() {
         tokio::time::pause();
-        let (service, _global_rx, _project_cache_tx) = buffer_service();
+        let (service, _global_rx, _project_cache_tx, _) = buffer_service();
 
         // Set capacity to false:
         service.has_capacity.store(false, Ordering::Relaxed);
@@ -968,7 +1123,7 @@
     #[tokio::test]
     async fn pop_requires_global_config() {
         tokio::time::pause();
-        let (service, global_tx, project_cache_rx) = buffer_service();
+        let (service, global_tx, project_cache_rx, _) = buffer_service();
 
         let addr = service.start();
 
@@ -1016,10 +1171,18 @@
             GlobalConfig::default(),
         )));
 
-        let (project_cache_addr, project_cache_rx) = Addr::custom();
-        let service =
-            EnvelopeBufferService::new(config, memory_checker, global_rx, project_cache_addr)
-                .unwrap();
+        let (project_cache, project_cache_rx) = Addr::custom();
+        let service = EnvelopeBufferService::new(
+            config,
+            memory_checker,
+            global_rx,
+            Services {
+                project_cache,
+                outcome_aggregator: Addr::dummy(),
+                test_store: Addr::dummy(),
+            },
+        )
+        .unwrap();
         let addr = service.start();
 
         // Send five messages:
@@ -1033,5 +1196,58 @@
         // Nothing was dequeued, memory not ready:
         assert_eq!(project_cache_rx.len(), 0);
     }
+
+    #[tokio::test]
+    async fn old_envelope_is_dropped() {
+        tokio::time::pause();
+
+        let config = Arc::new(
+            Config::from_json_value(serde_json::json!({
+                "spool": {
+                    "envelopes": {
+                        "version": "experimental",
+                        "max_envelope_delay_secs": 1,
+                    }
+                }
+            }))
+            .unwrap(),
+        );
+        let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());
+        let (global_tx, global_rx) = watch::channel(global_config::Status::Pending);
+        let (project_cache, project_cache_rx) = Addr::custom();
+        let (outcome_aggregator, mut outcome_aggregator_rx) = Addr::custom();
+        let service = EnvelopeBufferService::new(
+            config,
+            memory_checker,
+            global_rx,
+            Services {
+                project_cache,
+                outcome_aggregator,
+                test_store: Addr::dummy(),
+            },
+        )
+        .unwrap();
+
+        global_tx.send_replace(global_config::Status::Ready(Arc::new(
+            GlobalConfig::default(),
+        )));
+
+        let config = service.config.clone();
+        let addr = service.start();
+
+        // Send five messages:
+        let mut envelope = new_envelope(false, "foo");
+        envelope
+            .meta_mut()
+            .set_start_time(Instant::now() - 2 * config.spool_envelopes_max_age());
+        addr.send(EnvelopeBuffer::Push(envelope));
+
+        tokio::time::sleep(Duration::from_millis(100)).await;
+
+        assert!(project_cache_rx.is_empty());
+        let outcome = outcome_aggregator_rx.try_recv().unwrap();
+        assert_eq!(outcome.category, DataCategory::TransactionIndexed);
+        assert_eq!(outcome.quantity, 1);
+    }
 }
 
\ No newline at end of file diff --git a/src/relay_server/services/project_cache.rs.html b/src/relay_server/services/project_cache.rs.html index 700c7735e6..13eedf758d 100644 --- a/src/relay_server/services/project_cache.rs.html +++ b/src/relay_server/services/project_cache.rs.html @@ -1768,16 +1768,6 @@ 1768 1769 1770 -1771 -1772 -1773 -1774 -1775 -1776 -1777 -1778 -1779 -1780
use std::collections::{BTreeMap, BTreeSet};
 use std::error::Error;
 use std::sync::Arc;
@@ -2872,16 +2862,6 @@
         envelope: Box<Envelope>,
         envelope_buffer: Addr<EnvelopeBuffer>,
     ) -> Result<(), EnvelopeBufferError> {
-        if envelope.meta().start_time().elapsed() > self.config.spool_envelopes_max_age() {
-            let mut managed_envelope = ManagedEnvelope::new(
-                envelope,
-                self.services.outcome_aggregator.clone(),
-                self.services.test_store.clone(),
-                ProcessingGroup::Ungrouped,
-            );
-            managed_envelope.reject(Outcome::Invalid(DiscardReason::Timestamp));
-            return Ok(());
-        }
         let sampling_key = envelope.sampling_key();
         let services = self.services.clone();