diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 73b486199d..6bdaea3635 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -912,6 +912,11 @@ fn spool_disk_usage_refresh_frequency_ms() -> u64 { 100 } +/// Default bounded buffer size for handling backpressure. +fn spool_max_backpressure_envelopes() -> usize { + 500 +} + /// Persistent buffering configuration for incoming envelopes. #[derive(Debug, Serialize, Deserialize)] pub struct EnvelopeSpool { @@ -955,6 +960,9 @@ pub struct EnvelopeSpool { /// internal page stats. #[serde(default = "spool_disk_usage_refresh_frequency_ms")] disk_usage_refresh_frequency_ms: u64, + /// The amount of envelopes that can be put in the bounded buffer. + #[serde(default = "spool_max_backpressure_envelopes")] + max_backpressure_envelopes: usize, /// Version of the spooler. #[serde(default)] version: EnvelopeSpoolVersion, @@ -991,6 +999,7 @@ impl Default for EnvelopeSpool { max_batches: spool_envelopes_stack_max_batches(), max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(), disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(), + max_backpressure_envelopes: spool_max_backpressure_envelopes(), version: EnvelopeSpoolVersion::default(), } } @@ -2212,6 +2221,11 @@ impl Config { Duration::from_millis(self.values.spool.envelopes.disk_usage_refresh_frequency_ms) } + /// Returns the maximum number of envelopes that can be put in the bounded buffer. + pub fn spool_max_backpressure_envelopes(&self) -> usize { + self.values.spool.envelopes.max_backpressure_envelopes + } + /// Returns the maximum size of an event payload in bytes. pub fn max_event_size(&self) -> usize { self.values.limits.max_event_size.as_bytes() diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 5a1159726c..28364dde65 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -242,7 +242,7 @@ impl ServiceState { ) .spawn_handler(processor_rx); - let (envelopes_tx, envelopes_rx) = mpsc::channel(500); + let (envelopes_tx, envelopes_rx) = mpsc::channel(config.spool_max_backpressure_envelopes()); let envelope_buffer = EnvelopeBufferService::new( config.clone(), MemoryChecker::new(memory_stat.clone(), config.clone()), diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 667dea2b06..4b8f380439 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -211,7 +211,7 @@ impl EnvelopeBufferService { async fn try_pop<'a>( config: &Config, buffer: &mut PolymorphicEnvelopeBuffer, - services: Services, + services: &Services, envelopes_tx_permit: Permit<'a, DequeuedEnvelope>, ) -> Result { relay_log::trace!("EnvelopeBufferService: peeking the buffer"); @@ -291,7 +291,7 @@ impl EnvelopeBufferService { envelope.meta().start_time().elapsed() > config.spool_envelopes_max_age() } - fn drop_expired(envelope: Box, services: Services) { + fn drop_expired(envelope: Box, services: &Services) { let mut managed_envelope = ManagedEnvelope::new( envelope, services.outcome_aggregator.clone(), @@ -375,6 +375,7 @@ impl Service for EnvelopeBufferService { let memory_checker = self.memory_checker.clone(); let mut global_config_rx = self.global_config_rx.clone(); let services = self.services.clone(); + tokio::spawn(async move { let buffer = PolymorphicEnvelopeBuffer::from_config(&config, memory_checker).await; @@ -405,7 +406,7 @@ impl Service for EnvelopeBufferService { // so we do not exceed the buffer capacity by starving the dequeue. // on the other hand, prioritizing old messages violates the LIFO design. Some(permit) = self.ready_to_pop(&buffer) => { - match Self::try_pop(&config, &mut buffer, services.clone(), permit).await { + match Self::try_pop(&config, &mut buffer, &services, permit).await { Ok(new_sleep) => { sleep = new_sleep; } @@ -430,7 +431,7 @@ impl Service for EnvelopeBufferService { } _ = global_config_rx.changed() => { relay_log::trace!("EnvelopeBufferService: received global config"); - sleep = Duration::ZERO; // Try to pop + sleep = Duration::ZERO; } else => break, } @@ -737,5 +738,18 @@ mod tests { .count(), 5 ); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let mut messages = vec![]; + envelopes_rx.recv_many(&mut messages, 100).await; + + assert_eq!( + messages + .iter() + .filter(|message| matches!(message, DequeuedEnvelope(..))) + .count(), + 5 + ); } } diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index d2b02197f9..690671a434 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -255,6 +255,7 @@ pub struct SpoolHealth; pub struct RefreshIndexCache(pub HashSet); /// Handle an envelope that was popped from the envelope buffer. +#[derive(Debug)] pub struct DequeuedEnvelope(pub Box); /// A request to update a project, typically sent by the envelope buffer.