From e5133037fee33783e67e245a369485f891334bfb Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 5 Sep 2024 09:15:42 +0200 Subject: [PATCH 01/11] feat(spool): Only dequeue if memory available --- relay-server/src/endpoints/common.rs | 4 +-- .../services/buffer/envelope_buffer/mod.rs | 35 ++++++++++++++----- relay-server/src/services/buffer/mod.rs | 4 +++ 3 files changed, 31 insertions(+), 12 deletions(-) diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index bec9ebfd5c..289deb8b11 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -353,9 +353,7 @@ pub async fn handle_envelope( } // TODO(jjbayer): Remove this check once spool v1 is removed. - if state.memory_checker().check_memory().is_exceeded() { - // NOTE: Long-term, we should not reject the envelope here, but spool it to disk instead. - // This will be fixed with the new spool implementation. + if state.envelope_buffer().is_none() && state.memory_checker().check_memory().is_exceeded() { return Err(BadStoreRequest::QueueFailed); }; diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index 433e46bf72..ec55a38aae 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -37,7 +37,10 @@ pub enum PolymorphicEnvelopeBuffer { InMemory(EnvelopeBuffer), /// An enveloper buffer that uses sqlite envelopes stacks. 
#[allow(dead_code)] - Sqlite(EnvelopeBuffer), + Sqlite { + buffer: EnvelopeBuffer, + memory_checker: MemoryChecker, + }, } impl PolymorphicEnvelopeBuffer { @@ -49,7 +52,10 @@ impl PolymorphicEnvelopeBuffer { ) -> Result { let buffer = if config.spool_envelopes_path().is_some() { let buffer = EnvelopeBuffer::::new(config).await?; - Self::Sqlite(buffer) + Self::Sqlite { + buffer, + memory_checker, + } } else { let buffer = EnvelopeBuffer::::new(memory_checker); Self::InMemory(buffer) @@ -62,14 +68,14 @@ impl PolymorphicEnvelopeBuffer { pub async fn initialize(&mut self) { match self { PolymorphicEnvelopeBuffer::InMemory(buffer) => buffer.initialize().await, - PolymorphicEnvelopeBuffer::Sqlite(buffer) => buffer.initialize().await, + PolymorphicEnvelopeBuffer::Sqlite { buffer, .. } => buffer.initialize().await, } } /// Adds an envelope to the buffer. pub async fn push(&mut self, envelope: Box) -> Result<(), EnvelopeBufferError> { match self { - Self::Sqlite(buffer) => buffer.push(envelope).await, + Self::Sqlite { buffer, .. } => buffer.push(envelope).await, Self::InMemory(buffer) => buffer.push(envelope).await, }?; relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesWritten) += 1); @@ -79,7 +85,7 @@ impl PolymorphicEnvelopeBuffer { /// Returns a reference to the next-in-line envelope. pub async fn peek(&mut self) -> Result { match self { - Self::Sqlite(buffer) => buffer.peek().await, + Self::Sqlite { buffer, .. } => buffer.peek().await, Self::InMemory(buffer) => buffer.peek().await, } } @@ -87,7 +93,7 @@ impl PolymorphicEnvelopeBuffer { /// Pops the next-in-line envelope. pub async fn pop(&mut self) -> Result>, EnvelopeBufferError> { let envelope = match self { - Self::Sqlite(buffer) => buffer.pop().await, + Self::Sqlite { buffer, .. 
} => buffer.pop().await, Self::InMemory(buffer) => buffer.pop().await, }?; relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesRead) += 1); @@ -100,7 +106,7 @@ impl PolymorphicEnvelopeBuffer { /// Returns `true` if at least one priority was changed. pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool { match self { - Self::Sqlite(buffer) => buffer.mark_ready(project, is_ready), + Self::Sqlite { buffer, .. } => buffer.mark_ready(project, is_ready), Self::InMemory(buffer) => buffer.mark_ready(project, is_ready), } } @@ -112,7 +118,7 @@ impl PolymorphicEnvelopeBuffer { /// head-of-line blocking. pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair) { match self { - Self::Sqlite(buffer) => buffer.mark_seen(project_key_pair), + Self::Sqlite { buffer, .. } => buffer.mark_seen(project_key_pair), Self::InMemory(buffer) => buffer.mark_seen(project_key_pair), } } @@ -120,10 +126,21 @@ impl PolymorphicEnvelopeBuffer { /// Returns `true` whether the buffer has capacity to accept new [`Envelope`]s. pub fn has_capacity(&self) -> bool { match self { - Self::Sqlite(buffer) => buffer.has_capacity(), + Self::Sqlite { buffer, .. } => buffer.has_capacity(), Self::InMemory(buffer) => buffer.has_capacity(), } } + + /// Returns `true` when preconditions for unspooling are met. + /// + /// We should not pop from disk into memory when relay's overall memory capacity + /// has been reached. + pub fn should_flush(&self) -> bool { + match self { + Self::Sqlite { memory_checker, .. } => memory_checker.check_memory().has_capacity(), + Self::InMemory(_) => true, + } + } } /// Error that occurs while interacting with the envelope buffer. 
diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index cbe33a87f0..73a1737b3e 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -140,6 +140,10 @@ impl EnvelopeBufferService { &mut self, buffer: &mut PolymorphicEnvelopeBuffer, ) -> Result<(), EnvelopeBufferError> { + if !buffer.should_flush() { + return Ok(()); + } + relay_log::trace!("EnvelopeBufferService peek"); match buffer.peek().await? { Peek::Empty => { From c1924e11dda3a4b43b29808de087b765fd0b56cc Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 5 Sep 2024 09:25:08 +0200 Subject: [PATCH 02/11] clippy --- relay-server/src/services/buffer/envelope_buffer/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index ec55a38aae..59f9128d34 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -38,7 +38,9 @@ pub enum PolymorphicEnvelopeBuffer { /// An enveloper buffer that uses sqlite envelopes stacks. #[allow(dead_code)] Sqlite { + /// The contained buffer. buffer: EnvelopeBuffer, + /// A memory checker used to check whether envelopes may be unspooled from disk. 
memory_checker: MemoryChecker, }, } From c302bbcc6fdcb7e229a3cfca7696b8230c84a715 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 5 Sep 2024 13:54:27 +0200 Subject: [PATCH 03/11] ref: plain function --- .../services/buffer/envelope_buffer/mod.rs | 46 +++++++------------ relay-server/src/services/buffer/mod.rs | 23 ++++++++-- 2 files changed, 37 insertions(+), 32 deletions(-) diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index 59f9128d34..c16e9931d1 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -36,16 +36,18 @@ pub enum PolymorphicEnvelopeBuffer { /// An enveloper buffer that uses in-memory envelopes stacks. InMemory(EnvelopeBuffer), /// An enveloper buffer that uses sqlite envelopes stacks. - #[allow(dead_code)] - Sqlite { - /// The contained buffer. - buffer: EnvelopeBuffer, - /// A memory checker used to check whether envelopes may be unspooled from disk. - memory_checker: MemoryChecker, - }, + Sqlite(EnvelopeBuffer), } impl PolymorphicEnvelopeBuffer { + /// Returns true if the implementation stores envelopes on external storage (e.g. disk). + pub fn is_external(&self) -> bool { + match self { + PolymorphicEnvelopeBuffer::InMemory(_) => false, + PolymorphicEnvelopeBuffer::Sqlite(_) => true, + } + } + /// Creates either a memory-based or a disk-based envelope buffer, /// depending on the given configuration. 
pub async fn from_config( @@ -54,10 +56,7 @@ impl PolymorphicEnvelopeBuffer { ) -> Result { let buffer = if config.spool_envelopes_path().is_some() { let buffer = EnvelopeBuffer::::new(config).await?; - Self::Sqlite { - buffer, - memory_checker, - } + Self::Sqlite(buffer) } else { let buffer = EnvelopeBuffer::::new(memory_checker); Self::InMemory(buffer) @@ -70,14 +69,14 @@ impl PolymorphicEnvelopeBuffer { pub async fn initialize(&mut self) { match self { PolymorphicEnvelopeBuffer::InMemory(buffer) => buffer.initialize().await, - PolymorphicEnvelopeBuffer::Sqlite { buffer, .. } => buffer.initialize().await, + PolymorphicEnvelopeBuffer::Sqlite(buffer) => buffer.initialize().await, } } /// Adds an envelope to the buffer. pub async fn push(&mut self, envelope: Box) -> Result<(), EnvelopeBufferError> { match self { - Self::Sqlite { buffer, .. } => buffer.push(envelope).await, + Self::Sqlite(buffer) => buffer.push(envelope).await, Self::InMemory(buffer) => buffer.push(envelope).await, }?; relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesWritten) += 1); @@ -87,7 +86,7 @@ impl PolymorphicEnvelopeBuffer { /// Returns a reference to the next-in-line envelope. pub async fn peek(&mut self) -> Result { match self { - Self::Sqlite { buffer, .. } => buffer.peek().await, + Self::Sqlite(buffer) => buffer.peek().await, Self::InMemory(buffer) => buffer.peek().await, } } @@ -95,7 +94,7 @@ impl PolymorphicEnvelopeBuffer { /// Pops the next-in-line envelope. pub async fn pop(&mut self) -> Result>, EnvelopeBufferError> { let envelope = match self { - Self::Sqlite { buffer, .. } => buffer.pop().await, + Self::Sqlite(buffer) => buffer.pop().await, Self::InMemory(buffer) => buffer.pop().await, }?; relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesRead) += 1); @@ -108,7 +107,7 @@ impl PolymorphicEnvelopeBuffer { /// Returns `true` if at least one priority was changed. 
pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool { match self { - Self::Sqlite { buffer, .. } => buffer.mark_ready(project, is_ready), + Self::Sqlite(buffer) => buffer.mark_ready(project, is_ready), Self::InMemory(buffer) => buffer.mark_ready(project, is_ready), } } @@ -120,7 +119,7 @@ impl PolymorphicEnvelopeBuffer { /// head-of-line blocking. pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair) { match self { - Self::Sqlite { buffer, .. } => buffer.mark_seen(project_key_pair), + Self::Sqlite(buffer) => buffer.mark_seen(project_key_pair), Self::InMemory(buffer) => buffer.mark_seen(project_key_pair), } } @@ -128,21 +127,10 @@ impl PolymorphicEnvelopeBuffer { /// Returns `true` whether the buffer has capacity to accept new [`Envelope`]s. pub fn has_capacity(&self) -> bool { match self { - Self::Sqlite { buffer, .. } => buffer.has_capacity(), + Self::Sqlite(buffer) => buffer.has_capacity(), Self::InMemory(buffer) => buffer.has_capacity(), } } - - /// Returns `true` when preconditions for unspooling are met. - /// - /// We should not pop from disk into memory when relay's overall memory capacity - /// has been reached. - pub fn should_flush(&self) -> bool { - match self { - Self::Sqlite { memory_checker, .. } => memory_checker.check_memory().has_capacity(), - Self::InMemory(_) => true, - } - } } /// Error that occurs while interacting with the envelope buffer. diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 73a1737b3e..b5a5a4a640 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -127,7 +127,7 @@ impl EnvelopeBufferService { } /// Wait for the configured amount of time and make sure the project cache is ready to receive. 
- async fn ready_to_pop(&mut self) -> Result<(), SendError> { + async fn sleep(&mut self) -> Result<(), SendError> { tokio::time::sleep(self.sleep).await; if let Some(project_cache_ready) = self.project_cache_ready.take() { project_cache_ready.await?; @@ -135,12 +135,29 @@ impl EnvelopeBufferService { Ok(()) } + /// Returns `true` when preconditions for unspooling are met. + /// + /// - We should not pop from disk into memory when relay's overall memory capacity + /// has been reached. + /// - We need a valid global config to unspool. + pub fn should_pop(&self, buffer: &PolymorphicEnvelopeBuffer) -> bool { + // 1 - check memory. In case of the in-memory implementation, dequeuing is beneficial because + // it could reduce memory pressure, but for the on-disk implementation, we should not dequeue + // from disk unless there's RAM available. + if buffer.is_external() && self.memory_checker.check_memory().is_exceeded() { + return false; + } + + // TODO: global config + true + } + /// Tries to pop an envelope for a ready project. async fn try_pop( &mut self, buffer: &mut PolymorphicEnvelopeBuffer, ) -> Result<(), EnvelopeBufferError> { - if !buffer.should_flush() { + if !self.should_pop(buffer) { return Ok(()); } @@ -254,7 +271,7 @@ impl Service for EnvelopeBufferService { // On the one hand, we might want to prioritize dequeing over enqueing // so we do not exceed the buffer capacity by starving the dequeue. // on the other hand, prioritizing old messages violates the LIFO design. 
- Ok(()) = self.ready_to_pop() => { + Ok(()) = self.sleep() => { if let Err(e) = self.try_pop(&mut buffer).await { relay_log::error!( error = &e as &dyn std::error::Error, From 22300986935a522a3ca90c0c22a90d5010696436 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 5 Sep 2024 14:12:48 +0200 Subject: [PATCH 04/11] ref(server): Simplify global config --- relay-server/src/service.rs | 5 +- relay-server/src/services/global_config.rs | 67 ++++++++++------------ relay-server/src/services/project_cache.rs | 25 +++----- 3 files changed, 41 insertions(+), 56 deletions(-) diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index a214b23837..41d6eb8cd6 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -177,7 +177,8 @@ impl ServiceState { .start(); let outcome_aggregator = OutcomeAggregator::new(&config, outcome_producer.clone()).start(); - let global_config = GlobalConfigService::new(config.clone(), upstream_relay.clone()); + let (global_config, global_config_rx) = + GlobalConfigService::new(config.clone(), upstream_relay.clone()); let global_config_handle = global_config.handle(); // The global config service must start before dependant services are // started. 
Messages like subscription requests to the global config @@ -256,13 +257,13 @@ impl ServiceState { project_cache: project_cache.clone(), test_store: test_store.clone(), upstream_relay: upstream_relay.clone(), - global_config: global_config.clone(), }; ProjectCacheService::new( config.clone(), MemoryChecker::new(memory_stat.clone(), config.clone()), project_cache_services, + global_config_rx, redis_pools .as_ref() .map(|pools| pools.project_configs.clone()), diff --git a/relay-server/src/services/global_config.rs b/relay-server/src/services/global_config.rs index 4f438967a6..edf7e84ba8 100644 --- a/relay-server/src/services/global_config.rs +++ b/relay-server/src/services/global_config.rs @@ -104,14 +104,6 @@ impl UpstreamQuery for GetGlobalConfig { /// The message for requesting the most recent global config from [`GlobalConfigService`]. pub struct Get; -/// The message for receiving a watch that subscribes to the [`GlobalConfigService`]. -/// -/// The global config service must be up and running, else the subscription -/// fails. Subscribers should use the initial value when they get the watch -/// rather than only waiting for the watch to update, in case a global config -/// is only updated once, such as is the case with the static config file. -pub struct Subscribe; - /// An interface to get [`GlobalConfig`]s through [`GlobalConfigService`]. /// /// For a one-off update, [`GlobalConfigService`] responds to @@ -123,8 +115,6 @@ pub struct Subscribe; pub enum GlobalConfigManager { /// Returns the most recent global config. Get(relay_system::Sender), - /// Returns a [`watch::Receiver`] where global config updates will be sent to. 
- Subscribe(relay_system::Sender>), } impl Interface for GlobalConfigManager {} @@ -137,14 +127,6 @@ impl FromMessage for GlobalConfigManager { } } -impl FromMessage for GlobalConfigManager { - type Response = AsyncResponse>; - - fn from_message(_: Subscribe, sender: relay_system::Sender>) -> Self { - Self::Subscribe(sender) - } -} - /// Describes the current fetching status of the [`GlobalConfig`] from the upstream. #[derive(Debug, Clone, Default)] pub enum Status { @@ -228,21 +210,27 @@ pub struct GlobalConfigService { impl GlobalConfigService { /// Creates a new [`GlobalConfigService`]. - pub fn new(config: Arc, upstream: Addr) -> Self { + pub fn new( + config: Arc, + upstream: Addr, + ) -> (Self, watch::Receiver) { let (internal_tx, internal_rx) = mpsc::channel(1); - let (global_config_watch, _) = watch::channel(Status::Pending); - - Self { - config, - global_config_watch, - internal_tx, - internal_rx, - upstream, - fetch_handle: SleepHandle::idle(), - last_fetched: Instant::now(), - upstream_failure_interval: Duration::from_secs(35), - shutdown: false, - } + let (global_config_watch, rx) = watch::channel(Status::Pending); + + ( + Self { + config, + global_config_watch, + internal_tx, + internal_rx, + upstream, + fetch_handle: SleepHandle::idle(), + last_fetched: Instant::now(), + upstream_failure_interval: Duration::from_secs(35), + shutdown: false, + }, + rx, + ) } /// Creates a [`GlobalConfigHandle`] which can be used to retrieve the current state @@ -259,9 +247,6 @@ impl GlobalConfigService { GlobalConfigManager::Get(sender) => { sender.send(self.global_config_watch.borrow().clone()); } - GlobalConfigManager::Subscribe(sender) => { - sender.send(self.global_config_watch.subscribe()); - } } } @@ -440,7 +425,9 @@ mod tests { config.regenerate_credentials(false).unwrap(); let fetch_interval = config.global_config_fetch_interval(); - let service = GlobalConfigService::new(Arc::new(config), upstream).start(); + let service = 
GlobalConfigService::new(Arc::new(config), upstream) + .0 + .start(); assert!(service.send(Get).await.is_ok()); @@ -469,7 +456,9 @@ mod tests { config.regenerate_credentials(false).unwrap(); let fetch_interval = config.global_config_fetch_interval(); - let service = GlobalConfigService::new(Arc::new(config), upstream).start(); + let service = GlobalConfigService::new(Arc::new(config), upstream) + .0 + .start(); service.send(Get).await.unwrap(); tokio::time::sleep(fetch_interval * 2).await; @@ -494,7 +483,9 @@ mod tests { let fetch_interval = config.global_config_fetch_interval(); - let service = GlobalConfigService::new(Arc::new(config), upstream).start(); + let service = GlobalConfigService::new(Arc::new(config), upstream) + .0 + .start(); service.send(Get).await.unwrap(); tokio::time::sleep(fetch_interval * 2).await; diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index 507833ce78..ef2f3f813f 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -5,6 +5,7 @@ use std::time::Duration; use crate::extractors::RequestMeta; use crate::services::buffer::{EnvelopeBuffer, EnvelopeBufferError}; +use crate::services::global_config; use crate::services::processor::{ EncodeMetrics, EnvelopeProcessor, MetricData, ProcessEnvelope, ProcessingGroup, ProjectMetrics, }; @@ -19,12 +20,11 @@ use relay_quotas::RateLimits; use relay_redis::RedisPool; use relay_statsd::metric; use relay_system::{Addr, FromMessage, Interface, Sender, Service}; -use tokio::sync::mpsc; #[cfg(feature = "processing")] use tokio::sync::Semaphore; +use tokio::sync::{mpsc, watch}; use tokio::time::Instant; -use crate::services::global_config::{self, GlobalConfigManager, Subscribe}; use crate::services::metrics::{Aggregator, FlushBuckets}; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; use crate::services::project::{Project, ProjectFetchState, ProjectSender, ProjectState}; @@ -585,7 
+585,6 @@ pub struct Services { pub project_cache: Addr, pub test_store: Addr, pub upstream_relay: Addr, - pub global_config: Addr, } /// Main broker of the [`ProjectCacheService`]. @@ -1345,6 +1344,7 @@ pub struct ProjectCacheService { config: Arc, memory_checker: MemoryChecker, services: Services, + global_config_rx: watch::Receiver, redis: Option, } @@ -1354,12 +1354,14 @@ impl ProjectCacheService { config: Arc, memory_checker: MemoryChecker, services: Services, + global_config_rx: watch::Receiver, redis: Option, ) -> Self { Self { config, memory_checker, services, + global_config_rx, redis, } } @@ -1373,6 +1375,7 @@ impl Service for ProjectCacheService { config, memory_checker, services, + mut global_config_rx, redis, } = self; let project_cache = services.project_cache.clone(); @@ -1386,15 +1389,7 @@ impl Service for ProjectCacheService { // Channel for async project state responses back into the project cache. let (state_tx, mut state_rx) = mpsc::unbounded_channel(); - let Ok(mut subscription) = services.global_config.send(Subscribe).await else { - // TODO(iker): we accept this sub-optimal error handling. TBD - // the approach to deal with failures on the subscription - // mechanism. - relay_log::error!("failed to subscribe to GlobalConfigService"); - return; - }; - - let global_config = match subscription.borrow().clone() { + let global_config = match global_config_rx.borrow().clone() { global_config::Status::Ready(_) => { relay_log::info!("global config received"); GlobalConfigStatus::Ready @@ -1469,9 +1464,9 @@ impl Service for ProjectCacheService { tokio::select! { biased; - Ok(()) = subscription.changed() => { + Ok(()) = global_config_rx.changed() => { metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "update_global_config", { - match subscription.borrow().clone() { + match global_config_rx.borrow().clone() { global_config::Status::Ready(_) => broker.set_global_config_ready(), // The watch should only be updated if it gets a new value. 
// This would imply a logical bug. @@ -1591,7 +1586,6 @@ mod tests { let (project_cache, _) = mock_service("project_cache", (), |&mut (), _| {}); let (test_store, _) = mock_service("test_store", (), |&mut (), _| {}); let (upstream_relay, _) = mock_service("upstream_relay", (), |&mut (), _| {}); - let (global_config, _) = mock_service("global_config", (), |&mut (), _| {}); Services { envelope_buffer: None, @@ -1601,7 +1595,6 @@ mod tests { outcome_aggregator, test_store, upstream_relay, - global_config, } } From 0c46222ab16dfa4fe858fbf40a0d61fafa9f4955 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 5 Sep 2024 14:43:00 +0200 Subject: [PATCH 05/11] feat: respect global config --- relay-server/src/service.rs | 1 + relay-server/src/services/buffer/mod.rs | 103 +++++++++++++++++++-- relay-server/src/services/global_config.rs | 3 +- 3 files changed, 96 insertions(+), 11 deletions(-) diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 41d6eb8cd6..f8fbdfe48c 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -244,6 +244,7 @@ impl ServiceState { let envelope_buffer = EnvelopeBufferService::new( config.clone(), MemoryChecker::new(memory_stat.clone(), config.clone()), + global_config_rx.clone(), project_cache.clone(), ) .map(|b| b.start_observable()); diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index b5a5a4a640..601b91ae01 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -10,9 +10,11 @@ use relay_config::Config; use relay_system::Request; use relay_system::SendError; use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service}; +use tokio::sync::watch; use crate::envelope::Envelope; use crate::services::buffer::envelope_buffer::Peek; +use crate::services::global_config; use crate::services::project_cache::DequeuedEnvelope; use crate::services::project_cache::ProjectCache; use 
crate::services::project_cache::UpdateProject; @@ -89,6 +91,7 @@ impl ObservableEnvelopeBuffer { pub struct EnvelopeBufferService { config: Arc, memory_checker: MemoryChecker, + global_config_rx: watch::Receiver, project_cache: Addr, has_capacity: Arc, sleep: Duration, @@ -105,11 +108,14 @@ impl EnvelopeBufferService { pub fn new( config: Arc, memory_checker: MemoryChecker, + global_config_rx: watch::Receiver, project_cache: Addr, ) -> Option { config.spool_v2().then(|| Self { config, memory_checker, + + global_config_rx, project_cache, has_capacity: Arc::new(AtomicBool::new(true)), sleep: Duration::ZERO, @@ -148,8 +154,7 @@ impl EnvelopeBufferService { return false; } - // TODO: global config - true + self.global_config_rx.borrow().is_ready() } /// Tries to pop an envelope for a ready project. @@ -298,14 +303,20 @@ impl Service for EnvelopeBufferService { mod tests { use std::time::Duration; + use relay_dynamic_config::GlobalConfig; + use tokio::sync::mpsc; + use uuid::Uuid; + use crate::testutils::new_envelope; use crate::MemoryStat; use super::*; - #[tokio::test] - async fn capacity_is_updated() { - tokio::time::pause(); + fn buffer_service() -> ( + EnvelopeBufferService, + watch::Sender, + mpsc::UnboundedReceiver, + ) { let config = Arc::new( Config::from_json_value(serde_json::json!({ "spool": { @@ -317,7 +328,20 @@ mod tests { .unwrap(), ); let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone()); - let service = EnvelopeBufferService::new(config, memory_checker, Addr::dummy()).unwrap(); + let (global_tx, global_rx) = watch::channel(global_config::Status::Pending); + let (project_cache_addr, project_cache_rx) = Addr::custom(); + ( + EnvelopeBufferService::new(config, memory_checker, global_rx, project_cache_addr) + .unwrap(), + global_tx, + project_cache_rx, + ) + } + + #[tokio::test] + async fn capacity_is_updated() { + tokio::time::pause(); + let (service, _, _) = buffer_service(); // Set capacity to false: 
service.has_capacity.store(false, Ordering::Relaxed); @@ -337,22 +361,81 @@ mod tests { } #[tokio::test] - async fn output_is_throttled() { + async fn pop_requires_global_config() { tokio::time::pause(); + let (service, global_tx, project_cache_rx) = buffer_service(); + + let addr = service.start(); + + // Send five messages: + let envelope = new_envelope(false, "foo"); + let project_key = envelope.meta().public_key(); + addr.send(EnvelopeBuffer::Push(envelope.clone())); + addr.send(EnvelopeBuffer::Ready(project_key)); + + tokio::time::sleep(Duration::from_millis(1000)).await; + + // Nothing was dequeued, global config not ready: + assert_eq!(project_cache_rx.len(), 0); + + global_tx.send_replace(global_config::Status::Ready(Arc::new( + GlobalConfig::default(), + ))); + + tokio::time::sleep(Duration::from_millis(1000)).await; + + // Dequeued, global config ready: + assert_eq!(project_cache_rx.len(), 1); + } + + #[tokio::test] + async fn pop_requires_memory_capacity() { + tokio::time::pause(); + let config = Arc::new( Config::from_json_value(serde_json::json!({ "spool": { "envelopes": { - "version": "experimental" + "version": "experimental", + "path": std::env::temp_dir().join(Uuid::new_v4().to_string()), } + }, + "health": { + "max_memory_bytes": 0, } })) .unwrap(), ); let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone()); - let (project_cache_addr, mut project_cache_rx) = Addr::custom(); + let (_, global_rx) = watch::channel(global_config::Status::Ready(Arc::new( + GlobalConfig::default(), + ))); + + let (project_cache_addr, project_cache_rx) = Addr::custom(); let service = - EnvelopeBufferService::new(config, memory_checker, project_cache_addr).unwrap(); + EnvelopeBufferService::new(config, memory_checker, global_rx, project_cache_addr) + .unwrap(); + let addr = service.start(); + + // Send five messages: + let envelope = new_envelope(false, "foo"); + let project_key = envelope.meta().public_key(); + 
addr.send(EnvelopeBuffer::Push(envelope.clone())); + addr.send(EnvelopeBuffer::Ready(project_key)); + + tokio::time::sleep(Duration::from_millis(1000)).await; + + // Nothing was dequeued, memory not ready: + assert_eq!(project_cache_rx.len(), 0); + } + + #[tokio::test] + async fn output_is_throttled() { + tokio::time::pause(); + let (service, global_tx, mut project_cache_rx) = buffer_service(); + global_tx.send_replace(global_config::Status::Ready(Arc::new( + GlobalConfig::default(), + ))); let addr = service.start(); diff --git a/relay-server/src/services/global_config.rs b/relay-server/src/services/global_config.rs index edf7e84ba8..52da9a21d2 100644 --- a/relay-server/src/services/global_config.rs +++ b/relay-server/src/services/global_config.rs @@ -144,7 +144,8 @@ pub enum Status { } impl Status { - fn is_ready(&self) -> bool { + /// Returns `true` if the global config is ready to be read. + pub fn is_ready(&self) -> bool { matches!(self, Self::Ready(_)) } } From cb422f58100bdd174cd73886af96fecce4a6b7f1 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 5 Sep 2024 14:44:51 +0200 Subject: [PATCH 06/11] lint --- relay-server/src/services/global_config.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/relay-server/src/services/global_config.rs b/relay-server/src/services/global_config.rs index edf7e84ba8..346c8c9cd7 100644 --- a/relay-server/src/services/global_config.rs +++ b/relay-server/src/services/global_config.rs @@ -108,10 +108,7 @@ pub struct Get; /// /// For a one-off update, [`GlobalConfigService`] responds to /// [`GlobalConfigManager::Get`] messages with the latest instance of the -/// [`GlobalConfig`]. For continued updates, you can subscribe with -/// [`GlobalConfigManager::Subscribe`] to get a receiver back where up-to-date -/// instances will be sent to, while [`GlobalConfigService`] manages the update -/// frequency from upstream. +/// [`GlobalConfig`]. 
pub enum GlobalConfigManager { /// Returns the most recent global config. Get(relay_system::Sender), @@ -183,10 +180,6 @@ impl fmt::Debug for GlobalConfigHandle { } /// Service implementing the [`GlobalConfigManager`] interface. -/// -/// The service offers two alternatives to fetch the [`GlobalConfig`]: -/// responding to a [`Get`] message with the config for one-off requests, or -/// subscribing to updates with [`Subscribe`] to keep up-to-date. #[derive(Debug)] pub struct GlobalConfigService { config: Arc, From ae02a3033acca3f59301c1b55168fc137e99842e Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 5 Sep 2024 14:51:01 +0200 Subject: [PATCH 07/11] doc: Simplify comment --- relay-server/src/services/buffer/mod.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 601b91ae01..f07470f9e0 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -147,9 +147,8 @@ impl EnvelopeBufferService { /// has been reached. /// - We need a valid global config to unspool. pub fn should_pop(&self, buffer: &PolymorphicEnvelopeBuffer) -> bool { - // 1 - check memory. In case of the in-memory implementation, dequeuing is beneficial because - // it could reduce memory pressure, but for the on-disk implementation, we should not dequeue - // from disk unless there's RAM available. + // We should not unspool from external storage if memory capacity has been reached. + // But if buffer storage is in memory, unspooling can reduce memory usage. 
if buffer.is_external() && self.memory_checker.check_memory().is_exceeded() { return false; } From d8ef58eb27c1942ca176bc724ffb12a3f38e4b76 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Fri, 6 Sep 2024 14:15:34 +0200 Subject: [PATCH 08/11] test: fix --- relay-server/src/services/buffer/mod.rs | 24 +++++++++++++++++++++--- tests/integration/test_query.py | 3 +++ 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index f07470f9e0..bb2ce3cab9 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -98,7 +98,11 @@ pub struct EnvelopeBufferService { project_cache_ready: Option>, } -const DEFAULT_SLEEP: Duration = Duration::from_millis(100); +/// The maximum amount of time between evaluations of dequeue conditions. +/// +/// Some condition checks are sync (`has_capacity`), so cannot be awaited. The sleep is cancelled +/// whenever a new message or a global config update comes in. +const DEFAULT_SLEEP: Duration = Duration::from_secs(1); impl EnvelopeBufferService { /// Creates a memory or disk based [`EnvelopeBufferService`], depending on the given config. @@ -150,10 +154,16 @@ impl EnvelopeBufferService { // We should not unspool from external storage if memory capacity has been reached. // But if buffer storage is in memory, unspooling can reduce memory usage. if buffer.is_external() && self.memory_checker.check_memory().is_exceeded() { + relay_log::trace!("Memory exceeded, cannot dequeue"); return false; } - self.global_config_rx.borrow().is_ready() + if !self.global_config_rx.borrow().is_ready() { + relay_log::trace!("Global config not ready"); + return false; + } + + true } /// Tries to pop an envelope for a ready project. 
@@ -162,6 +172,7 @@ impl EnvelopeBufferService { buffer: &mut PolymorphicEnvelopeBuffer, ) -> Result<(), EnvelopeBufferError> { if !self.should_pop(buffer) { + self.sleep = DEFAULT_SLEEP; return Ok(()); } @@ -251,6 +262,7 @@ impl Service for EnvelopeBufferService { fn spawn_handler(mut self, mut rx: Receiver) { let config = self.config.clone(); let memory_checker = self.memory_checker.clone(); + let mut global_config_rx = self.global_config_rx.clone(); tokio::spawn(async move { let buffer = PolymorphicEnvelopeBuffer::from_config(&config, memory_checker).await; @@ -267,8 +279,10 @@ impl Service for EnvelopeBufferService { buffer.initialize().await; relay_log::info!("EnvelopeBufferService start"); + let mut iteration = 0; loop { - relay_log::trace!("EnvelopeBufferService loop"); + iteration += 1; + relay_log::trace!("EnvelopeBufferService loop iteration {iteration}"); tokio::select! { // NOTE: we do not select a bias here. @@ -286,6 +300,10 @@ impl Service for EnvelopeBufferService { Some(message) = rx.recv() => { self.handle_message(&mut buffer, message).await; } + _ = global_config_rx.changed() => { + relay_log::trace!("EnvelopeBufferService received global config"); + self.sleep = Duration::ZERO; // Try to pop + } else => break, } diff --git a/tests/integration/test_query.py b/tests/integration/test_query.py index f345904364..0da23c2b59 100644 --- a/tests/integration/test_query.py +++ b/tests/integration/test_query.py @@ -120,6 +120,9 @@ def test_query_retry(failure_type, mini_sentry, relay): @mini_sentry.app.endpoint("get_project_config") def get_project_config(): + if flask_request.json.get("global") is True: + return original_endpoint() + nonlocal retry_count retry_count += 1 print("RETRY", retry_count) From 1a1fe27381c302a7fa7dca0f1d4259bfa82372e6 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Fri, 6 Sep 2024 14:36:19 +0200 Subject: [PATCH 09/11] simplify --- relay-server/src/services/buffer/mod.rs | 40 ++++++++++++------------- 1 file changed, 19 
insertions(+), 21 deletions(-) diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index bb2ce3cab9..838d45377a 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -137,7 +137,11 @@ impl EnvelopeBufferService { } /// Wait for the configured amount of time and make sure the project cache is ready to receive. - async fn sleep(&mut self) -> Result<(), SendError> { + async fn ready_to_pop( + &mut self, + buffer: &mut PolymorphicEnvelopeBuffer, + ) -> Result<(), SendError> { + self.system_ready(buffer).await; tokio::time::sleep(self.sleep).await; if let Some(project_cache_ready) = self.project_cache_ready.take() { project_cache_ready.await?; @@ -145,25 +149,24 @@ impl EnvelopeBufferService { Ok(()) } - /// Returns `true` when preconditions for unspooling are met. + /// Waits until preconditions for unspooling are met. /// /// - We should not pop from disk into memory when relay's overall memory capacity /// has been reached. /// - We need a valid global config to unspool. - pub fn should_pop(&self, buffer: &PolymorphicEnvelopeBuffer) -> bool { - // We should not unspool from external storage if memory capacity has been reached. - // But if buffer storage is in memory, unspooling can reduce memory usage. - if buffer.is_external() && self.memory_checker.check_memory().is_exceeded() { - relay_log::trace!("Memory exceeded, cannot dequeue"); - return false; - } - - if !self.global_config_rx.borrow().is_ready() { - relay_log::trace!("Global config not ready"); - return false; + async fn system_ready(&self, buffer: &PolymorphicEnvelopeBuffer) { + loop { + // We should not unspool from external storage if memory capacity has been reached. + // But if buffer storage is in memory, unspooling can reduce memory usage. 
+ let memory_ready = + !buffer.is_external() || self.memory_checker.check_memory().has_capacity(); + let global_config_ready = self.global_config_rx.borrow().is_ready(); + + if memory_ready && global_config_ready { + return; + } + tokio::time::sleep(DEFAULT_SLEEP).await; } - - true } /// Tries to pop an envelope for a ready project. @@ -171,11 +174,6 @@ impl EnvelopeBufferService { &mut self, buffer: &mut PolymorphicEnvelopeBuffer, ) -> Result<(), EnvelopeBufferError> { - if !self.should_pop(buffer) { - self.sleep = DEFAULT_SLEEP; - return Ok(()); - } - relay_log::trace!("EnvelopeBufferService peek"); match buffer.peek().await? { Peek::Empty => { @@ -289,7 +287,7 @@ impl Service for EnvelopeBufferService { // On the one hand, we might want to prioritize dequeing over enqueing // so we do not exceed the buffer capacity by starving the dequeue. // on the other hand, prioritizing old messages violates the LIFO design. - Ok(()) = self.sleep() => { + Ok(()) = self.ready_to_pop(&mut buffer) => { if let Err(e) = self.try_pop(&mut buffer).await { relay_log::error!( error = &e as &dyn std::error::Error, From 53521f8d453a362cafdb583e987b935869d7d06e Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Sat, 7 Sep 2024 09:39:39 +0200 Subject: [PATCH 10/11] Fix test --- relay-server/src/services/buffer/mod.rs | 2 +- relay-server/src/services/processor/event.rs | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 838d45377a..45e568eb44 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -356,7 +356,7 @@ mod tests { #[tokio::test] async fn capacity_is_updated() { tokio::time::pause(); - let (service, _, _) = buffer_service(); + let (service, _1, _2) = buffer_service(); // Set capacity to false: service.has_capacity.store(false, Ordering::Relaxed); diff --git a/relay-server/src/services/processor/event.rs 
b/relay-server/src/services/processor/event.rs index eee0c3ff79..54fec8d519 100644 --- a/relay-server/src/services/processor/event.rs +++ b/relay-server/src/services/processor/event.rs @@ -872,8 +872,7 @@ mod tests { let event = Annotated::new(Event { release: Annotated::new( - String::from("���7��#1G����7��#1G����7��#1G����7��#1G����7��#") - .into(), + String::from("���7��#1G����7��#1G����7��#1G����7��#1G����7��#").into(), ), ..Default::default() }); From ad2705b92317bc3a2fbbc64334804c04ba7dc8eb Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Mon, 9 Sep 2024 12:22:39 +0200 Subject: [PATCH 11/11] lint --- relay-server/src/services/buffer/mod.rs | 2 +- relay-server/src/services/processor/event.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index faa616d52e..1b625e1b21 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -358,7 +358,7 @@ mod tests { #[tokio::test] async fn capacity_is_updated() { tokio::time::pause(); - let (service, _1, _2) = buffer_service(); + let (service, _global_rx, _project_cache_tx) = buffer_service(); // Set capacity to false: service.has_capacity.store(false, Ordering::Relaxed); diff --git a/relay-server/src/services/processor/event.rs b/relay-server/src/services/processor/event.rs index 54fec8d519..eee0c3ff79 100644 --- a/relay-server/src/services/processor/event.rs +++ b/relay-server/src/services/processor/event.rs @@ -872,7 +872,8 @@ mod tests { let event = Annotated::new(Event { release: Annotated::new( - String::from("���7��#1G����7��#1G����7��#1G����7��#1G����7��#").into(), + String::from("���7��#1G����7��#1G����7��#1G����7��#1G����7��#") + .into(), ), ..Default::default() });