Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Sep 20, 2024
1 parent ed9cc63 commit 8df6bbd
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 5 deletions.
14 changes: 14 additions & 0 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,11 @@ fn spool_disk_usage_refresh_frequency_ms() -> u64 {
100
}

/// Default bounded buffer size for handling backpressure.
fn spool_max_backpressure_envelopes() -> usize {
500
}

/// Persistent buffering configuration for incoming envelopes.
#[derive(Debug, Serialize, Deserialize)]
pub struct EnvelopeSpool {
Expand Down Expand Up @@ -955,6 +960,9 @@ pub struct EnvelopeSpool {
/// internal page stats.
#[serde(default = "spool_disk_usage_refresh_frequency_ms")]
disk_usage_refresh_frequency_ms: u64,
/// The amount of envelopes that can be put in the bounded buffer.
#[serde(default = "spool_max_backpressure_envelopes")]
max_backpressure_envelopes: usize,
/// Version of the spooler.
#[serde(default)]
version: EnvelopeSpoolVersion,
Expand Down Expand Up @@ -991,6 +999,7 @@ impl Default for EnvelopeSpool {
max_batches: spool_envelopes_stack_max_batches(),
max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(),
disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
max_backpressure_envelopes: spool_max_backpressure_envelopes(),
version: EnvelopeSpoolVersion::default(),
}
}
Expand Down Expand Up @@ -2212,6 +2221,11 @@ impl Config {
Duration::from_millis(self.values.spool.envelopes.disk_usage_refresh_frequency_ms)
}

/// Returns the maximum number of envelopes that can be put in the bounded buffer.
pub fn spool_max_backpressure_envelopes(&self) -> usize {
self.values.spool.envelopes.max_backpressure_envelopes
}

/// Returns the maximum size of an event payload in bytes.
pub fn max_event_size(&self) -> usize {
self.values.limits.max_event_size.as_bytes()
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ impl ServiceState {
)
.spawn_handler(processor_rx);

let (envelopes_tx, envelopes_rx) = mpsc::channel(500);
let (envelopes_tx, envelopes_rx) = mpsc::channel(config.spool_max_backpressure_envelopes());
let envelope_buffer = EnvelopeBufferService::new(
config.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),
Expand Down
22 changes: 18 additions & 4 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ impl EnvelopeBufferService {
async fn try_pop<'a>(
config: &Config,
buffer: &mut PolymorphicEnvelopeBuffer,
services: Services,
services: &Services,
envelopes_tx_permit: Permit<'a, DequeuedEnvelope>,
) -> Result<Duration, EnvelopeBufferError> {
relay_log::trace!("EnvelopeBufferService: peeking the buffer");
Expand Down Expand Up @@ -291,7 +291,7 @@ impl EnvelopeBufferService {
envelope.meta().start_time().elapsed() > config.spool_envelopes_max_age()
}

fn drop_expired(envelope: Box<Envelope>, services: Services) {
fn drop_expired(envelope: Box<Envelope>, services: &Services) {
let mut managed_envelope = ManagedEnvelope::new(
envelope,
services.outcome_aggregator.clone(),
Expand Down Expand Up @@ -375,6 +375,7 @@ impl Service for EnvelopeBufferService {
let memory_checker = self.memory_checker.clone();
let mut global_config_rx = self.global_config_rx.clone();
let services = self.services.clone();

tokio::spawn(async move {
let buffer = PolymorphicEnvelopeBuffer::from_config(&config, memory_checker).await;

Expand Down Expand Up @@ -405,7 +406,7 @@ impl Service for EnvelopeBufferService {
// so we do not exceed the buffer capacity by starving the dequeue.
// on the other hand, prioritizing old messages violates the LIFO design.
Some(permit) = self.ready_to_pop(&buffer) => {
match Self::try_pop(&config, &mut buffer, services.clone(), permit).await {
match Self::try_pop(&config, &mut buffer, &services, permit).await {
Ok(new_sleep) => {
sleep = new_sleep;
}
Expand All @@ -430,7 +431,7 @@ impl Service for EnvelopeBufferService {
}
_ = global_config_rx.changed() => {
relay_log::trace!("EnvelopeBufferService: received global config");
sleep = Duration::ZERO; // Try to pop
sleep = Duration::ZERO;
}
else => break,
}
Expand Down Expand Up @@ -737,5 +738,18 @@ mod tests {
.count(),
5
);

tokio::time::sleep(Duration::from_millis(100)).await;

let mut messages = vec![];
envelopes_rx.recv_many(&mut messages, 100).await;

assert_eq!(
messages
.iter()
.filter(|message| matches!(message, DequeuedEnvelope(..)))
.count(),
5
);
}
}
1 change: 1 addition & 0 deletions relay-server/src/services/project_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ pub struct SpoolHealth;
pub struct RefreshIndexCache(pub HashSet<QueueKey>);

/// Handle an envelope that was popped from the envelope buffer.
#[derive(Debug)]
pub struct DequeuedEnvelope(pub Box<Envelope>);

/// A request to update a project, typically sent by the envelope buffer.
Expand Down

0 comments on commit 8df6bbd

Please sign in to comment.