From 79b2c789cc4ca465f19ddb86adc4c0978587c000 Mon Sep 17 00:00:00 2001
From: iambriccardo
Date: Wed, 18 Sep 2024 12:35:33 +0000
Subject: [PATCH] deploy: b0c0e434e2459ab92d2ae62c7965c2a7d84e8aa8

---
 relay_monitors/fn.process_check_in.html      |   2 +-
 src/relay_server/service.rs.html             |  12 --
 src/relay_server/services/buffer/mod.rs.html | 110 +-----------------
 .../services/project_cache.rs.html           |  36 ++----
 4 files changed, 11 insertions(+), 149 deletions(-)

diff --git a/relay_monitors/fn.process_check_in.html b/relay_monitors/fn.process_check_in.html
index 3204c4bc81..305e024a78 100644
--- a/relay_monitors/fn.process_check_in.html
+++ b/relay_monitors/fn.process_check_in.html
@@ -1,5 +1,5 @@
 process_check_in in relay_monitors - Rust

Function relay_monitors::process_check_in

source ·
pub fn process_check_in(
     payload: &[u8],
-    project_id: ProjectId,
+    project_id: ProjectId,
 ) -> Result<ProcessedCheckInResult, ProcessCheckInError>

Normalizes a monitor check-in payload.
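
As a quick orientation for the function documented above, here is a minimal usage sketch based only on the signature shown in this hunk; the `ProjectId` import path and constructor are assumptions and may differ from the actual crates.

```rust
// Minimal sketch: call `process_check_in` on a raw check-in payload.
// The `ProjectId` path and constructor below are assumptions, not taken from this page.
use relay_base_schema::project::ProjectId;
use relay_monitors::{process_check_in, ProcessCheckInError};

fn normalize_check_in(payload: &[u8]) -> Result<(), ProcessCheckInError> {
    let project_id = ProjectId::new(42); // assumed constructor
    // Normalize the payload; the returned `ProcessedCheckInResult` is ignored here.
    let _result = process_check_in(payload, project_id)?;
    Ok(())
}
```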

\ No newline at end of file
diff --git a/src/relay_server/service.rs.html b/src/relay_server/service.rs.html
index e8ae7cb12a..707bb269c7 100644
--- a/src/relay_server/service.rs.html
+++ b/src/relay_server/service.rs.html
@@ -439,15 +439,8 @@
use std::convert::Infallible;
 use std::fmt;
-use std::sync::atomic::AtomicBool;
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -689,9 +682,6 @@
         )
         .spawn_handler(processor_rx);
 
-        // We initialize a shared boolean that is used to manage backpressure between the
-        // EnvelopeBufferService and the ProjectCacheService.
-        let project_cache_ready = Arc::new(AtomicBool::new(true));
         let envelope_buffer = EnvelopeBufferService::new(
             config.clone(),
             MemoryChecker::new(memory_stat.clone(), config.clone()),
@@ -701,7 +691,6 @@
                 outcome_aggregator: outcome_aggregator.clone(),
                 test_store: test_store.clone(),
             },
-            project_cache_ready.clone(),
         )
         .map(|b| b.start_observable());
 
@@ -724,7 +713,6 @@
             redis_pools
                 .as_ref()
                 .map(|pools| pools.project_configs.clone()),
-            project_cache_ready,
         )
         .spawn_handler(project_cache_rx);
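
The wiring removed above passed a shared `Arc<AtomicBool>` (`project_cache_ready`) into both the `EnvelopeBufferService` and the `ProjectCacheService`: the buffer polled the flag before popping an envelope, and the project cache set it back to `true` once it had processed one. The sketch below is a standalone tokio analogue of that flag-based backpressure, not Relay's actual services; the names, the channel type, and the 10 ms poll interval are illustrative.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Shared readiness flag, analogous to the removed `project_cache_ready`.
    let ready = Arc::new(AtomicBool::new(true));
    let (tx, mut rx) = mpsc::unbounded_channel::<u64>();

    // "Buffer" side: only hands over work while the flag is set.
    let producer = {
        let ready = Arc::clone(&ready);
        let tx = tx.clone();
        tokio::spawn(async move {
            for envelope in 0..5u64 {
                // Poll until the consumer reports readiness (the removed 10 ms sleep loop).
                while !ready.load(Ordering::Relaxed) {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                // Assume the consumer is now busy: flip the flag before sending.
                ready.store(false, Ordering::SeqCst);
                tx.send(envelope).unwrap();
            }
        })
    };

    // "Project cache" side: re-arms the flag once an envelope has been handled.
    let consumer = {
        let ready = Arc::clone(&ready);
        tokio::spawn(async move {
            while let Some(envelope) = rx.recv().await {
                println!("handled envelope {envelope}");
                ready.store(true, Ordering::SeqCst);
            }
        })
    };

    producer.await.unwrap();
    drop(tx); // drop the last sender so the consumer's recv() ends
    consumer.await.unwrap();
}
```

The remainder of this patch replaces that flag with a per-envelope acknowledgement, which removes the polling loop and the shared mutable state between the two services.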
 
diff --git a/src/relay_server/services/buffer/mod.rs.html b/src/relay_server/services/buffer/mod.rs.html
index e8d0f18b96..de32f476bb 100644
--- a/src/relay_server/services/buffer/mod.rs.html
+++ b/src/relay_server/services/buffer/mod.rs.html
@@ -624,59 +624,6 @@
//! Types for buffering envelopes.
 
 use std::error::Error;
@@ -789,7 +736,6 @@
     services: Services,
     has_capacity: Arc<AtomicBool>,
     sleep: Duration,
-    project_cache_ready: Arc<AtomicBool>,
 }
 
 /// The maximum amount of time between evaluations of dequeue conditions.
@@ -808,7 +754,6 @@
         memory_checker: MemoryChecker,
         global_config_rx: watch::Receiver<global_config::Status>,
         services: Services,
-        project_cache_ready: Arc<AtomicBool>,
     ) -> Option<Self> {
         config.spool_v2().then(|| Self {
             config,
@@ -817,7 +762,6 @@
             services,
             has_capacity: Arc::new(AtomicBool::new(true)),
             sleep: Duration::ZERO,
-            project_cache_ready,
         })
     }
 
@@ -851,12 +795,6 @@
             tokio::time::sleep(self.sleep).await;
         }
 
-        // In case the project cache is not ready, we defer popping to first try and handle incoming
-        // messages and only come back to this in case within the timeout no data was received.
-        while !self.project_cache_ready.load(Ordering::Relaxed) {
-            tokio::time::sleep(Duration::from_millis(10)).await;
-        }
-
         relay_statsd::metric!(
             counter(RelayCounters::BufferReadyToPop) += 1,
             status = "slept"
@@ -922,11 +860,7 @@
                     .pop()
                     .await?
                     .expect("Element disappeared despite exclusive excess");
-                // We assume that the project cache is now busy to process this envelope, so we flip
-                // the boolean flag, which will prioritize writes.
-                self.project_cache_ready.store(false, Ordering::SeqCst);
                 self.services.project_cache.send(DequeuedEnvelope(envelope));
-
                 self.sleep = Duration::ZERO; // try next pop immediately
             }
             Peek::NotReady(stack_key, envelope) => {
@@ -1131,7 +1065,6 @@
         watch::Sender<global_config::Status>,
         mpsc::UnboundedReceiver<ProjectCache>,
         mpsc::UnboundedReceiver<TrackOutcome>,
-        Arc<AtomicBool>,
     ) {
         let config = Arc::new(
             Config::from_json_value(serde_json::json!({
@@ -1147,7 +1080,6 @@
         let (global_tx, global_rx) = watch::channel(global_config::Status::Pending);
         let (project_cache, project_cache_rx) = Addr::custom();
         let (outcome_aggregator, outcome_aggregator_rx) = Addr::custom();
-        let project_cache_ready = Arc::new(AtomicBool::new(true));
         (
             EnvelopeBufferService::new(
                 config,
@@ -1158,20 +1090,18 @@
                     outcome_aggregator,
                     test_store: Addr::dummy(),
                 },
-                project_cache_ready.clone(),
             )
             .unwrap(),
             global_tx,
             project_cache_rx,
             outcome_aggregator_rx,
-            project_cache_ready,
         )
     }
 
     #[tokio::test]
     async fn capacity_is_updated() {
         tokio::time::pause();
-        let (service, _global_rx, _project_cache_tx, _, _) = buffer_service();
+        let (service, _global_rx, _project_cache_tx, _) = buffer_service();
 
         // Set capacity to false:
         service.has_capacity.store(false, Ordering::Relaxed);
@@ -1193,7 +1123,7 @@
     #[tokio::test]
     async fn pop_requires_global_config() {
         tokio::time::pause();
-        let (service, global_tx, project_cache_rx, _, _) = buffer_service();
+        let (service, global_tx, project_cache_rx, _) = buffer_service();
 
         let addr = service.start();
 
@@ -1242,7 +1172,6 @@
         )));
 
         let (project_cache, project_cache_rx) = Addr::custom();
-        let project_cache_ready = Arc::new(AtomicBool::new(true));
         let service = EnvelopeBufferService::new(
             config,
             memory_checker,
@@ -1252,7 +1181,6 @@
                 outcome_aggregator: Addr::dummy(),
                 test_store: Addr::dummy(),
             },
-            project_cache_ready,
         )
         .unwrap();
         let addr = service.start();
@@ -1288,7 +1216,6 @@
         let (global_tx, global_rx) = watch::channel(global_config::Status::Pending);
         let (project_cache, project_cache_rx) = Addr::custom();
         let (outcome_aggregator, mut outcome_aggregator_rx) = Addr::custom();
-        let project_cache_ready = Arc::new(AtomicBool::new(true));
         let service = EnvelopeBufferService::new(
             config,
             memory_checker,
@@ -1298,7 +1225,6 @@
                 outcome_aggregator,
                 test_store: Addr::dummy(),
             },
-            project_cache_ready,
         )
         .unwrap();
 
@@ -1323,37 +1249,5 @@
         assert_eq!(outcome.category, DataCategory::TransactionIndexed);
         assert_eq!(outcome.quantity, 1);
     }
-
-    #[tokio::test]
-    async fn output_is_throttled() {
-        tokio::time::pause();
-        let (service, global_tx, mut project_cache_rx, _, _) = buffer_service();
-        global_tx.send_replace(global_config::Status::Ready(Arc::new(
-            GlobalConfig::default(),
-        )));
-
-        let addr = service.start();
-
-        // Send five messages:
-        let envelope = new_envelope(false, "foo");
-        let project_key = envelope.meta().public_key();
-        for _ in 0..5 {
-            addr.send(EnvelopeBuffer::Push(envelope.clone()));
-        }
-        addr.send(EnvelopeBuffer::Ready(project_key));
-
-        tokio::time::sleep(Duration::from_millis(100)).await;
-
-        let mut messages = vec![];
-        project_cache_rx.recv_many(&mut messages, 100).await;
-
-        assert_eq!(
-            messages
-                .iter()
-                .filter(|message| matches!(message, ProjectCache::HandleDequeuedEnvelope(..)))
-                .count(),
-            1
-        );
-    }
 }
 
\ No newline at end of file
diff --git a/src/relay_server/services/project_cache.rs.html b/src/relay_server/services/project_cache.rs.html
index 63bee87cd8..13eedf758d 100644
--- a/src/relay_server/services/project_cache.rs.html
+++ b/src/relay_server/services/project_cache.rs.html
@@ -1768,19 +1768,8 @@
use std::collections::{BTreeMap, BTreeSet};
 use std::error::Error;
-use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -2073,7 +2062,7 @@
     UpdateSpoolIndex(UpdateSpoolIndex),
     SpoolHealth(Sender<bool>),
     RefreshIndexCache(RefreshIndexCache),
-    HandleDequeuedEnvelope(Box<Envelope>),
+    HandleDequeuedEnvelope(Box<Envelope>, Sender<()>),
     UpdateProject(ProjectKey),
 }
 
@@ -2092,7 +2081,7 @@
             Self::UpdateSpoolIndex(_) => "UpdateSpoolIndex",
             Self::SpoolHealth(_) => "SpoolHealth",
             Self::RefreshIndexCache(_) => "RefreshIndexCache",
-            Self::HandleDequeuedEnvelope(_) => "HandleDequeuedEnvelope",
+            Self::HandleDequeuedEnvelope(_, _) => "HandleDequeuedEnvelope",
             Self::UpdateProject(_) => "UpdateProject",
         }
     }
@@ -2200,11 +2189,11 @@
 }
 
 impl FromMessage<DequeuedEnvelope> for ProjectCache {
-    type Response = relay_system::NoResponse;
+    type Response = relay_system::AsyncResponse<()>;
 
-    fn from_message(message: DequeuedEnvelope, _: ()) -> Self {
+    fn from_message(message: DequeuedEnvelope, sender: Sender<()>) -> Self {
         let DequeuedEnvelope(envelope) = message;
-        Self::HandleDequeuedEnvelope(envelope)
+        Self::HandleDequeuedEnvelope(envelope, sender)
     }
 }
 
@@ -2391,8 +2380,6 @@
     spool_v1: Option<SpoolV1>,
     /// Status of the global configuration, used to determine readiness for processing.
     global_config: GlobalConfigStatus,
-    /// Atomic boolean signaling whether the project cache is ready to accept a new envelope.
-    project_cache_ready: Arc<AtomicBool>,
 }
 
 #[derive(Debug)]
@@ -3088,7 +3075,7 @@
                     ProjectCache::RefreshIndexCache(message) => {
                         self.handle_refresh_index_cache(message)
                     }
-                    ProjectCache::HandleDequeuedEnvelope(message) => {
+                    ProjectCache::HandleDequeuedEnvelope(message, sender) => {
                         let envelope_buffer = self
                             .services
                             .envelope_buffer
@@ -3101,9 +3088,8 @@
                                 "Failed to handle popped envelope"
                             );
                         }
-
-                        // We mark the project cache as ready to accept new traffic.
-                        self.project_cache_ready.store(true, Ordering::SeqCst);
+                        // Return response to signal readiness for next envelope:
+                        sender.send(())
                     }
                     ProjectCache::UpdateProject(project) => self.handle_update_project(project),
                 }
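
With this hunk, `DequeuedEnvelope` becomes an `AsyncResponse<()>` message: the project cache receives a `Sender<()>` alongside the envelope and calls `sender.send(())` once the envelope has been forwarded, so the buffer can learn when it is safe to pop the next one without a shared flag. The diff does not show how the buffer consumes that acknowledgement, so the sketch below only illustrates the request/acknowledge shape using tokio's `mpsc` and `oneshot` channels; it is a generic analogue, not the `relay_system` API.

```rust
use tokio::sync::{mpsc, oneshot};

/// A dequeued envelope plus an acknowledgement channel, analogous to
/// `HandleDequeuedEnvelope(Box<Envelope>, Sender<()>)` above.
struct Dequeued {
    envelope: String,          // stand-in for `Box<Envelope>`
    done: oneshot::Sender<()>, // stand-in for `relay_system::Sender<()>`
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<Dequeued>();

    // "Project cache" side: handle the envelope, then signal readiness for the next one.
    let cache = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            println!("handled {}", msg.envelope);
            // Return response to signal readiness for the next envelope.
            let _ = msg.done.send(());
        }
    });

    // "Buffer" side: send one envelope at a time and wait for the acknowledgement,
    // which is what replaces the shared `AtomicBool` flag.
    for i in 0..3 {
        let (done, acked) = oneshot::channel();
        tx.send(Dequeued { envelope: format!("envelope {i}"), done }).unwrap();
        acked.await.unwrap();
    }

    drop(tx);
    cache.await.unwrap();
}
```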
@@ -3120,7 +3106,6 @@
     services: Services,
     global_config_rx: watch::Receiver<global_config::Status>,
     redis: Option<RedisPool>,
-    project_cache_ready: Arc<AtomicBool>,
 }
 
 impl ProjectCacheService {
@@ -3131,7 +3116,6 @@
         services: Services,
         global_config_rx: watch::Receiver<global_config::Status>,
         redis: Option<RedisPool>,
-        project_cache_ready: Arc<AtomicBool>,
     ) -> Self {
         Self {
             config,
@@ -3139,7 +3123,6 @@
             services,
             global_config_rx,
             redis,
-            project_cache_ready,
         }
     }
 }
@@ -3154,7 +3137,6 @@
             services,
             mut global_config_rx,
             redis,
-            project_cache_ready,
         } = self;
         let project_cache = services.project_cache.clone();
         let outcome_aggregator = services.outcome_aggregator.clone();
@@ -3236,7 +3218,6 @@
                 spool_v1_unspool_handle: SleepHandle::idle(),
                 spool_v1,
                 global_config,
-                project_cache_ready,
             };
 
             loop {
@@ -3431,7 +3412,6 @@
                     buffer_unspool_backoff: RetryBackoff::new(Duration::from_millis(100)),
                 }),
                 global_config: GlobalConfigStatus::Pending,
-                project_cache_ready: Arc::new(AtomicBool::new(true)),
             },
             buffer,
         )