diff --git a/CHANGELOG.md b/CHANGELOG.md index 3bd86a9957..c7b8e74ffd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,9 +8,8 @@ **Internal**: -- Add `EnvelopeStack` and `SQLiteEnvelopeStack` to manage envelopes on disk. ([#3855](https://github.com/getsentry/relay/pull/3855)) -- Add `client_sample_rate` to spans, pulled from the trace context ([#3872](https://github.com/getsentry/relay/pull/3872)). - +- Add experimental support for V2 envelope buffering. ([#3855](https://github.com/getsentry/relay/pull/3855), [#3863](https://github.com/getsentry/relay/pull/3863)) +- Add `client_sample_rate` to spans, pulled from the trace context. ([#3872](https://github.com/getsentry/relay/pull/3872)) ## 24.7.1 diff --git a/Cargo.lock b/Cargo.lock index 1d06f56e1f..217027fb9a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4100,6 +4100,7 @@ dependencies = [ "multer", "once_cell", "pin-project-lite", + "priority-queue", "rand", "rayon", "regex", diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 25216653f9..108799e195 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -842,6 +842,20 @@ fn spool_envelopes_unspool_interval() -> u64 { 100 } +/// Default batch size for the stack. +fn spool_envelopes_stack_disk_batch_size() -> usize { + 200 +} + +/// Default maximum number of batches for the stack. +fn spool_envelopes_stack_max_batches() -> usize { + 2 +} + +fn spool_envelopes_max_envelope_delay_secs() -> u64 { + 24 * 60 * 60 +} + /// Persistent buffering configuration for incoming envelopes. #[derive(Debug, Serialize, Deserialize)] pub struct EnvelopeSpool { @@ -868,6 +882,40 @@ pub struct EnvelopeSpool { /// The interval in milliseconds to trigger unspool. #[serde(default = "spool_envelopes_unspool_interval")] unspool_interval: u64, + /// Number of elements of the envelope stack that are flushed to disk. 
+ #[serde(default = "spool_envelopes_stack_disk_batch_size")] + disk_batch_size: usize, + /// Number of batches of size [`Self::disk_batch_size`] that need to be accumulated before + /// flushing one batch to disk. + #[serde(default = "spool_envelopes_stack_max_batches")] + max_batches: usize, + /// Maximum time between receiving the envelope and processing it. + /// + /// When envelopes spend too much time in the buffer (e.g. because their project cannot be loaded), + /// they are dropped. Defaults to 24h. + #[serde(default = "spool_envelopes_max_envelope_delay_secs")] + max_envelope_delay_secs: u64, + /// Version of the spooler. + #[serde(default)] + version: EnvelopeSpoolVersion, +} + +/// Version of the envelope buffering mechanism. +#[derive(Debug, Default, Deserialize, Serialize)] +pub enum EnvelopeSpoolVersion { + /// Use the spooler service, which only buffers envelopes for unloaded projects and + /// switches between an in-memory mode and a disk mode on-demand. + /// + /// This mode will be removed soon. + #[default] + #[serde(rename = "1")] + V1, + /// Use the envelope buffer, through which all envelopes pass before getting unspooled. + /// Can be either disk based or memory based. + /// + /// This mode has not yet been stress-tested, do not use in production environments. 
+ #[serde(rename = "experimental")] + V2, } impl Default for EnvelopeSpool { @@ -879,6 +927,10 @@ impl Default for EnvelopeSpool { max_disk_size: spool_envelopes_max_disk_size(), max_memory_size: spool_envelopes_max_memory_size(), unspool_interval: spool_envelopes_unspool_interval(), // 100ms + disk_batch_size: spool_envelopes_stack_disk_batch_size(), + max_batches: spool_envelopes_stack_max_batches(), + max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(), + version: EnvelopeSpoolVersion::V2, } } } @@ -2077,6 +2129,31 @@ impl Config { self.values.spool.envelopes.max_memory_size.as_bytes() } + /// Number of batches of size `stack_disk_batch_size` that need to be accumulated before + /// flushing one batch to disk. + pub fn spool_envelopes_stack_disk_batch_size(&self) -> usize { + self.values.spool.envelopes.disk_batch_size + } + + /// Number of batches of size `stack_disk_batch_size` that need to be accumulated before + /// flushing one batch to disk. + pub fn spool_envelopes_stack_max_batches(&self) -> usize { + self.values.spool.envelopes.max_batches + } + + /// Returns `true` if version 2 of the spooling mechanism is used. + pub fn spool_v2(&self) -> bool { + matches!( + self.values.spool.envelopes.version, + EnvelopeSpoolVersion::V2 + ) + } + + /// Returns the time after which we drop envelopes as a [`Duration`] object. + pub fn spool_envelopes_max_age(&self) -> Duration { + Duration::from_secs(self.values.spool.envelopes.max_envelope_delay_secs) + } + /// Returns the maximum size of an event payload in bytes. 
pub fn max_event_size(&self) -> usize { self.values.limits.max_event_size.as_bytes() diff --git a/relay-server/Cargo.toml b/relay-server/Cargo.toml index 5cb868b634..7d67b4183e 100644 --- a/relay-server/Cargo.toml +++ b/relay-server/Cargo.toml @@ -64,6 +64,7 @@ minidump = { workspace = true, optional = true } multer = { workspace = true } once_cell = { workspace = true } pin-project-lite = { workspace = true } +priority-queue = { workspace = true } rand = { workspace = true } rayon = { workspace = true } regex = { workspace = true } diff --git a/relay-server/benches/benches.rs b/relay-server/benches/benches.rs index 381f48936d..ac6ff48257 100644 --- a/relay-server/benches/benches.rs +++ b/relay-server/benches/benches.rs @@ -1,14 +1,17 @@ use bytes::Bytes; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use relay_config::Config; use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}; use sqlx::{Pool, Sqlite}; use std::path::PathBuf; -use std::time::Duration; +use std::time::{Duration, Instant}; use tempfile::TempDir; use tokio::runtime::Runtime; use relay_base_schema::project::ProjectKey; -use relay_server::{Envelope, EnvelopeStack, SQLiteEnvelopeStack}; +use relay_server::{ + Envelope, EnvelopeStack, PolymorphicEnvelopeBuffer, SqliteEnvelopeStack, SqliteEnvelopeStore, +}; fn setup_db(path: &PathBuf) -> Pool { let options = SqliteConnectOptions::new() @@ -37,6 +40,11 @@ async fn reset_db(db: Pool) { } fn mock_envelope(size: &str) -> Box { + let project_key = "e12d836b15bb49d7bbf99e64295d995b"; + mock_envelope_with_project_key(&ProjectKey::parse(project_key).unwrap(), size) +} + +fn mock_envelope_with_project_key(project_key: &ProjectKey, size: &str) -> Box { let payload = match size { "small" => "small_payload".to_string(), "medium" => "medium_payload".repeat(100), @@ -47,20 +55,24 @@ fn mock_envelope(size: &str) -> Box { let bytes = Bytes::from(format!( "\ - 
{{\"event_id\":\"9ec79c33ec9942ab8353589fcb2e04dc\",\"dsn\":\"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42\"}}\n\ + {{\"event_id\":\"9ec79c33ec9942ab8353589fcb2e04dc\",\"dsn\":\"https://{}:@sentry.io/42\"}}\n\ {{\"type\":\"attachment\"}}\n\ {}\n\ ", + project_key, payload )); - Envelope::parse_bytes(bytes).unwrap() + let mut envelope = Envelope::parse_bytes(bytes).unwrap(); + envelope.set_start_time(Instant::now()); + envelope } fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { let temp_dir = TempDir::new().unwrap(); let db_path = temp_dir.path().join("test.db"); let db = setup_db(&db_path); + let envelope_store = SqliteEnvelopeStore::new(db.clone()); let runtime = Runtime::new().unwrap(); @@ -83,8 +95,8 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { reset_db(db.clone()).await; }); - let stack = SQLiteEnvelopeStack::new( - db.clone(), + let stack = SqliteEnvelopeStack::new( + envelope_store.clone(), disk_batch_size, 2, ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), @@ -119,8 +131,8 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { runtime.block_on(async { reset_db(db.clone()).await; - let mut stack = SQLiteEnvelopeStack::new( - db.clone(), + let mut stack = SqliteEnvelopeStack::new( + envelope_store.clone(), disk_batch_size, 2, ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), @@ -159,8 +171,8 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { reset_db(db.clone()).await; }); - let stack = SQLiteEnvelopeStack::new( - db.clone(), + let stack = SqliteEnvelopeStack::new( + envelope_store.clone(), disk_batch_size, 2, ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), @@ -199,5 +211,90 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) { group.finish(); } -criterion_group!(benches, benchmark_sqlite_envelope_stack); -criterion_main!(benches); +fn benchmark_envelope_buffer(c: &mut Criterion) { + use rand::seq::SliceRandom; + let mut group = c.benchmark_group("envelope_buffer"); + 
group.sample_size(10); + + let runtime = Runtime::new().unwrap(); + + let num_projects = 100000; + let envelopes_per_project = 10; + + group.throughput(Throughput::Elements( + num_projects * envelopes_per_project as u64, + )); + + group.bench_function("push_only", |b| { + b.iter_with_setup( + || { + let project_keys: Vec<_> = (0..num_projects) + .map(|i| ProjectKey::parse(&format!("{:#032x}", i)).unwrap()) + .collect(); + + let mut envelopes = vec![]; + for project_key in &project_keys { + for _ in 0..envelopes_per_project { + envelopes.push(mock_envelope_with_project_key(project_key, "small")) + } + } + + envelopes.shuffle(&mut rand::thread_rng()); + + envelopes + }, + |envelopes| { + runtime.block_on(async { + let mut buffer = PolymorphicEnvelopeBuffer::from_config(&Config::default()); + for envelope in envelopes.into_iter() { + buffer.push(envelope).await.unwrap(); + } + }) + }, + ); + }); + + group.bench_function("push_pop", |b| { + b.iter_with_setup( + || { + let project_keys: Vec<_> = (0..num_projects) + .map(|i| ProjectKey::parse(&format!("{:#032x}", i)).unwrap()) + .collect(); + + let mut envelopes = vec![]; + for project_key in &project_keys { + for _ in 0..envelopes_per_project { + envelopes.push(mock_envelope_with_project_key(project_key, "big")) + } + } + + envelopes.shuffle(&mut rand::thread_rng()); + + envelopes + }, + |envelopes| { + runtime.block_on(async { + let mut buffer = PolymorphicEnvelopeBuffer::from_config(&Config::default()); + let n = envelopes.len(); + for envelope in envelopes.into_iter() { + let public_key = envelope.meta().public_key(); + buffer.push(envelope).await.unwrap(); + // Mark as ready: + buffer.mark_ready(&public_key, true); + } + for _ in 0..n { + let envelope = buffer.pop().await.unwrap().unwrap(); + // Send back to end of queue to get worse-case behavior: + buffer.mark_ready(&envelope.meta().public_key(), false); + } + }) + }, + ); + }); + + group.finish(); +} + +criterion_group!(sqlite, benchmark_sqlite_envelope_stack); 
+criterion_group!(buffer, benchmark_envelope_buffer); +criterion_main!(sqlite, buffer); diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 798867f6e7..3309b7a0e4 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -305,7 +305,21 @@ fn queue_envelope( ); envelope.scope(scoping); - state.project_cache().send(ValidateEnvelope::new(envelope)); + match state.envelope_buffer() { + Some(buffer) => { + // NOTE: This assumes that a `prefetch` has already been scheduled for both the + // envelope's projects. See `handle_check_envelope`. + relay_log::trace!("Pushing envelope to V2 buffer"); + + // TODO: Sync-check whether the buffer has capacity. + // Otherwise return `QueueFailed`. + buffer.defer_push(envelope); + } + None => { + relay_log::trace!("Sending envelope to project cache for V1 buffer"); + state.project_cache().send(ValidateEnvelope::new(envelope)); + } + } } // The entire envelope is taken for a split above, and it's empty at this point, we can just // accept it without additional checks. @@ -333,6 +347,7 @@ pub async fn handle_envelope( ) } + // TODO(jjbayer): Move this check to spool impl if state.memory_checker().check_memory().is_exceeded() { // NOTE: Long-term, we should not reject the envelope here, but spool it to disk instead. // This will be fixed with the new spool implementation. diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index b95366d3d1..adcb2d0073 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -1238,6 +1238,13 @@ impl Envelope { self.dsc().map(|dsc| dsc.public_key) } + /// Returns the time at which the envelope was received at this Relay. + /// + /// This is the date time equivalent to [`start_time`](Self::start_time). + pub fn received_at(&self) -> DateTime { + relay_common::time::instant_to_date_time(self.meta().start_time()) + } + /// Sets the event id on the envelope. 
pub fn set_event_id(&mut self, event_id: EventId) { self.headers.event_id = Some(event_id); diff --git a/relay-server/src/lib.rs b/relay-server/src/lib.rs index 4b1c1670ae..66ba5bc6d6 100644 --- a/relay-server/src/lib.rs +++ b/relay-server/src/lib.rs @@ -266,11 +266,11 @@ mod services; mod statsd; mod utils; +pub use self::envelope::Envelope; // pub for benchmarks +pub use self::services::buffer::{ + EnvelopeStack, PolymorphicEnvelopeBuffer, SqliteEnvelopeStack, SqliteEnvelopeStore, +}; // pub for benchmarks pub use self::services::spooler::spool_utils; -// Public just for benchmarks. -pub use self::envelope::Envelope; -pub use self::services::spooler::envelope_stack::sqlite::SQLiteEnvelopeStack; -pub use self::services::spooler::envelope_stack::EnvelopeStack; #[cfg(test)] mod testutils; diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 4f40ce73d0..2925ee92b8 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use std::time::Duration; use crate::metrics::{MetricOutcomes, MetricStats}; +use crate::services::buffer::GuardedEnvelopeBuffer; use crate::services::stats::RelayStats; use anyhow::{Context, Result}; use axum::extract::FromRequestParts; @@ -138,6 +139,7 @@ fn create_store_pool(config: &Config) -> Result { struct StateInner { config: Arc, memory_checker: MemoryChecker, + envelope_buffer: Option>, registry: Registry, } @@ -255,9 +257,11 @@ impl ServiceState { upstream_relay.clone(), global_config.clone(), ); + let envelope_buffer = GuardedEnvelopeBuffer::from_config(&config).map(Arc::new); ProjectCacheService::new( config.clone(), MemoryChecker::new(memory_stat.clone(), config.clone()), + envelope_buffer.clone(), project_cache_services, metric_outcomes, redis_pool.clone(), @@ -298,7 +302,8 @@ impl ServiceState { let state = StateInner { config: config.clone(), - memory_checker: MemoryChecker::new(memory_stat, config), + memory_checker: MemoryChecker::new(memory_stat, 
config.clone()), + envelope_buffer, registry, }; @@ -319,6 +324,13 @@ impl ServiceState { &self.inner.memory_checker } + /// Returns the V2 envelope buffer, if present. + /// + /// Clones the inner Arc. + pub fn envelope_buffer(&self) -> Option> { + self.inner.envelope_buffer.clone() + } + /// Returns the address of the [`ProjectCache`] service. pub fn project_cache(&self) -> &Addr { &self.inner.registry.project_cache diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs new file mode 100644 index 0000000000..969d9a3565 --- /dev/null +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -0,0 +1,597 @@ +use std::cmp::Ordering; +use std::collections::BTreeSet; +use std::convert::Infallible; +use std::time::Instant; + +use relay_base_schema::project::ProjectKey; +use relay_config::Config; +use stack_key::StackKey; + +use crate::envelope::Envelope; +use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError; +use crate::services::buffer::envelope_stack::{EnvelopeStack, StackProvider}; +use crate::services::buffer::sqlite_envelope_store::SqliteEnvelopeStoreError; +use crate::services::buffer::stack_provider::memory::MemoryStackProvider; +use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider; +use crate::statsd::{RelayCounters, RelayGauges}; + +/// Polymorphic envelope buffering interface. +/// +/// The underlying buffer can either be disk-based or memory-based, +/// depending on the given configuration. +/// +/// NOTE: This is implemented as an enum because a trait object with async methods would not be +/// object safe. +#[derive(Debug)] +#[allow(private_interfaces)] +pub enum PolymorphicEnvelopeBuffer { + /// An enveloper buffer that uses in-memory envelopes stacks. + InMemory(EnvelopeBuffer), + /// An enveloper buffer that uses sqlite envelopes stacks. 
+ #[allow(dead_code)] + Sqlite(EnvelopeBuffer), +} + +impl PolymorphicEnvelopeBuffer { + /// Creates either a memory-based or a disk-based envelope buffer, + /// depending on the given configuration. + pub fn from_config(config: &Config) -> Self { + if config.spool_envelopes_path().is_some() { + panic!("Disk backend not yet supported for spool V2"); + } + Self::InMemory(EnvelopeBuffer::::new()) + } + + /// Adds an envelope to the buffer. + pub async fn push(&mut self, envelope: Box) -> Result<(), EnvelopeBufferError> { + match self { + Self::Sqlite(buffer) => buffer.push(envelope).await, + Self::InMemory(buffer) => buffer.push(envelope).await, + }?; + relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesWritten) += 1); + Ok(()) + } + + /// Returns a reference to the next-in-line envelope. + pub async fn peek(&mut self) -> Result, EnvelopeBufferError> { + match self { + Self::Sqlite(buffer) => buffer.peek().await, + Self::InMemory(buffer) => buffer.peek().await, + } + } + + /// Pops the next-in-line envelope. + pub async fn pop(&mut self) -> Result>, EnvelopeBufferError> { + let envelope = match self { + Self::Sqlite(buffer) => buffer.pop().await, + Self::InMemory(buffer) => buffer.pop().await, + }?; + relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesRead) += 1); + Ok(envelope) + } + + /// Marks a project as ready or not ready. + /// + /// The buffer reprioritizes its envelopes based on this information. + pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool { + match self { + Self::Sqlite(buffer) => buffer.mark_ready(project, is_ready), + Self::InMemory(buffer) => buffer.mark_ready(project, is_ready), + } + } +} + +/// Error that occurs while interacting with the envelope buffer. 
+#[derive(Debug, thiserror::Error)] +pub enum EnvelopeBufferError { + #[error("sqlite")] + Sqlite(#[from] SqliteEnvelopeStackError), + + #[error("impossible")] + Impossible(#[from] Infallible), +} + +/// An envelope buffer that holds an individual stack for each project/sampling project combination. +/// +/// Envelope stacks are organized in a priority queue, and are reprioritized every time an envelope +/// is pushed, popped, or when a project becomes ready. +#[derive(Debug)] +struct EnvelopeBuffer { + /// The central priority queue. + priority_queue: priority_queue::PriorityQueue, Priority>, + /// A lookup table to find all stacks involving a project. + stacks_by_project: hashbrown::HashMap>, + /// A helper to create new stacks. + /// + /// This indirection is needed because different stack implementations might need different + /// initialization (e.g. a database connection). + stack_provider: P, +} + +impl EnvelopeBuffer { + /// Creates an empty buffer. + pub fn new() -> Self { + Self { + stacks_by_project: Default::default(), + priority_queue: Default::default(), + stack_provider: MemoryStackProvider, + } + } +} + +#[allow(dead_code)] +impl EnvelopeBuffer { + /// Creates an empty buffer. + pub async fn new(config: &Config) -> Result { + Ok(Self { + stacks_by_project: Default::default(), + priority_queue: Default::default(), + stack_provider: SqliteStackProvider::new(config).await?, + }) + } +} + +impl EnvelopeBuffer

+where + EnvelopeBufferError: std::convert::From<::Error>, +{ + /// Pushes an envelope to the appropriate envelope stack and reprioritizes the stack. + /// + /// If the envelope stack does not exist, a new stack is pushed to the priority queue. + /// The priority of the stack is updated with the envelope's received_at time. + pub async fn push(&mut self, envelope: Box) -> Result<(), EnvelopeBufferError> { + let received_at = envelope.meta().start_time(); + let stack_key = StackKey::from_envelope(&envelope); + if let Some(( + QueueItem { + key: _, + value: stack, + }, + _, + )) = self.priority_queue.get_mut(&stack_key) + { + stack.push(envelope).await?; + } else { + self.push_stack(envelope); + } + self.priority_queue.change_priority_by(&stack_key, |prio| { + prio.received_at = received_at; + }); + + Ok(()) + } + + /// Returns a reference to the next-in-line envelope, if one exists. + pub async fn peek(&mut self) -> Result, EnvelopeBufferError> { + let Some(( + QueueItem { + key: _, + value: stack, + }, + _, + )) = self.priority_queue.peek_mut() + else { + return Ok(None); + }; + + Ok(stack.peek().await?) + } + + /// Returns the next-in-line envelope, if one exists. + /// + /// The priority of the envelope's stack is updated with the next envelope's received_at + /// time. If the stack is empty after popping, it is removed from the priority queue. + pub async fn pop(&mut self) -> Result>, EnvelopeBufferError> { + let Some((QueueItem { key, value: stack }, _)) = self.priority_queue.peek_mut() else { + return Ok(None); + }; + let stack_key = *key; + let envelope = stack.pop().await.unwrap().expect("found an empty stack"); + + let next_received_at = stack + .peek() + .await? 
+ .map(|next_envelope| next_envelope.meta().start_time()); + match next_received_at { + None => { + self.pop_stack(stack_key); + } + Some(next_received_at) => { + self.priority_queue.change_priority_by(&stack_key, |prio| { + prio.received_at = next_received_at; + }); + } + } + Ok(Some(envelope)) + } + + /// Reprioritizes all stacks that involve the given project key by setting it to "ready". + pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool { + let mut changed = false; + if let Some(stack_keys) = self.stacks_by_project.get(project) { + for stack_key in stack_keys { + self.priority_queue.change_priority_by(stack_key, |stack| { + let mut found = false; + for (subkey, readiness) in [ + (stack_key.lesser(), &mut stack.readiness.0), + (stack_key.greater(), &mut stack.readiness.1), + ] { + if subkey == project { + found = true; + if *readiness != is_ready { + changed = true; + *readiness = is_ready; + } + } + } + debug_assert!(found); + }); + } + } + changed + } + + fn push_stack(&mut self, envelope: Box) { + let received_at = envelope.meta().start_time(); + let stack_key = StackKey::from_envelope(&envelope); + let previous_entry = self.priority_queue.push( + QueueItem { + key: stack_key, + value: self.stack_provider.create_stack(envelope), + }, + Priority::new(received_at), + ); + debug_assert!(previous_entry.is_none()); + for project_key in stack_key.iter() { + self.stacks_by_project + .entry(project_key) + .or_default() + .insert(stack_key); + } + relay_statsd::metric!( + gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64 + ); + } + + fn pop_stack(&mut self, stack_key: StackKey) { + for project_key in stack_key.iter() { + self.stacks_by_project + .get_mut(&project_key) + .expect("project_key is missing from lookup") + .remove(&stack_key); + } + self.priority_queue.remove(&stack_key); + relay_statsd::metric!( + gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64 + ); + } +} + +mod stack_key { + use 
super::*; + /// Sorted stack key. + /// + /// Contains a pair of project keys. The lower key is always the first + /// element in the pair, such that `(k1, k2)` and `(k2, k1)` map to the same + /// stack key. + #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] + pub struct StackKey(ProjectKey, ProjectKey); + + impl StackKey { + pub fn from_envelope(envelope: &Envelope) -> Self { + let own_key = envelope.meta().public_key(); + let sampling_key = envelope.sampling_key().unwrap_or(own_key); + Self::new(own_key, sampling_key) + } + + pub fn lesser(&self) -> &ProjectKey { + &self.0 + } + + pub fn greater(&self) -> &ProjectKey { + &self.1 + } + + pub fn iter(&self) -> impl Iterator { + std::iter::once(self.0).chain((self.0 != self.1).then_some(self.1)) + } + + fn new(mut key1: ProjectKey, mut key2: ProjectKey) -> Self { + if key2 < key1 { + std::mem::swap(&mut key1, &mut key2); + } + Self(key1, key2) + } + } +} + +#[derive(Debug)] +struct QueueItem { + key: K, + value: V, +} + +impl std::borrow::Borrow for QueueItem { + fn borrow(&self) -> &K { + &self.key + } +} + +impl std::hash::Hash for QueueItem { + fn hash(&self, state: &mut H) { + self.key.hash(state); + } +} + +impl PartialEq for QueueItem { + fn eq(&self, other: &Self) -> bool { + self.key == other.key + } +} + +impl Eq for QueueItem {} + +#[derive(Debug)] +struct Priority { + readiness: Readiness, + received_at: Instant, +} + +impl Priority { + fn new(received_at: Instant) -> Self { + Self { + readiness: Readiness::new(), + received_at, + } + } +} + +impl PartialEq for Priority { + fn eq(&self, other: &Self) -> bool { + self.readiness.ready() == other.readiness.ready() && self.received_at == other.received_at + } +} + +impl PartialOrd for Priority { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Eq for Priority {} + +impl Ord for Priority { + fn cmp(&self, other: &Self) -> Ordering { + match (self.readiness.ready(), other.readiness.ready()) { + (true, 
true) => self.received_at.cmp(&other.received_at), + (true, false) => Ordering::Greater, + (false, true) => Ordering::Less, + // For non-ready stacks, we invert the priority, such that projects that are not + // ready and did not receive envelopes recently can be evicted. + (false, false) => self.received_at.cmp(&other.received_at).reverse(), + } + } +} + +#[derive(Debug)] +struct Readiness(bool, bool); + +impl Readiness { + fn new() -> Self { + Self(false, false) + } + + fn ready(&self) -> bool { + self.0 && self.1 + } +} + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use uuid::Uuid; + + use relay_common::Dsn; + use relay_sampling::DynamicSamplingContext; + + use crate::envelope::{Item, ItemType}; + use crate::extractors::RequestMeta; + + use super::*; + + fn new_envelope(project_key: ProjectKey, sampling_key: Option) -> Box { + let mut envelope = Envelope::from_request( + None, + RequestMeta::new(Dsn::from_str(&format!("http://{project_key}@localhost/1")).unwrap()), + ); + if let Some(sampling_key) = sampling_key { + envelope.set_dsc(DynamicSamplingContext { + public_key: sampling_key, + trace_id: Uuid::new_v4(), + release: None, + user: Default::default(), + replay_id: None, + environment: None, + transaction: None, + sample_rate: None, + sampled: None, + other: Default::default(), + }); + envelope.add_item(Item::new(ItemType::Transaction)); + } + envelope + } + + #[tokio::test] + async fn insert_pop() { + let mut buffer = EnvelopeBuffer::::new(); + + let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); + let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + let project_key3 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap(); + + assert!(buffer.pop().await.unwrap().is_none()); + assert!(buffer.peek().await.unwrap().is_none()); + + buffer.push(new_envelope(project_key1, None)).await.unwrap(); + assert_eq!( + buffer.peek().await.unwrap().unwrap().meta().public_key(), + 
project_key1 + ); + + buffer.push(new_envelope(project_key2, None)).await.unwrap(); + // Both projects are not ready, so project 1 is on top (has the oldest envelopes): + assert_eq!( + buffer.peek().await.unwrap().unwrap().meta().public_key(), + project_key1 + ); + + buffer.push(new_envelope(project_key3, None)).await.unwrap(); + // All projects are not ready, so project 1 is on top (has the oldest envelopes): + assert_eq!( + buffer.peek().await.unwrap().unwrap().meta().public_key(), + project_key1 + ); + + // After marking a project ready, it goes to the top: + buffer.mark_ready(&project_key3, true); + assert_eq!( + buffer.peek().await.unwrap().unwrap().meta().public_key(), + project_key3 + ); + assert_eq!( + buffer.pop().await.unwrap().unwrap().meta().public_key(), + project_key3 + ); + + // After popping, project 1 is on top again: + assert_eq!( + buffer.peek().await.unwrap().unwrap().meta().public_key(), + project_key1 + ); + + // Mark project 1 as ready (still on top): + buffer.mark_ready(&project_key1, true); + assert_eq!( + buffer.peek().await.unwrap().unwrap().meta().public_key(), + project_key1 + ); + + // Mark project 2 as ready as well (now on top because most recent): + buffer.mark_ready(&project_key2, true); + assert_eq!( + buffer.peek().await.unwrap().unwrap().meta().public_key(), + project_key2 + ); + assert_eq!( + buffer.pop().await.unwrap().unwrap().meta().public_key(), + project_key2 + ); + + // Pop last element: + assert_eq!( + buffer.pop().await.unwrap().unwrap().meta().public_key(), + project_key1 + ); + assert!(buffer.pop().await.unwrap().is_none()); + assert!(buffer.peek().await.unwrap().is_none()); + } + + #[tokio::test] + async fn project_internal_order() { + let mut buffer = EnvelopeBuffer::::new(); + + let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); + + let envelope1 = new_envelope(project_key, None); + let instant1 = envelope1.meta().start_time(); + let envelope2 = new_envelope(project_key, None); + let 
instant2 = envelope2.meta().start_time(); + + assert!(instant2 > instant1); + + buffer.push(envelope1).await.unwrap(); + buffer.push(envelope2).await.unwrap(); + + assert_eq!( + buffer.pop().await.unwrap().unwrap().meta().start_time(), + instant2 + ); + assert_eq!( + buffer.pop().await.unwrap().unwrap().meta().start_time(), + instant1 + ); + assert!(buffer.pop().await.unwrap().is_none()); + } + + #[tokio::test] + async fn sampling_projects() { + let mut buffer = EnvelopeBuffer::::new(); + + let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); + let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap(); + + let envelope1 = new_envelope(project_key1, None); + let instant1 = envelope1.meta().start_time(); + buffer.push(envelope1).await.unwrap(); + + let envelope2 = new_envelope(project_key2, None); + let instant2 = envelope2.meta().start_time(); + buffer.push(envelope2).await.unwrap(); + + let envelope3 = new_envelope(project_key1, Some(project_key2)); + let instant3 = envelope3.meta().start_time(); + buffer.push(envelope3).await.unwrap(); + + // Nothing is ready, instant1 is on top: + assert_eq!( + buffer.peek().await.unwrap().unwrap().meta().start_time(), + instant1 + ); + + // Mark project 2 ready, gets on top: + buffer.mark_ready(&project_key2, true); + assert_eq!( + buffer.peek().await.unwrap().unwrap().meta().start_time(), + instant2 + ); + + // Revert + buffer.mark_ready(&project_key2, false); + assert_eq!( + buffer.peek().await.unwrap().unwrap().meta().start_time(), + instant1 + ); + + // Project 1 ready: + buffer.mark_ready(&project_key1, true); + assert_eq!( + buffer.peek().await.unwrap().unwrap().meta().start_time(), + instant1 + ); + + // when both projects are ready, event no 3 ends up on top: + buffer.mark_ready(&project_key2, true); + assert_eq!( + buffer.pop().await.unwrap().unwrap().meta().start_time(), + instant3 + ); + assert_eq!( + buffer.peek().await.unwrap().unwrap().meta().start_time(), + 
instant2 + ); + + buffer.mark_ready(&project_key2, false); + assert_eq!( + buffer.pop().await.unwrap().unwrap().meta().start_time(), + instant1 + ); + assert_eq!( + buffer.pop().await.unwrap().unwrap().meta().start_time(), + instant2 + ); + + assert!(buffer.pop().await.unwrap().is_none()); + } +} diff --git a/relay-server/src/services/buffer/envelope_stack/memory.rs b/relay-server/src/services/buffer/envelope_stack/memory.rs new file mode 100644 index 0000000000..5e8087010f --- /dev/null +++ b/relay-server/src/services/buffer/envelope_stack/memory.rs @@ -0,0 +1,31 @@ +use std::convert::Infallible; + +use crate::Envelope; + +use super::EnvelopeStack; + +#[derive(Debug)] +pub struct MemoryEnvelopeStack(#[allow(clippy::vec_box)] Vec>); + +impl MemoryEnvelopeStack { + pub fn new(envelope: Box) -> Self { + Self(vec![envelope]) + } +} + +impl EnvelopeStack for MemoryEnvelopeStack { + type Error = Infallible; + + async fn push(&mut self, envelope: Box) -> Result<(), Self::Error> { + self.0.push(envelope); + Ok(()) + } + + async fn peek(&mut self) -> Result, Self::Error> { + Ok(self.0.last().map(Box::as_ref)) + } + + async fn pop(&mut self) -> Result>, Self::Error> { + Ok(self.0.pop()) + } +} diff --git a/relay-server/src/services/spooler/envelope_stack/mod.rs b/relay-server/src/services/buffer/envelope_stack/mod.rs similarity index 54% rename from relay-server/src/services/spooler/envelope_stack/mod.rs rename to relay-server/src/services/buffer/envelope_stack/mod.rs index d6e9e0b9bb..ee48016f09 100644 --- a/relay-server/src/services/spooler/envelope_stack/mod.rs +++ b/relay-server/src/services/buffer/envelope_stack/mod.rs @@ -1,27 +1,28 @@ -use crate::envelope::Envelope; use std::future::Future; +use crate::envelope::Envelope; + +pub mod memory; pub mod sqlite; /// A stack-like data structure that holds [`Envelope`]s. 
-pub trait EnvelopeStack { +pub trait EnvelopeStack: Send + std::fmt::Debug { /// The error type that is returned when an error is encountered during reading or writing the /// [`EnvelopeStack`]. - type Error; + type Error: std::fmt::Debug; /// Pushes an [`Envelope`] on top of the stack. - #[allow(dead_code)] fn push(&mut self, envelope: Box) -> impl Future>; /// Peeks the [`Envelope`] on top of the stack. - /// - /// If the stack is empty, an error is returned. - #[allow(dead_code)] - fn peek(&mut self) -> impl Future, Self::Error>>; + fn peek(&mut self) -> impl Future, Self::Error>>; /// Pops the [`Envelope`] on top of the stack. - /// - /// If the stack is empty, an error is returned. - #[allow(dead_code)] - fn pop(&mut self) -> impl Future, Self::Error>>; + fn pop(&mut self) -> impl Future>, Self::Error>>; +} + +pub trait StackProvider: std::fmt::Debug { + type Stack: EnvelopeStack; + + fn create_stack(&self, envelope: Box) -> Self::Stack; } diff --git a/relay-server/src/services/spooler/envelope_stack/sqlite.rs b/relay-server/src/services/buffer/envelope_stack/sqlite.rs similarity index 56% rename from relay-server/src/services/spooler/envelope_stack/sqlite.rs rename to relay-server/src/services/buffer/envelope_stack/sqlite.rs index c3fbae8676..9ea8d16a66 100644 --- a/relay-server/src/services/spooler/envelope_stack/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_stack/sqlite.rs @@ -1,36 +1,31 @@ -use crate::envelope::Envelope; -use crate::extractors::StartTime; -use crate::services::spooler::envelope_stack::EnvelopeStack; -use futures::StreamExt; -use relay_base_schema::project::ProjectKey; -use sqlx::query::Query; -use sqlx::sqlite::{SqliteArguments, SqliteRow}; -use sqlx::{Pool, QueryBuilder, Row, Sqlite}; use std::collections::VecDeque; -use std::error::Error; use std::fmt::Debug; use std::num::NonZeroUsize; -use std::pin::pin; -/// An error returned when doing an operation on [`SQLiteEnvelopeStack`]. 
+use relay_base_schema::project::ProjectKey; + +use crate::envelope::Envelope; +use crate::services::buffer::envelope_stack::EnvelopeStack; +use crate::services::buffer::sqlite_envelope_store::{ + SqliteEnvelopeStore, SqliteEnvelopeStoreError, +}; +use crate::statsd::RelayCounters; + +/// An error returned when doing an operation on [`SqliteEnvelopeStack`]. #[derive(Debug, thiserror::Error)] -pub enum SQLiteEnvelopeStackError { - /// The stack is empty. - #[error("the stack is empty")] - Empty, - - /// The database encountered an unexpected error. - #[error("a database error occurred")] - DatabaseError(#[from] sqlx::Error), +pub enum SqliteEnvelopeStackError { + #[error("an error occurred in the envelope store: {0}")] + EnvelopeStoreError(#[from] SqliteEnvelopeStoreError), } +#[derive(Debug)] /// An [`EnvelopeStack`] that is implemented on an SQLite database. /// /// For efficiency reasons, the implementation has an in-memory buffer that is periodically spooled /// to disk in a batched way. -pub struct SQLiteEnvelopeStack { +pub struct SqliteEnvelopeStack { /// Shared SQLite database pool which will be used to read and write from disk. - db: Pool, + envelope_store: SqliteEnvelopeStore, /// Threshold defining the maximum number of envelopes in the `batches_buffer` before spooling /// to disk will take place. spool_threshold: NonZeroUsize, @@ -50,18 +45,17 @@ pub struct SQLiteEnvelopeStack { check_disk: bool, } -impl SQLiteEnvelopeStack { - /// Creates a new empty [`SQLiteEnvelopeStack`]. - #[allow(dead_code)] +impl SqliteEnvelopeStack { + /// Creates a new empty [`SqliteEnvelopeStack`]. 
pub fn new( - db: Pool, + envelope_store: SqliteEnvelopeStore, disk_batch_size: usize, max_batches: usize, own_key: ProjectKey, sampling_key: ProjectKey, ) -> Self { Self { - db, + envelope_store, spool_threshold: NonZeroUsize::new(disk_batch_size * max_batches) .expect("the spool threshold must be > 0"), batch_size: NonZeroUsize::new(disk_batch_size) @@ -74,12 +68,12 @@ impl SQLiteEnvelopeStack { } } - /// Threshold above which the [`SQLiteEnvelopeStack`] will spool data from the `buffer` to disk. + /// Threshold above which the [`SqliteEnvelopeStack`] will spool data from the `buffer` to disk. fn above_spool_threshold(&self) -> bool { self.batches_buffer_size >= self.spool_threshold.get() } - /// Threshold below which the [`SQLiteEnvelopeStack`] will unspool data from disk to the + /// Threshold below which the [`SqliteEnvelopeStack`] will unspool data from disk to the /// `buffer`. fn below_unspool_threshold(&self) -> bool { self.batches_buffer_size == 0 @@ -90,36 +84,26 @@ impl SQLiteEnvelopeStack { /// In case there is a failure while writing envelopes, all the envelopes that were enqueued /// to be written to disk are lost. The explanation for this behavior can be found in the body /// of the method. - async fn spool_to_disk(&mut self) -> Result<(), SQLiteEnvelopeStackError> { + async fn spool_to_disk(&mut self) -> Result<(), SqliteEnvelopeStackError> { let Some(envelopes) = self.batches_buffer.pop_front() else { return Ok(()); }; self.batches_buffer_size -= envelopes.len(); - let insert_envelopes = envelopes.iter().map(|e| InsertEnvelope { - received_at: received_at(e), - own_key: self.own_key, - sampling_key: self.sampling_key, - encoded_envelope: e.to_vec().unwrap(), - }); + // We convert envelopes into a format which simplifies insertion in the store. If an + // envelope can't be serialized, we will not insert it. 
+ let envelopes = envelopes.iter().filter_map(|e| e.as_ref().try_into().ok()); - // TODO: check how we can do this in a background tokio task in a non-blocking way. - if let Err(err) = build_insert_many_envelopes(insert_envelopes) - .build() - .execute(&self.db) + // When we return early here, we are acknowledging that the elements that we popped from + // the buffer are lost in case of failure. We are doing this on purpose, since if we were + // to have a database corruption during runtime, and we were to put the values back into + // the buffer we will end up with an infinite cycle. + self.envelope_store .insert_many(envelopes) .await - { - relay_log::error!( - error = &err as &dyn Error, - "failed to spool envelopes to disk", - ); + .map_err(SqliteEnvelopeStackError::EnvelopeStoreError)?; - // When early return here, we are acknowledging that the elements that we popped from - // the buffer are lost. We are doing this on purposes, since if we were to have a - // database corruption during runtime, and we were to put the values back into the buffer - // we will end up with an infinite cycle. - return Err(SQLiteEnvelopeStackError::DatabaseError(err)); - } + relay_statsd::metric!(counter(RelayCounters::BufferWritesDisk) += 1); // If we successfully spooled to disk, we know that data should be there. self.check_disk = true; @@ -134,98 +118,37 @@ impl SQLiteEnvelopeStack { /// /// In case an envelope fails deserialization due to malformed data in the database, the affected /// envelope will not be unspooled and unspooling will continue with the remaining envelopes.
- async fn unspool_from_disk(&mut self) -> Result<(), SQLiteEnvelopeStackError> { - let envelopes = build_delete_and_fetch_many_envelopes( - self.own_key, - self.sampling_key, - self.batch_size.get() as i64, - ) - .fetch(&self.db) - .peekable(); - - let mut envelopes = pin!(envelopes); - if envelopes.as_mut().peek().await.is_none() { - return Ok(()); - } - - // We use a sorted vector to order envelopes that are deleted from the database. - // Unfortunately we have to do this because SQLite `DELETE` with `RETURNING` doesn't - // return deleted rows in a specific order. - let mut extracted_envelopes = Vec::with_capacity(self.batch_size.get()); - let mut db_error = None; - while let Some(envelope) = envelopes.as_mut().next().await { - let envelope = match envelope { - Ok(envelope) => envelope, - Err(err) => { - relay_log::error!( - error = &err as &dyn Error, - "failed to unspool the envelopes from the disk", - ); - db_error = Some(err); - - continue; - } - }; - - match self.extract_envelope(envelope) { - Ok(envelope) => { - extracted_envelopes.push(envelope); - } - Err(err) => { - relay_log::error!( - error = &err as &dyn Error, - "failed to extract the envelope unspooled from disk", - ) - } - } - } + async fn unspool_from_disk(&mut self) -> Result<(), SqliteEnvelopeStackError> { + let envelopes = self + .envelope_store + .delete_many( + self.own_key, + self.sampling_key, + self.batch_size.get() as i64, + ) + .await + .map_err(SqliteEnvelopeStackError::EnvelopeStoreError)?; - if extracted_envelopes.is_empty() { - // If there was a database error and no envelopes have been returned, we assume that we are - // in a critical state, so we return an error. - if let Some(db_error) = db_error { - return Err(SQLiteEnvelopeStackError::DatabaseError(db_error)); - } + relay_statsd::metric!(counter(RelayCounters::BufferReadsDisk) += 1); - // In case no envelopes were unspool, we will mark the disk as empty until another round - // of spooling takes place. 
+ if envelopes.is_empty() { + // In case no envelopes were unspooled, we will mark the disk as empty until another + // round of spooling takes place. self.check_disk = false; return Ok(()); } - // We sort envelopes by `received_at`. - extracted_envelopes.sort_by_key(|a| received_at(a)); - // We push in the back of the buffer, since we still want to give priority to // incoming envelopes that have a more recent timestamp. - self.batches_buffer_size += extracted_envelopes.len(); - self.batches_buffer.push_front(extracted_envelopes); + self.batches_buffer_size += envelopes.len(); + self.batches_buffer.push_front(envelopes); Ok(()) } - /// Deserializes an [`Envelope`] from a database row. - fn extract_envelope(&self, row: SqliteRow) -> Result, SQLiteEnvelopeStackError> { - let envelope_row: Vec = row - .try_get("envelope") - .map_err(|_| SQLiteEnvelopeStackError::Empty)?; - let envelope_bytes = bytes::Bytes::from(envelope_row); - let mut envelope = - Envelope::parse_bytes(envelope_bytes).map_err(|_| SQLiteEnvelopeStackError::Empty)?; - - let received_at: i64 = row - .try_get("received_at") - .map_err(|_| SQLiteEnvelopeStackError::Empty)?; - let start_time = StartTime::from_timestamp_millis(received_at as u64); - - envelope.set_start_time(start_time.into_inner()); - - Ok(envelope) - } - /// Validates that the incoming [`Envelope`] has the same project keys at the - /// [`SQLiteEnvelopeStack`]. + /// [`SqliteEnvelopeStack`]. 
fn validate_envelope(&self, envelope: &Envelope) -> bool { let own_key = envelope.meta().public_key(); let sampling_key = envelope.sampling_key().unwrap_or(own_key); @@ -234,8 +157,8 @@ impl SQLiteEnvelopeStack { } } -impl EnvelopeStack for SQLiteEnvelopeStack { - type Error = SQLiteEnvelopeStackError; +impl EnvelopeStack for SqliteEnvelopeStack { + type Error = SqliteEnvelopeStackError; async fn push(&mut self, envelope: Box) -> Result<(), Self::Error> { debug_assert!(self.validate_envelope(&envelope)); @@ -263,30 +186,34 @@ impl EnvelopeStack for SQLiteEnvelopeStack { Ok(()) } - async fn peek(&mut self) -> Result<&Box, Self::Error> { + async fn peek(&mut self) -> Result, Self::Error> { if self.below_unspool_threshold() && self.check_disk { self.unspool_from_disk().await? } - self.batches_buffer + let last = self + .batches_buffer .back() .and_then(|last_batch| last_batch.last()) - .ok_or(Self::Error::Empty) + .map(|last_batch| last_batch.as_ref()); + + Ok(last) } - async fn pop(&mut self) -> Result, Self::Error> { + async fn pop(&mut self) -> Result>, Self::Error> { if self.below_unspool_threshold() && self.check_disk { + relay_log::trace!("Unspool from disk"); self.unspool_from_disk().await? } - let result = self - .batches_buffer - .back_mut() - .and_then(|last_batch| { - self.batches_buffer_size -= 1; - last_batch.pop() - }) - .ok_or(Self::Error::Empty); + let result = self.batches_buffer.back_mut().and_then(|last_batch| { + self.batches_buffer_size -= 1; + relay_log::trace!("Popping from memory"); + last_batch.pop() + }); + if result.is_none() { + return Ok(None); + } // Since we might leave a batch without elements, we want to pop it from the buffer. if self @@ -297,81 +224,26 @@ impl EnvelopeStack for SQLiteEnvelopeStack { self.batches_buffer.pop_back(); } - result + Ok(result) } } -/// Struct which contains all the rows that have to be inserted in the database when storing an -/// [`Envelope`]. 
-struct InsertEnvelope { - received_at: i64, - own_key: ProjectKey, - sampling_key: ProjectKey, - encoded_envelope: Vec, -} - -/// Builds a query that inserts many [`Envelope`]s in the database. -fn build_insert_many_envelopes<'a>( - envelopes: impl Iterator, -) -> QueryBuilder<'a, Sqlite> { - let mut builder: QueryBuilder = - QueryBuilder::new("INSERT INTO envelopes (received_at, own_key, sampling_key, envelope) "); - - builder.push_values(envelopes, |mut b, envelope| { - b.push_bind(envelope.received_at) - .push_bind(envelope.own_key.to_string()) - .push_bind(envelope.sampling_key.to_string()) - .push_bind(envelope.encoded_envelope); - }); - - builder -} - -/// Builds a query that deletes many [`Envelope`] from the database. -pub fn build_delete_and_fetch_many_envelopes<'a>( - own_key: ProjectKey, - project_key: ProjectKey, - batch_size: i64, -) -> Query<'a, Sqlite, SqliteArguments<'a>> { - sqlx::query( - "DELETE FROM - envelopes - WHERE id IN (SELECT id FROM envelopes WHERE own_key = ? AND sampling_key = ? - ORDER BY received_at DESC LIMIT ?) - RETURNING - received_at, own_key, sampling_key, envelope", - ) - .bind(own_key.to_string()) - .bind(project_key.to_string()) - .bind(batch_size) -} - -/// Computes the `received_at` timestamps of an [`Envelope`] based on the `start_time` header. -/// -/// This method has been copied from the `ManagedEnvelope.received_at()` method. 
-fn received_at(envelope: &Envelope) -> i64 { - relay_common::time::instant_to_date_time(envelope.meta().start_time()).timestamp_millis() -} - #[cfg(test)] mod tests { - use crate::envelope::{Envelope, Item, ItemType}; - use crate::extractors::RequestMeta; - use crate::services::spooler::envelope_stack::sqlite::{ - SQLiteEnvelopeStack, SQLiteEnvelopeStackError, - }; - use crate::services::spooler::envelope_stack::EnvelopeStack; - use relay_base_schema::project::ProjectKey; - use relay_event_schema::protocol::EventId; - use relay_sampling::DynamicSamplingContext; - use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}; - use sqlx::{Pool, Sqlite}; use std::collections::BTreeMap; - use std::path::Path; use std::time::{Duration, Instant}; - use tokio::fs::DirBuilder; + use uuid::Uuid; + use relay_base_schema::project::ProjectKey; + use relay_event_schema::protocol::EventId; + use relay_sampling::DynamicSamplingContext; + + use super::*; + use crate::envelope::{Envelope, Item, ItemType}; + use crate::extractors::RequestMeta; + use crate::services::buffer::testutils::utils::setup_db; + fn request_meta() -> RequestMeta { let dsn = "https://a94ae32be2584e0bbd7a4cbb95971fee:@sentry.io/42" .parse() @@ -413,49 +285,13 @@ mod tests { .collect() } - async fn setup_db(run_migrations: bool) -> Pool { - let path = std::env::temp_dir().join(Uuid::new_v4().to_string()); - - create_spool_directory(&path).await; - - let options = SqliteConnectOptions::new() - .filename(&path) - .journal_mode(SqliteJournalMode::Wal) - .create_if_missing(true); - - let db = SqlitePoolOptions::new() - .connect_with(options) - .await - .unwrap(); - - if run_migrations { - sqlx::migrate!("../migrations").run(&db).await.unwrap(); - } - - db - } - - async fn create_spool_directory(path: &Path) { - let Some(parent) = path.parent() else { - return; - }; - - if !parent.as_os_str().is_empty() && !parent.exists() { - relay_log::debug!("creating directory for spooling file: {}", 
parent.display()); - DirBuilder::new() - .recursive(true) - .create(&parent) - .await - .unwrap(); - } - } - #[tokio::test] #[should_panic] async fn test_push_with_mismatching_project_keys() { let db = setup_db(false).await; - let mut stack = SQLiteEnvelopeStack::new( - db, + let envelope_store = SqliteEnvelopeStore::new(db); + let mut stack = SqliteEnvelopeStack::new( + envelope_store, 2, 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), @@ -469,8 +305,9 @@ mod tests { #[tokio::test] async fn test_push_when_db_is_not_valid() { let db = setup_db(false).await; - let mut stack = SQLiteEnvelopeStack::new( - db, + let envelope_store = SqliteEnvelopeStore::new(db); + let mut stack = SqliteEnvelopeStack::new( + envelope_store, 2, 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), @@ -489,7 +326,7 @@ mod tests { let envelope = mock_envelope(Instant::now()); assert!(matches!( stack.push(envelope).await, - Err(SQLiteEnvelopeStackError::DatabaseError(_)) + Err(SqliteEnvelopeStackError::EnvelopeStoreError(_)) )); // The stack now contains the last of the 3 elements that were added. If we add a new one @@ -499,9 +336,9 @@ mod tests { assert_eq!(stack.batches_buffer_size, 3); // We pop the remaining elements, expecting the last added envelope to be on top. 
- let popped_envelope_1 = stack.pop().await.unwrap(); - let popped_envelope_2 = stack.pop().await.unwrap(); - let popped_envelope_3 = stack.pop().await.unwrap(); + let popped_envelope_1 = stack.pop().await.unwrap().unwrap(); + let popped_envelope_2 = stack.pop().await.unwrap().unwrap(); + let popped_envelope_3 = stack.pop().await.unwrap().unwrap(); assert_eq!( popped_envelope_1.event_id().unwrap(), envelope.event_id().unwrap() @@ -520,8 +357,9 @@ mod tests { #[tokio::test] async fn test_pop_when_db_is_not_valid() { let db = setup_db(false).await; - let mut stack = SQLiteEnvelopeStack::new( - db, + let envelope_store = SqliteEnvelopeStore::new(db); + let mut stack = SqliteEnvelopeStack::new( + envelope_store, 2, 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), @@ -531,15 +369,16 @@ mod tests { // We pop with an invalid db. assert!(matches!( stack.pop().await, - Err(SQLiteEnvelopeStackError::DatabaseError(_)) + Err(SqliteEnvelopeStackError::EnvelopeStoreError(_)) )); } #[tokio::test] async fn test_pop_when_stack_is_empty() { let db = setup_db(true).await; - let mut stack = SQLiteEnvelopeStack::new( - db, + let envelope_store = SqliteEnvelopeStore::new(db); + let mut stack = SqliteEnvelopeStack::new( + envelope_store, 2, 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), @@ -547,17 +386,16 @@ mod tests { ); // We pop with no elements. - assert!(matches!( - stack.pop().await, - Err(SQLiteEnvelopeStackError::Empty) - )); + // We pop with no elements. + assert!(stack.pop().await.unwrap().is_none()); } #[tokio::test] async fn test_push_below_threshold_and_pop() { let db = setup_db(true).await; - let mut stack = SQLiteEnvelopeStack::new( - db, + let envelope_store = SqliteEnvelopeStore::new(db); + let mut stack = SqliteEnvelopeStack::new( + envelope_store, 5, 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), @@ -573,7 +411,7 @@ mod tests { assert_eq!(stack.batches_buffer_size, 5); // We peek the top element. 
- let peeked_envelope = stack.peek().await.unwrap(); + let peeked_envelope = stack.peek().await.unwrap().unwrap(); assert_eq!( peeked_envelope.event_id().unwrap(), envelopes.clone()[4].event_id().unwrap() @@ -581,7 +419,7 @@ mod tests { // We pop 5 envelopes. for envelope in envelopes.iter().rev() { - let popped_envelope = stack.pop().await.unwrap(); + let popped_envelope = stack.pop().await.unwrap().unwrap(); assert_eq!( popped_envelope.event_id().unwrap(), envelope.event_id().unwrap() @@ -592,8 +430,9 @@ mod tests { #[tokio::test] async fn test_push_above_threshold_and_pop() { let db = setup_db(true).await; - let mut stack = SQLiteEnvelopeStack::new( - db, + let envelope_store = SqliteEnvelopeStore::new(db); + let mut stack = SqliteEnvelopeStack::new( + envelope_store, 5, 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), @@ -609,7 +448,7 @@ mod tests { assert_eq!(stack.batches_buffer_size, 10); // We peek the top element. - let peeked_envelope = stack.peek().await.unwrap(); + let peeked_envelope = stack.peek().await.unwrap().unwrap(); assert_eq!( peeked_envelope.event_id().unwrap(), envelopes.clone()[14].event_id().unwrap() @@ -618,7 +457,7 @@ mod tests { // We pop 10 envelopes, and we expect that the last 10 are in memory, since the first 5 // should have been spooled to disk. for envelope in envelopes[5..15].iter().rev() { - let popped_envelope = stack.pop().await.unwrap(); + let popped_envelope = stack.pop().await.unwrap().unwrap(); assert_eq!( popped_envelope.event_id().unwrap(), envelope.event_id().unwrap() @@ -627,7 +466,7 @@ mod tests { assert_eq!(stack.batches_buffer_size, 0); // We peek the top element, which since the buffer is empty should result in a disk load. 
- let peeked_envelope = stack.peek().await.unwrap(); + let peeked_envelope = stack.peek().await.unwrap().unwrap(); assert_eq!( peeked_envelope.event_id().unwrap(), envelopes.clone()[4].event_id().unwrap() @@ -639,7 +478,7 @@ mod tests { assert!(stack.push(envelope.clone()).await.is_ok()); // We pop and expect the newly inserted element. - let popped_envelope = stack.pop().await.unwrap(); + let popped_envelope = stack.pop().await.unwrap().unwrap(); assert_eq!( popped_envelope.event_id().unwrap(), envelope.event_id().unwrap() @@ -648,7 +487,7 @@ mod tests { // We pop 5 envelopes, which should not result in a disk load since `peek()` already should // have caused it. for envelope in envelopes[0..5].iter().rev() { - let popped_envelope = stack.pop().await.unwrap(); + let popped_envelope = stack.pop().await.unwrap().unwrap(); assert_eq!( popped_envelope.event_id().unwrap(), envelope.event_id().unwrap() diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs new file mode 100644 index 0000000000..d5a862136e --- /dev/null +++ b/relay-server/src/services/buffer/mod.rs @@ -0,0 +1,310 @@ +//! Types for buffering envelopes. + +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +use relay_base_schema::project::ProjectKey; +use relay_config::Config; +use tokio::sync::MutexGuard; + +use crate::envelope::Envelope; +use crate::utils::ManagedEnvelope; + +pub use envelope_buffer::EnvelopeBufferError; +pub use envelope_buffer::PolymorphicEnvelopeBuffer; +pub use envelope_stack::sqlite::SqliteEnvelopeStack; // pub for benchmarks +pub use envelope_stack::EnvelopeStack; // pub for benchmarks +pub use sqlite_envelope_store::SqliteEnvelopeStore; // pub for benchmarks // pub for benchmarks + +mod envelope_buffer; +mod envelope_stack; +mod sqlite_envelope_store; +mod stack_provider; +mod testutils; + +/// Async envelope buffering interface. +/// +/// Access to the buffer is synchronized by a tokio lock. 
+#[derive(Debug)] +pub struct GuardedEnvelopeBuffer { + /// TODO: Reconsider synchronization mechanism. + /// We can either + /// - make the interface sync and use a std Mutex. In this case, we create a queue of threads. + /// - use an async interface with a tokio mutex. In this case, we create a queue of futures. + /// - use message passing (service or channel). In this case, we create a queue of messages. + /// + /// From the tokio docs: + /// + /// > The primary use case for the async mutex is to provide shared mutable access to IO resources such as a database connection. + /// > [...] when you do want shared access to an IO resource, it is often better to spawn a task to manage the IO resource, + /// > and to use message passing to communicate with that task. + inner: tokio::sync::Mutex, + /// Used to notify callers of `peek()` of any changes in the buffer. + notify: tokio::sync::Notify, + + /// Metric that counts how many push operations are waiting. + inflight_push_count: AtomicU64, +} + +impl GuardedEnvelopeBuffer { + /// Creates a memory or disk based [`GuardedEnvelopeBuffer`], depending on the given config. + /// + /// NOTE: until the V1 spooler implementation is removed, this function returns `None` + /// if V2 spooling is not configured. + pub fn from_config(config: &Config) -> Option { + if config.spool_v2() { + Some(Self { + inner: tokio::sync::Mutex::new(Inner { + backend: PolymorphicEnvelopeBuffer::from_config(config), + changed: true, + }), + notify: tokio::sync::Notify::new(), + inflight_push_count: AtomicU64::new(0), + }) + } else { + None + } + } + + /// Schedules a task to push an envelope to the buffer. + /// + /// Once the envelope is pushed, waiters will be notified. 
+ pub fn defer_push(self: Arc, envelope: ManagedEnvelope) { + self.inflight_push_count.fetch_add(1, Ordering::Relaxed); + let this = self.clone(); + tokio::spawn(async move { + if let Err(e) = this.push(envelope.into_envelope()).await { + relay_log::error!( + error = &e as &dyn std::error::Error, + "failed to push envelope" + ); + } + this.inflight_push_count.fetch_sub(1, Ordering::Relaxed); + }); + } + + pub fn inflight_push_count(&self) -> u64 { + self.inflight_push_count.load(Ordering::Relaxed) + } + + /// Returns a reference to the next-in-line envelope. + /// + /// If the buffer is empty or has not changed since the last peek, this function will sleep + /// until something changes in the buffer. + pub async fn peek(&self) -> Peek { + loop { + let mut guard = self.inner.lock().await; + if guard.changed { + match guard.backend.peek().await { + Ok(envelope) => { + if envelope.is_some() { + guard.changed = false; + return Peek { + guard, + notify: &self.notify, + }; + } + } + Err(error) => { + relay_log::error!( + error = &error as &dyn std::error::Error, + "failed to peek envelope" + ); + } + }; + } + drop(guard); // release the lock + self.notify.notified().await; + } + } + + /// Marks a project as ready or not ready. + /// + /// The buffer reprioritizes its envelopes based on this information. + pub async fn mark_ready(&self, project_key: &ProjectKey, ready: bool) { + let mut guard = self.inner.lock().await; + let changed = guard.backend.mark_ready(project_key, ready); + if changed { + self.notify(&mut guard); + } + } + + /// Adds an envelope to the buffer and wakes any waiting consumers. + async fn push(&self, envelope: Box) -> Result<(), EnvelopeBufferError> { + let mut guard = self.inner.lock().await; + guard.backend.push(envelope).await?; + self.notify(&mut guard); + Ok(()) + } + + fn notify(&self, guard: &mut MutexGuard) { + guard.changed = true; + self.notify.notify_waiters(); + } +} + +/// A view onto the next envelope in the buffer. 
+/// +/// Objects of this type can only exist if the buffer is not empty. +pub struct Peek<'a> { + guard: MutexGuard<'a, Inner>, + notify: &'a tokio::sync::Notify, +} + +impl Peek<'_> { + /// Returns a reference to the next envelope. + pub async fn get(&mut self) -> Result<&Envelope, EnvelopeBufferError> { + Ok(self + .guard + .backend + .peek() + .await? + .expect("element disappeared while holding lock")) + } + + /// Pops the next envelope from the buffer. + /// + /// This functions consumes the [`Peek`]. + pub async fn remove(mut self) -> Result, EnvelopeBufferError> { + self.notify(); + Ok(self + .guard + .backend + .pop() + .await? + .expect("element disappeared while holding lock")) + } + + /// Sync version of [`GuardedEnvelopeBuffer::mark_ready`]. + /// + /// Since [`Peek`] already has exclusive access to the buffer, it can mark projects as ready + /// without awaiting the lock. + pub fn mark_ready(&mut self, project_key: &ProjectKey, ready: bool) { + let changed = self.guard.backend.mark_ready(project_key, ready); + if changed { + self.notify(); + } + } + + fn notify(&mut self) { + self.guard.changed = true; + self.notify.notify_waiters(); + } +} + +#[derive(Debug)] +struct Inner { + backend: PolymorphicEnvelopeBuffer, + /// Used to notify callers of `peek()` of any changes in the buffer. 
+ changed: bool, +} + +#[cfg(test)] +mod tests { + use std::str::FromStr; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; + use std::sync::Arc; + use std::time::Duration; + + use relay_common::Dsn; + + use crate::extractors::RequestMeta; + + use super::*; + + fn new_buffer() -> Arc { + GuardedEnvelopeBuffer::from_config( + &Config::from_json_value(serde_json::json!({ + "spool": { + "envelopes": { + "version": "experimental" + } + } + })) + .unwrap(), + ) + .unwrap() + .into() + } + + fn new_envelope() -> Box { + Envelope::from_request( + None, + RequestMeta::new( + Dsn::from_str("http://a94ae32be2584e0bbd7a4cbb95971fed@localhost/1").unwrap(), + ), + ) + } + + #[tokio::test] + async fn no_busy_loop_when_empty() { + let buffer = new_buffer(); + let call_count = Arc::new(AtomicUsize::new(0)); + + tokio::time::pause(); + + let cloned_buffer = buffer.clone(); + let cloned_call_count = call_count.clone(); + tokio::spawn(async move { + loop { + cloned_buffer.peek().await.remove().await.unwrap(); + cloned_call_count.fetch_add(1, Ordering::Relaxed); + } + }); + + // Initial state: no calls + assert_eq!(call_count.load(Ordering::Relaxed), 0); + tokio::time::advance(Duration::from_nanos(1)).await; + assert_eq!(call_count.load(Ordering::Relaxed), 0); + + // State after push: one call + buffer.push(new_envelope()).await.unwrap(); + tokio::time::advance(Duration::from_nanos(1)).await; + assert_eq!(call_count.load(Ordering::Relaxed), 1); + tokio::time::advance(Duration::from_nanos(1)).await; + assert_eq!(call_count.load(Ordering::Relaxed), 1); + + // State after second push: two calls + buffer.push(new_envelope()).await.unwrap(); + tokio::time::advance(Duration::from_nanos(1)).await; + assert_eq!(call_count.load(Ordering::Relaxed), 2); + tokio::time::advance(Duration::from_nanos(1)).await; + assert_eq!(call_count.load(Ordering::Relaxed), 2); + } + + #[tokio::test] + async fn no_busy_loop_when_unchanged() { + let buffer = new_buffer(); + let call_count = 
Arc::new(AtomicUsize::new(0)); + + tokio::time::pause(); + + let cloned_buffer = buffer.clone(); + let cloned_call_count = call_count.clone(); + tokio::spawn(async move { + loop { + cloned_buffer.peek().await; + cloned_call_count.fetch_add(1, Ordering::Relaxed); + } + }); + + buffer.push(new_envelope()).await.unwrap(); + + // Initial state: no calls + assert_eq!(call_count.load(Ordering::Relaxed), 0); + + // After first advance: got one call + tokio::time::advance(Duration::from_nanos(1)).await; + assert_eq!(call_count.load(Ordering::Relaxed), 1); + + // After second advance: still only one call (no change) + tokio::time::advance(Duration::from_nanos(1)).await; + assert_eq!(call_count.load(Ordering::Relaxed), 1); + + // State after second push: two calls + buffer.push(new_envelope()).await.unwrap(); + tokio::time::advance(Duration::from_nanos(1)).await; + assert_eq!(call_count.load(Ordering::Relaxed), 2); + } +} diff --git a/relay-server/src/services/buffer/sqlite_envelope_store.rs b/relay-server/src/services/buffer/sqlite_envelope_store.rs new file mode 100644 index 0000000000..83616bf55a --- /dev/null +++ b/relay-server/src/services/buffer/sqlite_envelope_store.rs @@ -0,0 +1,516 @@ +use std::error::Error; +use std::path::Path; +use std::pin::pin; + +use futures::stream::StreamExt; +use hashbrown::HashSet; +use relay_base_schema::project::{ParseProjectKeyError, ProjectKey}; +use relay_config::Config; +use sqlx::migrate::MigrateError; +use sqlx::query::Query; +use sqlx::sqlite::{ + SqliteArguments, SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, + SqliteRow, SqliteSynchronous, +}; +use sqlx::{Pool, QueryBuilder, Row, Sqlite}; +use tokio::fs::DirBuilder; + +use crate::envelope::EnvelopeError; +use crate::extractors::StartTime; +use crate::Envelope; + +/// Struct that contains all the fields of an [`Envelope`] that are mapped to the database columns. 
+pub struct InsertEnvelope { + received_at: i64, + own_key: ProjectKey, + sampling_key: ProjectKey, + encoded_envelope: Vec, +} + +impl<'a> TryFrom<&'a Envelope> for InsertEnvelope { + type Error = EnvelopeError; + + fn try_from(value: &'a Envelope) -> Result { + let own_key = value.meta().public_key(); + let sampling_key = value.sampling_key().unwrap_or(own_key); + + let encoded_envelope = match value.to_vec() { + Ok(encoded_envelope) => encoded_envelope, + Err(err) => { + relay_log::error!( + error = &err as &dyn Error, + own_key = own_key.to_string(), + sampling_key = sampling_key.to_string(), + "failed to serialize envelope", + ); + + return Err(err); + } + }; + + Ok(InsertEnvelope { + received_at: value.received_at().timestamp_millis(), + own_key, + sampling_key, + encoded_envelope, + }) + } +} + +/// An error returned when doing an operation on [`SqliteEnvelopeStore`]. +#[derive(Debug, thiserror::Error)] +pub enum SqliteEnvelopeStoreError { + #[error("failed to setup the database: {0}")] + SqlxSetupFailed(sqlx::Error), + + #[error("failed to create the spool file: {0}")] + FileSetupError(std::io::Error), + + #[error("failed to write to disk: {0}")] + WriteError(sqlx::Error), + + #[error("failed to read from disk: {0}")] + FetchError(sqlx::Error), + + #[error("no file path for the spool was provided")] + NoFilePath, + + #[error("failed to migrate the database: {0}")] + MigrationError(MigrateError), + + #[error("failed to extract the envelope from the database")] + EnvelopeExtractionError, + + #[error("failed to extract a project key from the database")] + ProjectKeyExtractionError(#[from] ParseProjectKeyError), + + #[error("failed to get database file size: {0}")] + FileSizeReadFailed(sqlx::Error), +} + +/// Struct that offers access to a SQLite-based store of [`Envelope`]s. +/// +/// The goal of this struct is to hide away all the complexity of dealing with the database for +/// reading and writing envelopes. 
+#[derive(Debug, Clone)] +pub struct SqliteEnvelopeStore { + db: Pool, +} + +impl SqliteEnvelopeStore { + /// Initializes the [`SqliteEnvelopeStore`] with a supplied [`Pool`]. + pub fn new(db: Pool) -> Self { + Self { db } + } + + /// Prepares the [`SqliteEnvelopeStore`] by running all the necessary migrations and preparing + /// the folders where data will be stored. + pub async fn prepare(config: &Config) -> Result { + // If no path is provided, we can't do disk spooling. + let Some(path) = config.spool_envelopes_path() else { + return Err(SqliteEnvelopeStoreError::NoFilePath); + }; + + relay_log::info!("buffer file {}", path.to_string_lossy()); + + Self::setup(&path).await?; + + let options = SqliteConnectOptions::new() + .filename(&path) + // The WAL journaling mode uses a write-ahead log instead of a rollback journal to implement transactions. + // The WAL journaling mode is persistent; after being set it stays in effect + // across multiple database connections and after closing and reopening the database. + // + // 1. WAL is significantly faster in most scenarios. + // 2. WAL provides more concurrency as readers do not block writers and a writer does not block readers. Reading and writing can proceed concurrently. + // 3. Disk I/O operations tends to be more sequential using WAL. + // 4. WAL uses many fewer fsync() operations and is thus less vulnerable to problems on systems where the fsync() system call is broken. + .journal_mode(SqliteJournalMode::Wal) + // WAL mode is safe from corruption with synchronous=NORMAL. + // When synchronous is NORMAL, the SQLite database engine will still sync at the most critical moments, but less often than in FULL mode. + // Which guarantees good balance between safety and speed. + .synchronous(SqliteSynchronous::Normal) + // The freelist pages are moved to the end of the database file and the database file is truncated to remove the freelist pages at every + // transaction commit. 
Note, however, that auto-vacuum only truncates the freelist pages from the file. + // Auto-vacuum does not defragment the database nor repack individual database pages the way that the VACUUM command does. + // + // This will help us to keep the file size under some control. + .auto_vacuum(SqliteAutoVacuum::Full) + // If shared-cache mode is enabled and a thread establishes multiple + // connections to the same database, the connections share a single data and schema cache. + // This can significantly reduce the quantity of memory and IO required by the system. + .shared_cache(true); + + let db = SqlitePoolOptions::new() + .max_connections(config.spool_envelopes_max_connections()) + .min_connections(config.spool_envelopes_min_connections()) + .connect_with(options) + .await + .map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?; + + Ok(SqliteEnvelopeStore { db }) + } + + /// Sets up the database by running all pending migrations. + /// + /// The directories and spool file will be created if they don't already + /// exist. + async fn setup(path: &Path) -> Result<(), SqliteEnvelopeStoreError> { + Self::create_spool_directory(path).await?; + + let options = SqliteConnectOptions::new() + .filename(path) + .journal_mode(SqliteJournalMode::Wal) + .create_if_missing(true); + + let db = SqlitePoolOptions::new() + .connect_with(options) + .await + .map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?; + + sqlx::migrate!("../migrations") + .run(&db) + .await + .map_err(SqliteEnvelopeStoreError::MigrationError)?; + + Ok(()) + } + + /// Creates the directories for the spool file. 
+ async fn create_spool_directory(path: &Path) -> Result<(), SqliteEnvelopeStoreError> { + let Some(parent) = path.parent() else { + return Ok(()); + }; + + if !parent.as_os_str().is_empty() && !parent.exists() { + relay_log::debug!("creating directory for spooling file: {}", parent.display()); + DirBuilder::new() + .recursive(true) + .create(&parent) + .await + .map_err(SqliteEnvelopeStoreError::FileSetupError)?; + } + + Ok(()) + } + + /// Inserts one or more envelopes into the database. + pub async fn insert_many( + &self, + envelopes: impl IntoIterator, + ) -> Result<(), SqliteEnvelopeStoreError> { + if let Err(err) = build_insert_many_envelopes(envelopes.into_iter()) + .build() + .execute(&self.db) + .await + { + relay_log::error!( + error = &err as &dyn Error, + "failed to spool envelopes to disk", + ); + + return Err(SqliteEnvelopeStoreError::WriteError(err)); + } + + Ok(()) + } + + /// Deletes and returns at most `limit` [`Envelope`]s from the database. + pub async fn delete_many( + &self, + own_key: ProjectKey, + sampling_key: ProjectKey, + limit: i64, + ) -> Result>, SqliteEnvelopeStoreError> { + let envelopes = build_delete_and_fetch_many_envelopes(own_key, sampling_key, limit) + .fetch(&self.db) + .peekable(); + + let mut envelopes = pin!(envelopes); + if envelopes.as_mut().peek().await.is_none() { + return Ok(vec![]); + } + + let mut extracted_envelopes = Vec::with_capacity(limit as usize); + let mut db_error = None; + while let Some(envelope) = envelopes.as_mut().next().await { + let envelope = match envelope { + Ok(envelope) => envelope, + Err(err) => { + relay_log::error!( + error = &err as &dyn Error, + "failed to unspool the envelopes from the disk", + ); + db_error = Some(err); + + continue; + } + }; + + match extract_envelope(envelope) { + Ok(envelope) => { + extracted_envelopes.push(envelope); + } + Err(err) => { + relay_log::error!( + error = &err as &dyn Error, + "failed to extract the envelope unspooled from disk", + ) + } + } + } + + // If 
we have no envelopes and there was at least one error, we signal total failure to the + // caller. We do this under the assumption that if there are envelopes and failures, we are + // fine with just logging the failure and not failing completely. + if extracted_envelopes.is_empty() { + if let Some(db_error) = db_error { + return Err(SqliteEnvelopeStoreError::FetchError(db_error)); + } + } + + // We sort envelopes by `received_at`. + // Unfortunately we have to do this because SQLite `DELETE` with `RETURNING` doesn't + // return deleted rows in a specific order. + extracted_envelopes.sort_by_key(|a| a.received_at()); + + Ok(extracted_envelopes) + } + + /// Returns a set of project key pairs, representing all the unique combinations of + /// `own_key` and `project_key` that are found in the database. + pub async fn project_key_pairs( + &self, + ) -> Result, SqliteEnvelopeStoreError> { + let project_key_pairs = build_get_project_key_pairs() + .fetch_all(&self.db) + .await + .map_err(SqliteEnvelopeStoreError::FetchError)?; + + let project_key_pairs = project_key_pairs + .into_iter() + // Collect only keys we can extract. + .filter_map(|project_key_pair| extract_project_key_pair(project_key_pair).ok()) + .collect(); + + Ok(project_key_pairs) + } + + /// Returns an approximate measure of the size of the database. + pub async fn used_size(&self) -> Result { + build_estimate_size() + .fetch_one(&self.db) + .await + .and_then(|r| r.try_get(0)) + .map_err(SqliteEnvelopeStoreError::FileSizeReadFailed) + } +} + +/// Deserializes an [`Envelope`] from a database row. 
+fn extract_envelope(row: SqliteRow) -> Result, SqliteEnvelopeStoreError> { + let envelope_row: Vec = row + .try_get("envelope") + .map_err(SqliteEnvelopeStoreError::FetchError)?; + let envelope_bytes = bytes::Bytes::from(envelope_row); + let mut envelope = Envelope::parse_bytes(envelope_bytes) + .map_err(|_| SqliteEnvelopeStoreError::EnvelopeExtractionError)?; + + let received_at: i64 = row + .try_get("received_at") + .map_err(SqliteEnvelopeStoreError::FetchError)?; + let start_time = StartTime::from_timestamp_millis(received_at as u64); + + envelope.set_start_time(start_time.into_inner()); + + Ok(envelope) +} + +/// Deserializes a pair of [`ProjectKey`] from the database. +fn extract_project_key_pair( + row: SqliteRow, +) -> Result<(ProjectKey, ProjectKey), SqliteEnvelopeStoreError> { + let own_key = row + .try_get("own_key") + .map_err(SqliteEnvelopeStoreError::FetchError) + .and_then(|key| { + ProjectKey::parse(key).map_err(SqliteEnvelopeStoreError::ProjectKeyExtractionError) + }); + let sampling_key = row + .try_get("sampling_key") + .map_err(SqliteEnvelopeStoreError::FetchError) + .and_then(|key| { + ProjectKey::parse(key).map_err(SqliteEnvelopeStoreError::ProjectKeyExtractionError) + }); + + match (own_key, sampling_key) { + (Ok(own_key), Ok(sampling_key)) => Ok((own_key, sampling_key)), + // Report the first found error. + (Err(err), _) | (_, Err(err)) => { + relay_log::error!("failed to extract a queue key from the spool record: {err}"); + + Err(err) + } + } +} + +/// Builds a query that inserts many [`Envelope`]s in the database. 
+fn build_insert_many_envelopes<'a>( + envelopes: impl Iterator, +) -> QueryBuilder<'a, Sqlite> { + let mut builder: QueryBuilder = + QueryBuilder::new("INSERT INTO envelopes (received_at, own_key, sampling_key, envelope) "); + + builder.push_values(envelopes, |mut b, envelope| { + b.push_bind(envelope.received_at) + .push_bind(envelope.own_key.to_string()) + .push_bind(envelope.sampling_key.to_string()) + .push_bind(envelope.encoded_envelope); + }); + + builder +} + +/// Builds a query that deletes many [`Envelope`] from the database. +pub fn build_delete_and_fetch_many_envelopes<'a>( + own_key: ProjectKey, + project_key: ProjectKey, + batch_size: i64, +) -> Query<'a, Sqlite, SqliteArguments<'a>> { + sqlx::query( + "DELETE FROM + envelopes + WHERE id IN (SELECT id FROM envelopes WHERE own_key = ? AND sampling_key = ? + ORDER BY received_at DESC LIMIT ?) + RETURNING + received_at, own_key, sampling_key, envelope", + ) + .bind(own_key.to_string()) + .bind(project_key.to_string()) + .bind(batch_size) +} + +/// Creates a query which fetches the number of used database pages multiplied by the page size. +/// +/// This info used to estimate the current allocated database size. +pub fn build_estimate_size<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> { + sqlx::query( + r#"SELECT (page_count - freelist_count) * page_size as size FROM pragma_page_count(), pragma_freelist_count(), pragma_page_size();"#, + ) +} + +/// Returns the query to select all the unique combinations of own and sampling keys. 
+pub fn build_get_project_key_pairs<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> { + sqlx::query("SELECT DISTINCT own_key, sampling_key FROM envelopes;") +} + +#[cfg(test)] +mod tests { + + use hashbrown::HashSet; + use std::collections::BTreeMap; + use std::time::{Duration, Instant}; + use uuid::Uuid; + + use relay_base_schema::project::ProjectKey; + use relay_event_schema::protocol::EventId; + use relay_sampling::DynamicSamplingContext; + + use super::*; + use crate::envelope::{Envelope, Item, ItemType}; + use crate::extractors::RequestMeta; + use crate::services::buffer::testutils::utils::setup_db; + + fn request_meta() -> RequestMeta { + let dsn = "https://a94ae32be2584e0bbd7a4cbb95971fee:@sentry.io/42" + .parse() + .unwrap(); + + RequestMeta::new(dsn) + } + + fn mock_envelope(instant: Instant) -> Box { + let event_id = EventId::new(); + let mut envelope = Envelope::from_request(Some(event_id), request_meta()); + + let dsc = DynamicSamplingContext { + trace_id: Uuid::new_v4(), + public_key: ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + release: Some("1.1.1".to_string()), + user: Default::default(), + replay_id: None, + environment: None, + transaction: Some("transaction1".into()), + sample_rate: None, + sampled: Some(true), + other: BTreeMap::new(), + }; + + envelope.set_dsc(dsc); + envelope.set_start_time(instant); + + envelope.add_item(Item::new(ItemType::Transaction)); + + envelope + } + + #[allow(clippy::vec_box)] + fn mock_envelopes(count: usize) -> Vec> { + let instant = Instant::now(); + (0..count) + .map(|i| mock_envelope(instant - Duration::from_secs((count - i) as u64))) + .collect() + } + + #[tokio::test] + async fn test_insert_and_delete_envelopes() { + let db = setup_db(true).await; + let envelope_store = SqliteEnvelopeStore::new(db); + + let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(); + + // We insert 10 
envelopes. + let envelopes = mock_envelopes(10); + let envelope_ids: HashSet = + envelopes.iter().filter_map(|e| e.event_id()).collect(); + assert!(envelope_store + .insert_many(envelopes.iter().map(|e| e.as_ref().try_into().unwrap())) + .await + .is_ok()); + + // We check that if we load more than the limit, we still get back at most 10. + let extracted_envelopes = envelope_store + .delete_many(own_key, sampling_key, 15) + .await + .unwrap(); + // Assert on the extracted envelopes, not the inserted ones: the inserted vector is + // trivially of length 10, whereas this checks that `delete_many` with limit 15 only + // returns the 10 envelopes that actually exist. + assert_eq!(extracted_envelopes.len(), 10); + for envelope in extracted_envelopes { + assert!(envelope_ids.contains(&envelope.event_id().unwrap())); + } + } + + #[tokio::test] + async fn test_insert_and_get_project_keys_pairs() { + let db = setup_db(true).await; + let envelope_store = SqliteEnvelopeStore::new(db); + + let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(); + + // We insert 2 envelopes. + let envelopes = mock_envelopes(2); + assert!(envelope_store + .insert_many(envelopes.iter().map(|e| e.as_ref().try_into().unwrap())) + .await + .is_ok()); + + // We check that we get back only one pair of project keys, since all envelopes have the + // same pair. 
+ let project_key_pairs = envelope_store.project_key_pairs().await.unwrap(); + assert_eq!(project_key_pairs.len(), 1); + assert_eq!( + project_key_pairs.into_iter().last().unwrap(), + (own_key, sampling_key) + ); + } +} diff --git a/relay-server/src/services/buffer/stack_provider/memory.rs b/relay-server/src/services/buffer/stack_provider/memory.rs new file mode 100644 index 0000000000..b3fe5c3bb3 --- /dev/null +++ b/relay-server/src/services/buffer/stack_provider/memory.rs @@ -0,0 +1,14 @@ +use crate::services::buffer::envelope_stack::memory::MemoryEnvelopeStack; +use crate::services::buffer::envelope_stack::StackProvider; +use crate::Envelope; + +#[derive(Debug)] +pub struct MemoryStackProvider; + +impl StackProvider for MemoryStackProvider { + type Stack = MemoryEnvelopeStack; + + fn create_stack(&self, envelope: Box) -> Self::Stack { + MemoryEnvelopeStack::new(envelope) + } +} diff --git a/relay-server/src/services/buffer/stack_provider/mod.rs b/relay-server/src/services/buffer/stack_provider/mod.rs new file mode 100644 index 0000000000..ae663f641d --- /dev/null +++ b/relay-server/src/services/buffer/stack_provider/mod.rs @@ -0,0 +1,2 @@ +pub mod memory; +pub mod sqlite; diff --git a/relay-server/src/services/buffer/stack_provider/sqlite.rs b/relay-server/src/services/buffer/stack_provider/sqlite.rs new file mode 100644 index 0000000000..9716585606 --- /dev/null +++ b/relay-server/src/services/buffer/stack_provider/sqlite.rs @@ -0,0 +1,44 @@ +use relay_config::Config; + +use crate::services::buffer::envelope_stack::StackProvider; +use crate::services::buffer::sqlite_envelope_store::{ + SqliteEnvelopeStore, SqliteEnvelopeStoreError, +}; +use crate::{Envelope, SqliteEnvelopeStack}; + +#[derive(Debug)] +pub struct SqliteStackProvider { + envelope_store: SqliteEnvelopeStore, + disk_batch_size: usize, + max_batches: usize, +} + +// `allow` (not `warn`) suppresses the dead-code lint for this not-yet-wired-up type. +#[allow(dead_code)] +impl SqliteStackProvider { + /// Creates a new [`SqliteStackProvider`] from the provided path to the SQLite database 
file. + pub async fn new(config: &Config) -> Result { + let envelope_store = SqliteEnvelopeStore::prepare(config).await?; + Ok(Self { + envelope_store, + disk_batch_size: config.spool_envelopes_stack_disk_batch_size(), + max_batches: config.spool_envelopes_stack_max_batches(), + }) + } +} + +impl StackProvider for SqliteStackProvider { + type Stack = SqliteEnvelopeStack; + + fn create_stack(&self, envelope: Box) -> Self::Stack { + let own_key = envelope.meta().public_key(); + let sampling_key = envelope.sampling_key().unwrap_or(own_key); + + SqliteEnvelopeStack::new( + self.envelope_store.clone(), + self.disk_batch_size, + self.max_batches, + own_key, + sampling_key, + ) + } +} diff --git a/relay-server/src/services/buffer/testutils.rs b/relay-server/src/services/buffer/testutils.rs new file mode 100644 index 0000000000..0cb2ed75d6 --- /dev/null +++ b/relay-server/src/services/buffer/testutils.rs @@ -0,0 +1,39 @@ +#[cfg(test)] +pub mod utils { + use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}; + use sqlx::{Pool, Sqlite}; + use tokio::fs::DirBuilder; + use uuid::Uuid; + + /// Sets up a temporary SQLite database for testing purposes. 
+ pub async fn setup_db(run_migrations: bool) -> Pool { + let path = std::env::temp_dir().join(Uuid::new_v4().to_string()); + + if let Some(parent) = path.parent() { + if !parent.as_os_str().is_empty() && !parent.exists() { + relay_log::debug!("creating directory for spooling file: {}", parent.display()); + DirBuilder::new() + .recursive(true) + .create(&parent) + .await + .unwrap(); + } + } + + let options = SqliteConnectOptions::new() + .filename(&path) + .journal_mode(SqliteJournalMode::Wal) + .create_if_missing(true); + + let db = SqlitePoolOptions::new() + .connect_with(options) + .await + .unwrap(); + + if run_migrations { + sqlx::migrate!("../migrations").run(&db).await.unwrap(); + } + + db + } +} diff --git a/relay-server/src/services/metrics/aggregator.rs b/relay-server/src/services/metrics/aggregator.rs index e76bf78bda..6d916d73f0 100644 --- a/relay-server/src/services/metrics/aggregator.rs +++ b/relay-server/src/services/metrics/aggregator.rs @@ -1,15 +1,15 @@ -use hashbrown::HashMap; use std::time::Duration; +use hashbrown::HashMap; use relay_base_schema::project::ProjectKey; use relay_config::AggregatorServiceConfig; +use relay_metrics::{aggregator, Bucket}; use relay_system::{ AsyncResponse, Controller, FromMessage, Interface, NoResponse, Recipient, Sender, Service, Shutdown, }; use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers}; -use relay_metrics::{aggregator, Bucket}; /// Aggregator for metric buckets. 
/// diff --git a/relay-server/src/services/metrics/mod.rs b/relay-server/src/services/metrics/mod.rs index cbf5a94af6..5a1ed2cf8f 100644 --- a/relay-server/src/services/metrics/mod.rs +++ b/relay-server/src/services/metrics/mod.rs @@ -1,5 +1,5 @@ -mod aggregator; -mod router; - pub use self::aggregator::*; pub use self::router::*; + +mod aggregator; +mod router; diff --git a/relay-server/src/services/metrics/router.rs b/relay-server/src/services/metrics/router.rs index 76b063e752..6b7de838f3 100644 --- a/relay-server/src/services/metrics/router.rs +++ b/relay-server/src/services/metrics/router.rs @@ -3,6 +3,7 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; + use relay_config::{AggregatorServiceConfig, ScopedAggregatorConfig}; use relay_metrics::MetricNamespace; use relay_system::{Addr, NoResponse, Recipient, Service}; diff --git a/relay-server/src/services/mod.rs b/relay-server/src/services/mod.rs index b4291be02e..be7b542b06 100644 --- a/relay-server/src/services/mod.rs +++ b/relay-server/src/services/mod.rs @@ -27,6 +27,7 @@ //! Controller::run(|| Server::start()) //! .expect("failed to start relay"); //! ``` +pub mod buffer; pub mod cogs; pub mod global_config; pub mod health_check; diff --git a/relay-server/src/services/project/state.rs b/relay-server/src/services/project/state.rs index 4788f51e10..9c7ea90acc 100644 --- a/relay-server/src/services/project/state.rs +++ b/relay-server/src/services/project/state.rs @@ -1,12 +1,13 @@ //! Types that represent the current project state. use std::sync::Arc; -mod fetch_state; -mod info; +use serde::{Deserialize, Serialize}; pub use fetch_state::{ExpiryState, ProjectFetchState}; pub use info::{LimitedProjectInfo, ProjectInfo}; -use serde::{Deserialize, Serialize}; + +mod fetch_state; +mod info; /// Representation of a project's current state. 
#[derive(Clone, Debug)] diff --git a/relay-server/src/services/project/state/fetch_state.rs b/relay-server/src/services/project/state/fetch_state.rs index 3499c9184a..a9cbbdf31a 100644 --- a/relay-server/src/services/project/state/fetch_state.rs +++ b/relay-server/src/services/project/state/fetch_state.rs @@ -1,8 +1,9 @@ use std::sync::Arc; +use tokio::time::Instant; + use relay_config::Config; use relay_dynamic_config::ProjectConfig; -use tokio::time::Instant; use crate::services::project::state::info::ProjectInfo; use crate::services::project::ProjectState; diff --git a/relay-server/src/services/project/state/info.rs b/relay-server/src/services/project/state/info.rs index 0f21d2e015..9e97a73aaf 100644 --- a/relay-server/src/services/project/state/info.rs +++ b/relay-server/src/services/project/state/info.rs @@ -1,4 +1,5 @@ use chrono::{DateTime, Utc}; + use relay_base_schema::project::{ProjectId, ProjectKey}; #[cfg(feature = "processing")] use relay_cardinality::CardinalityLimit; diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index 2b28a878ef..8d506063da 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -5,6 +5,7 @@ use std::time::Duration; use crate::extractors::RequestMeta; use crate::metrics::MetricOutcomes; +use crate::services::buffer::{EnvelopeBufferError, GuardedEnvelopeBuffer, Peek}; use hashbrown::HashSet; use relay_base_schema::project::ProjectKey; use relay_config::{Config, RelayMode}; @@ -22,7 +23,7 @@ use crate::services::global_config::{self, GlobalConfigManager, Subscribe}; use crate::services::metrics::{Aggregator, FlushBuckets}; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; use crate::services::processor::{ - EncodeMetrics, EnvelopeProcessor, ProcessEnvelope, ProjectMetrics, + EncodeMetrics, EnvelopeProcessor, ProcessEnvelope, ProcessingGroup, ProjectMetrics, }; use crate::services::project::{ CheckedBuckets, 
Project, ProjectFetchState, ProjectSender, ProjectState, @@ -565,6 +566,8 @@ impl Services { struct ProjectCacheBroker { config: Arc, memory_checker: MemoryChecker, + // TODO: Make non-optional when spool_v1 is removed. + envelope_buffer: Option>, services: Services, metric_outcomes: MetricOutcomes, // Need hashbrown because extract_if is not stable in std yet. @@ -575,19 +578,25 @@ struct ProjectCacheBroker { source: ProjectSource, /// Tx channel used to send the updated project state whenever requested. state_tx: mpsc::UnboundedSender, + + /// Handle to schedule periodic unspooling of buffered envelopes (spool V1). + spool_v1_unspool_handle: SleepHandle, + spool_v1: Option, + /// Status of the global configuration, used to determine readiness for processing. + global_config: GlobalConfigStatus, +} + +#[derive(Debug)] +struct SpoolV1 { /// Tx channel used by the [`BufferService`] to send back the requested dequeued elements. buffer_tx: mpsc::UnboundedSender, /// Index containing all the [`QueueKey`] that have been enqueued in the [`BufferService`]. index: HashSet, - /// Handle to schedule periodic unspooling of buffered envelopes. - buffer_unspool_handle: SleepHandle, /// Backoff strategy for retrying unspool attempts. buffer_unspool_backoff: RetryBackoff, /// Address of the [`BufferService`] used for enqueuing and dequeuing envelopes that can't be /// immediately processed. buffer: Addr, - /// Status of the global configuration, used to determine readiness for processing. - global_config: GlobalConfigStatus, } /// Describes the current status of the `GlobalConfig`. @@ -614,18 +623,21 @@ impl ProjectCacheBroker { } /// Adds the value to the queue for the provided key. 
- pub fn enqueue(&mut self, key: QueueKey, value: ManagedEnvelope) { - self.index.insert(key); - self.buffer.send(Enqueue::new(key, value)); + fn enqueue(&mut self, key: QueueKey, value: ManagedEnvelope) { + let spool_v1 = self.spool_v1.as_mut().expect("no V1 spool configured"); + spool_v1.index.insert(key); + spool_v1.buffer.send(Enqueue::new(key, value)); } /// Sends the message to the buffer service to dequeue the envelopes. /// /// All the found envelopes will be send back through the `buffer_tx` channel and directly /// forwarded to `handle_processing`. - pub fn dequeue(&self, keys: HashSet) { - self.buffer - .send(DequeueMany::new(keys, self.buffer_tx.clone())) + fn dequeue(&self, keys: HashSet) { + let spool_v1 = self.spool_v1.as_ref().expect("no V1 spool configured"); + spool_v1 + .buffer + .send(DequeueMany::new(keys, spool_v1.buffer_tx.clone())) } /// Evict projects that are over its expiry date. @@ -645,13 +657,15 @@ impl ProjectCacheBroker { // Defer dropping the projects to a dedicated thread: let mut count = 0; for (project_key, project) in expired { - let keys = self - .index - .extract_if(|key| key.own_key == project_key || key.sampling_key == project_key) - .collect::>(); - - if !keys.is_empty() { - self.buffer.send(RemoveMany::new(project_key, keys)) + if let Some(spool_v1) = self.spool_v1.as_mut() { + let keys = spool_v1 + .index + .extract_if(|key| key.own_key == project_key || key.sampling_key == project_key) + .collect::>(); + + if !keys.is_empty() { + spool_v1.buffer.send(RemoveMany::new(project_key, keys)) + } } self.garbage_disposal.dispose(project); @@ -705,6 +719,11 @@ impl ProjectCacheBroker { // Try to schedule unspool if it's not scheduled yet. self.schedule_unspool(); + + // TODO: write test that shows envelope can overtake when project becomes ready. 
+ if let Some(buffer) = self.envelope_buffer.clone() { + tokio::spawn(async move { buffer.mark_ready(&project_key, true).await }); + } } fn handle_request_update(&mut self, message: RequestUpdate) { @@ -762,12 +781,20 @@ impl ProjectCacheBroker { ) -> Result { let CheckEnvelope { envelope: context } = message; let project_cache = self.services.project_cache.clone(); - let project = self.get_or_create_project(context.envelope().meta().public_key()); + let project_key = context.envelope().meta().public_key(); + if let Some(sampling_key) = context.envelope().sampling_key() { + if sampling_key != project_key { + let sampling_project = self.get_or_create_project(sampling_key); + sampling_project.prefetch(project_cache.clone(), false); + } + } + let project = self.get_or_create_project(project_key); // Preload the project cache so that it arrives a little earlier in processing. However, // do not pass `no_cache`. In case the project is rate limited, we do not want to force // a full reload. Fetching must not block the store request. 
project.prefetch(project_cache, false); + project.check_envelope(context) } @@ -988,11 +1015,15 @@ impl ProjectCacheBroker { } fn handle_buffer_index(&mut self, message: UpdateSpoolIndex) { - self.index.extend(message.0); + let spool_v1 = self.spool_v1.as_mut().expect("no V1 spool configured"); + spool_v1.index.extend(message.0); } - fn handle_spool_health(&mut self, sender: Sender) { - self.buffer.send(spooler::Health(sender)) + fn handle_spool_health(&self, sender: Sender) { + match &self.spool_v1 { + Some(spool_v1) => spool_v1.buffer.send(spooler::Health(sender)), + None => sender.send(true), // TODO + } } fn handle_refresh_index_cache(&mut self, message: RefreshIndexCache) { @@ -1000,7 +1031,8 @@ impl ProjectCacheBroker { let project_cache = self.services.project_cache.clone(); for key in index { - self.index.insert(key); + let spool_v1 = self.spool_v1.as_mut().expect("no V1 spool configured"); + spool_v1.index.insert(key); self.get_or_create_project(key.own_key) .prefetch(project_cache.clone(), false); if key.own_key != key.sampling_key { @@ -1010,16 +1042,123 @@ impl ProjectCacheBroker { } } + async fn peek_at_envelope(&mut self, mut peek: Peek<'_>) -> Result<(), EnvelopeBufferError> { + let envelope = peek.get().await?; + if envelope.meta().start_time().elapsed() > self.config.spool_envelopes_max_age() { + let popped_envelope = peek.remove().await?; + let mut managed_envelope = ManagedEnvelope::new( + popped_envelope, + self.services.outcome_aggregator.clone(), + self.services.test_store.clone(), + ProcessingGroup::Ungrouped, + ); + managed_envelope.reject(Outcome::Invalid(DiscardReason::Timestamp)); + // TODO: metrics in all branches. 
+ return Ok(()); + } + let sampling_key = envelope.sampling_key(); + let services = self.services.clone(); + + let own_key = envelope.meta().public_key(); + let project = self.get_or_create_project(own_key); + let project_state = project.get_cached_state(services.project_cache.clone(), false); + + // Check if project config is enabled. + let project_info = match project_state { + ProjectState::Enabled(info) => { + peek.mark_ready(&own_key, true); + info + } + ProjectState::Disabled => { + let popped_envelope = peek.remove().await?; + let mut managed_envelope = ManagedEnvelope::new( + popped_envelope, + self.services.outcome_aggregator.clone(), + self.services.test_store.clone(), + ProcessingGroup::Ungrouped, + ); + managed_envelope.reject(Outcome::Invalid(DiscardReason::ProjectId)); + return Ok(()); + } + ProjectState::Pending => { + peek.mark_ready(&own_key, false); + return Ok(()); + } + }; + + // Check if sampling config is enabled. + let sampling_project_info = match sampling_key.map(|sampling_key| { + ( + sampling_key, + self.get_or_create_project(sampling_key) + .get_cached_state(services.project_cache, false), + ) + }) { + Some((sampling_key, ProjectState::Enabled(info))) => { + peek.mark_ready(&sampling_key, true); + // Only set if it matches the organization ID. Otherwise treat as if there is + // no sampling project. + (info.organization_id == project_info.organization_id).then_some(info) + } + Some((_, ProjectState::Disabled)) => { + // Accept envelope even if its sampling state is disabled: + None + } + Some((sampling_key, ProjectState::Pending)) => { + peek.mark_ready(&sampling_key, false); + return Ok(()); + } + None => None, + }; + + let project = self.get_or_create_project(own_key); + + // Reassign processing groups and proceed to processing. 
+ let popped_envelope = peek.remove().await?; + for (group, envelope) in ProcessingGroup::split_envelope(*popped_envelope) { + let managed_envelope = ManagedEnvelope::new( + envelope, + services.outcome_aggregator.clone(), + services.test_store.clone(), + group, + ); + + let Ok(CheckedEnvelope { + envelope: Some(managed_envelope), + .. + }) = project.check_envelope(managed_envelope) + else { + continue; // Outcomes are emitted by check_envelope + }; + + let reservoir_counters = project.reservoir_counters(); + services.envelope_processor.send(ProcessEnvelope { + envelope: managed_envelope, + project_info: project_info.clone(), + sampling_project_info: sampling_project_info.clone(), + reservoir_counters, + }); + } + + Ok(()) + } + /// Returns backoff timeout for an unspool attempt. fn next_unspool_attempt(&mut self) -> Duration { - self.config.spool_envelopes_unspool_interval() + self.buffer_unspool_backoff.next_backoff() + let spool_v1 = self.spool_v1.as_mut().expect("no V1 spool configured"); + self.config.spool_envelopes_unspool_interval() + + spool_v1.buffer_unspool_backoff.next_backoff() } fn schedule_unspool(&mut self) { - if self.buffer_unspool_handle.is_idle() { + if self.spool_v1.is_none() { + return; + } + + if self.spool_v1_unspool_handle.is_idle() { // Set the time for the next attempt. let wait = self.next_unspool_attempt(); - self.buffer_unspool_handle.set(wait); + self.spool_v1_unspool_handle.set(wait); } } @@ -1045,6 +1184,7 @@ impl ProjectCacheBroker { /// This makes sure we always moving the unspool forward, even if we do not fetch the project /// states updates, but still can process data based on the existing cache. 
fn handle_periodic_unspool(&mut self) { + relay_log::trace!("handle_periodic_unspool"); let (num_keys, reason) = self.handle_periodic_unspool_inner(); relay_statsd::metric!( gauge(RelayGauges::BufferPeriodicUnspool) = num_keys as u64, @@ -1053,21 +1193,22 @@ impl ProjectCacheBroker { } fn handle_periodic_unspool_inner(&mut self) -> (usize, &str) { - self.buffer_unspool_handle.reset(); + let spool_v1 = self.spool_v1.as_mut().expect("no V1 spool configured"); + self.spool_v1_unspool_handle.reset(); // If we don't yet have the global config, we will defer dequeuing until we do. if let GlobalConfigStatus::Pending = self.global_config { - self.buffer_unspool_backoff.reset(); + spool_v1.buffer_unspool_backoff.reset(); self.schedule_unspool(); return (0, "no_global_config"); } // If there is nothing spooled, schedule the next check a little bit later. - if self.index.is_empty() { + if spool_v1.index.is_empty() { self.schedule_unspool(); return (0, "index_empty"); } - let mut index = std::mem::take(&mut self.index); + let mut index = std::mem::take(&mut spool_v1.index); let keys = index .extract_if(|key| self.is_state_cached(key)) .take(BATCH_KEY_COUNT) @@ -1079,12 +1220,13 @@ impl ProjectCacheBroker { } // Return all the un-used items to the index. + let spool_v1 = self.spool_v1.as_mut().expect("no V1 spool configured"); if !index.is_empty() { - self.index.extend(index); + spool_v1.index.extend(index); } // Schedule unspool once we are done. 
- self.buffer_unspool_backoff.reset(); + spool_v1.buffer_unspool_backoff.reset(); self.schedule_unspool(); (num_keys, "found_keys") @@ -1130,6 +1272,7 @@ impl ProjectCacheBroker { pub struct ProjectCacheService { config: Arc, memory_checker: MemoryChecker, + envelope_buffer: Option>, services: Services, metric_outcomes: MetricOutcomes, redis: Option, @@ -1140,6 +1283,7 @@ impl ProjectCacheService { pub fn new( config: Arc, memory_checker: MemoryChecker, + envelope_buffer: Option>, services: Services, metric_outcomes: MetricOutcomes, redis: Option, @@ -1147,6 +1291,7 @@ impl ProjectCacheService { Self { config, memory_checker, + envelope_buffer, services, metric_outcomes, redis, @@ -1161,6 +1306,7 @@ impl Service for ProjectCacheService { let Self { config, memory_checker, + envelope_buffer, services, metric_outcomes, redis, @@ -1171,34 +1317,12 @@ impl Service for ProjectCacheService { tokio::spawn(async move { let mut ticker = tokio::time::interval(config.cache_eviction_interval()); + let mut report_ticker = tokio::time::interval(Duration::from_secs(1)); relay_log::info!("project cache started"); // Channel for async project state responses back into the project cache. let (state_tx, mut state_rx) = mpsc::unbounded_channel(); - // Channel for envelope buffering. - let (buffer_tx, mut buffer_rx) = mpsc::unbounded_channel(); - let buffer_services = spooler::Services { - outcome_aggregator, - project_cache, - test_store, - }; - let buffer = match BufferService::create( - memory_checker.clone(), - buffer_services, - config.clone(), - ) - .await - { - Ok(buffer) => buffer.start(), - Err(err) => { - relay_log::error!(error = &err as &dyn Error, "failed to start buffer service",); - // NOTE: The process will exit with error if the buffer file could not be - // opened or the migrations could not be run. 
- std::process::exit(1); - } - }; - let Ok(mut subscription) = services.global_config.send(Subscribe).await else { // TODO(iker): we accept this sub-optimal error handling. TBD // the approach to deal with failures on the subscription @@ -1218,14 +1342,53 @@ impl Service for ProjectCacheService { } }; - // Request the existing index from the spooler. - buffer.send(RestoreIndex); + let (buffer_tx, mut buffer_rx) = mpsc::unbounded_channel(); + let spool_v1 = match config.spool_v2() { + true => None, + false => Some({ + // Channel for envelope buffering. + let buffer_services = spooler::Services { + outcome_aggregator, + project_cache, + test_store, + }; + let buffer = match BufferService::create( + memory_checker.clone(), + buffer_services, + config.clone(), + ) + .await + { + Ok(buffer) => buffer.start(), + Err(err) => { + relay_log::error!( + error = &err as &dyn Error, + "failed to start buffer service", + ); + // NOTE: The process will exit with error if the buffer file could not be + // opened or the migrations could not be run. + std::process::exit(1); + } + }; + + // Request the existing index from the spooler. + buffer.send(RestoreIndex); + + SpoolV1 { + buffer_tx, + index: HashSet::new(), + buffer_unspool_backoff: RetryBackoff::new(config.http_max_retry_interval()), + buffer, + } + }), + }; // Main broker that serializes public and internal messages, and triggers project state // fetches via the project source. 
 let mut broker = ProjectCacheBroker { config: config.clone(), memory_checker, + envelope_buffer: envelope_buffer.clone(), projects: hashbrown::HashMap::new(), garbage_disposal: GarbageDisposal::new(), source: ProjectSource::start( @@ -1235,11 +1398,8 @@ impl Service for ProjectCacheService { ), services, state_tx, - buffer_tx, - index: HashSet::new(), - buffer_unspool_handle: SleepHandle::idle(), - buffer_unspool_backoff: RetryBackoff::new(config.http_max_retry_interval()), - buffer, + spool_v1_unspool_handle: SleepHandle::idle(), + spool_v1, global_config, metric_outcomes, }; @@ -1275,7 +1435,7 @@ impl Service for ProjectCacheService { broker.evict_stale_project_caches() }) } - () = &mut broker.buffer_unspool_handle => { + () = &mut broker.spool_v1_unspool_handle => { metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "periodic_unspool", { broker.handle_periodic_unspool() }) @@ -1285,6 +1445,18 @@ impl Service for ProjectCacheService { broker.handle_message(message) }) } + peek = peek_buffer(&envelope_buffer) => { + metric!(timer(RelayTimers::ProjectCacheTaskDuration), task = "peek_at_envelope", { + if let Err(e) = broker.peek_at_envelope(peek).await { + relay_log::error!(error = &e as &dyn std::error::Error, "Failed to peek envelope"); + } + }) + } + _ = report_ticker.tick() => { + if let Some(envelope_buffer) = &envelope_buffer { + relay_statsd::metric!(gauge(RelayGauges::BufferPushInFlight) = envelope_buffer.inflight_push_count()); + } + } else => break, } } @@ -1294,6 +1466,14 @@ impl Service for ProjectCacheService { } } +/// Temporary helper function while V1 spool exists. +async fn peek_buffer(buffer: &Option>) -> Peek { + match buffer { + Some(buffer) => buffer.peek().await, + None => std::future::pending().await, + } +} + #[derive(Clone, Debug)] pub struct FetchProjectState { /// The public key to fetch the project by. 
@@ -1368,6 +1548,7 @@ mod tests { .unwrap() .into(); let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone()); + let envelope_buffer = GuardedEnvelopeBuffer::from_config(&config).map(Arc::new); let buffer_services = spooler::Services { outcome_aggregator: services.outcome_aggregator.clone(), project_cache: services.project_cache.clone(), @@ -1398,17 +1579,20 @@ mod tests { ProjectCacheBroker { config: config.clone(), memory_checker, + envelope_buffer, projects: hashbrown::HashMap::new(), garbage_disposal: GarbageDisposal::new(), source: ProjectSource::start(config, services.upstream_relay.clone(), None), services, state_tx, - buffer_tx, - index: HashSet::new(), - buffer: buffer.clone(), + spool_v1_unspool_handle: SleepHandle::idle(), + spool_v1: Some(SpoolV1 { + buffer_tx, + index: HashSet::new(), + buffer: buffer.clone(), + buffer_unspool_backoff: RetryBackoff::new(Duration::from_millis(100)), + }), global_config: GlobalConfigStatus::Pending, - buffer_unspool_handle: SleepHandle::idle(), - buffer_unspool_backoff: RetryBackoff::new(Duration::from_millis(100)), metric_outcomes, }, buffer, @@ -1455,10 +1639,10 @@ mod tests { select! { Some(assert) = rx_assert.recv() => { - assert_eq!(broker.index.len(), assert); + assert_eq!(broker.spool_v1.as_ref().unwrap().index.len(), assert); }, Some(update) = rx_update.recv() => broker.merge_state(update), - () = &mut broker.buffer_unspool_handle => broker.handle_periodic_unspool(), + () = &mut broker.spool_v1_unspool_handle => broker.handle_periodic_unspool(), } } }); @@ -1517,7 +1701,7 @@ mod tests { // Index and projects are empty. assert!(broker.projects.is_empty()); - assert!(broker.index.is_empty()); + assert!(broker.spool_v1.as_mut().unwrap().index.is_empty()); // Since there is no project we should not process anything but create a project and spool // the envelope. @@ -1525,7 +1709,7 @@ mod tests { // Assert that we have a new project and also added an index. 
assert!(broker.projects.get(&project_key).is_some()); - assert!(broker.index.contains(&key)); + assert!(broker.spool_v1.as_mut().unwrap().index.contains(&key)); // Check is we actually spooled anything. buffer_svc.send(DequeueMany::new([key].into(), buffer_tx.clone())); diff --git a/relay-server/src/services/spooler/mod.rs b/relay-server/src/services/spooler/mod.rs index 13e86421d0..00265b0e92 100644 --- a/relay-server/src/services/spooler/mod.rs +++ b/relay-server/src/services/spooler/mod.rs @@ -60,7 +60,6 @@ use crate::services::test_store::TestStore; use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers}; use crate::utils::{ManagedEnvelope, MemoryChecker}; -pub mod envelope_stack; pub mod spool_utils; mod sql; @@ -564,7 +563,7 @@ impl OnDisk { .fetch(&self.db) .peekable(); let mut envelopes = pin!(envelopes); - relay_statsd::metric!(counter(RelayCounters::BufferReads) += 1); + relay_statsd::metric!(counter(RelayCounters::BufferReadsDisk) += 1); // Stream is empty, we can break the loop, since we read everything by now. if envelopes.as_mut().peek().await.is_none() { @@ -631,7 +630,7 @@ impl OnDisk { .fetch(&self.db) .peekable(); let mut envelopes = pin!(envelopes); - relay_statsd::metric!(counter(RelayCounters::BufferReads) += 1); + relay_statsd::metric!(counter(RelayCounters::BufferReadsDisk) += 1); // Stream is empty, we can break the loop, since we read everything by now. 
if envelopes.as_mut().peek().await.is_none() { break; @@ -755,7 +754,7 @@ impl OnDisk { .map_err(BufferError::InsertFailed)?; self.track_count(1); - relay_statsd::metric!(counter(RelayCounters::BufferWrites) += 1); + relay_statsd::metric!(counter(RelayCounters::BufferWritesDisk) += 1); Ok(()) } diff --git a/relay-server/src/services/spooler/sql.rs b/relay-server/src/services/spooler/sql.rs index aab3450d4e..02d0b66d3c 100644 --- a/relay-server/src/services/spooler/sql.rs +++ b/relay-server/src/services/spooler/sql.rs @@ -148,7 +148,7 @@ pub async fn do_insert( while let Some(chunk) = envelopes.next().await { let result = build_insert(&mut query_builder, chunk).execute(db).await?; count += result.rows_affected(); - relay_statsd::metric!(counter(RelayCounters::BufferWrites) += 1); + relay_statsd::metric!(counter(RelayCounters::BufferWritesDisk) += 1); // Reset the builder to initial state set by `QueryBuilder::new` function, // so it can be reused for another chunk. diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 0629ac9d47..489d01d84d 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -26,6 +26,14 @@ pub enum RelayGauges { /// This metric is tagged with: /// - `reason`: Why keys are / are not unspooled. BufferPeriodicUnspool, + /// Number of envelopes currently waiting to be buffered. + /// + /// This corresponds to the number of corresponding tokio tasks currently scheduled or running. + BufferPushInFlight, + /// The number of individual stacks in the priority queue. + /// + /// Per combination of `(own_key, sampling_key)`, a new stack is created. + BufferStackCount, /// The currently used memory by the entire system. /// /// Relay uses the same value for its memory health check. 
@@ -50,6 +58,8 @@ impl GaugeMetric for RelayGauges { RelayGauges::BufferEnvelopesMemoryCount => "buffer.envelopes_mem_count", RelayGauges::BufferEnvelopesDiskCount => "buffer.envelopes_disk_count", RelayGauges::BufferPeriodicUnspool => "buffer.unspool.periodic", + RelayGauges::BufferPushInFlight => "buffer.push_inflight", + RelayGauges::BufferStackCount => "buffer.stack_count", RelayGauges::SystemMemoryUsed => "health.system_memory.used", RelayGauges::SystemMemoryTotal => "health.system_memory.total", #[cfg(feature = "processing")] @@ -576,13 +586,13 @@ pub enum RelayCounters { /// - `handling`: Either `"success"` if the envelope was handled correctly, or `"failure"` if /// there was an error or bug. EnvelopeRejected, - /// Number times the envelope buffer spools to disk. - BufferWrites, - /// Number times the envelope buffer reads back from disk. - BufferReads, - /// Number of _envelopes_ the envelope buffer spools to disk. + /// Number of times the envelope buffer spools to disk. + BufferWritesDisk, + /// Number of times the envelope buffer reads back from disk. + BufferReadsDisk, + /// Number of _envelopes_ the envelope buffer ingests. BufferEnvelopesWritten, - /// Number of _envelopes_ the envelope buffer reads back from disk. + /// Number of _envelopes_ the envelope buffer produces. BufferEnvelopesRead, /// Number of state changes in the envelope buffer. 
/// This metric is tagged with: @@ -794,8 +804,8 @@ impl CounterMetric for RelayCounters { RelayCounters::EventCorrupted => "event.corrupted", RelayCounters::EnvelopeAccepted => "event.accepted", RelayCounters::EnvelopeRejected => "event.rejected", - RelayCounters::BufferWrites => "buffer.writes", - RelayCounters::BufferReads => "buffer.reads", + RelayCounters::BufferWritesDisk => "buffer.writes", + RelayCounters::BufferReadsDisk => "buffer.reads", RelayCounters::BufferEnvelopesWritten => "buffer.envelopes_written", RelayCounters::BufferEnvelopesRead => "buffer.envelopes_read", RelayCounters::BufferStateTransition => "buffer.state.transition", diff --git a/relay-server/src/utils/managed_envelope.rs b/relay-server/src/utils/managed_envelope.rs index f737f27a81..ba52f7c02e 100644 --- a/relay-server/src/utils/managed_envelope.rs +++ b/relay-server/src/utils/managed_envelope.rs @@ -225,7 +225,7 @@ impl ManagedEnvelope { /// Consumes itself returning the managed envelope. pub fn into_envelope(mut self) -> Box { self.context.done = true; - Box::new(self.envelope.take_items()) + self.take_envelope() } /// Converts current managed envelope into processed envelope. @@ -503,7 +503,7 @@ impl ManagedEnvelope { /// /// This is the date time equivalent to [`start_time`](Self::start_time). pub fn received_at(&self) -> DateTime { - relay_common::time::instant_to_date_time(self.envelope().meta().start_time()) + self.envelope.received_at() } /// Resets inner state to ensure there's no more logging. 
diff --git a/tests/integration/fixtures/relay.py b/tests/integration/fixtures/relay.py index 28f21d45e6..fc55f0d031 100644 --- a/tests/integration/fixtures/relay.py +++ b/tests/integration/fixtures/relay.py @@ -148,7 +148,7 @@ def inner( }, "spool": { # Unspool as quickly as possible - "envelopes": {"unspool_interval": 1}, + "envelopes": {"unspool_interval": 1, "version": "experimental"}, }, } diff --git a/tests/integration/test_projectconfigs.py b/tests/integration/test_projectconfigs.py index baf2a03466..0c4f6735e4 100644 --- a/tests/integration/test_projectconfigs.py +++ b/tests/integration/test_projectconfigs.py @@ -6,8 +6,6 @@ import pytest import time from collections import namedtuple -import tempfile -import os from sentry_relay.auth import PublicKey, SecretKey, generate_key_pair @@ -231,11 +229,7 @@ def get_response(relay, packed, signature, version="3"): return data -@pytest.mark.parametrize( - "buffer_config", - [False, True], -) -def test_unparsable_project_config(buffer_config, mini_sentry, relay): +def test_unparsable_project_config(mini_sentry, relay): project_key = 42 relay_config = { "cache": { @@ -249,12 +243,6 @@ def test_unparsable_project_config(buffer_config, mini_sentry, relay): }, } - if buffer_config: - temp = tempfile.mkdtemp() - dbfile = os.path.join(temp, "buffer.db") - # set the buffer to something low to force the spooling - relay_config["spool"] = {"envelopes": {"path": dbfile, "max_memory_size": 1000}} - relay = relay(mini_sentry, relay_config) mini_sentry.add_full_project_config(project_key) public_key = mini_sentry.get_dsn_public_key(project_key) diff --git a/tests/integration/test_query.py b/tests/integration/test_query.py index 1783cc64e8..d9bcc14556 100644 --- a/tests/integration/test_query.py +++ b/tests/integration/test_query.py @@ -124,7 +124,7 @@ def get_project_config(): retry_count += 1 print("RETRY", retry_count) - if retry_count < 2: + if retry_count < 3: if failure_type == "timeout": time.sleep(50) # ensure timeout elif 
failure_type == "socketerror":