diff --git a/CHANGELOG.md b/CHANGELOG.md
index c7e35126c1..066ad9daa9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,10 @@
 
 ## Unreleased
 
+**Internal**:
+
+- Proactively move on-disk spool to memory. ([#2949](https://github.com/getsentry/relay/pull/2949))
+
 **Bug Fixes**:
 
 - Add automatic PII scrubbing to `logentry.params`. ([#2956](https://github.com/getsentry/relay/pull/2956))
diff --git a/relay-server/src/actors/project_cache.rs b/relay-server/src/actors/project_cache.rs
index 490de9bf0c..d69f3a54a2 100644
--- a/relay-server/src/actors/project_cache.rs
+++ b/relay-server/src/actors/project_cache.rs
@@ -1148,6 +1148,8 @@ mod tests {
 
     #[tokio::test]
     async fn always_spools() {
+        relay_log::init_test!();
+
         let num_permits = 5;
         let buffer_guard: Arc<_> = BufferGuard::new(num_permits).into();
         let services = mocked_services();
@@ -1194,7 +1196,7 @@ mod tests {
         ));
         tokio::time::sleep(Duration::from_millis(100)).await;
 
-        // We should be able to unspool once since we have 1 permit.
+        // We should be able to unspool 5 envelopes since we have 5 permits.
         let mut envelopes = vec![];
         while let Ok(envelope) = rx.try_recv() {
             envelopes.push(envelope)
diff --git a/relay-server/src/actors/spooler/mod.rs b/relay-server/src/actors/spooler/mod.rs
index 8e9edead23..44c0f5b991 100644
--- a/relay-server/src/actors/spooler/mod.rs
+++ b/relay-server/src/actors/spooler/mod.rs
@@ -32,11 +32,11 @@ use std::collections::{BTreeMap, BTreeSet};
 use std::error::Error;
 use std::path::Path;
-use std::pin::Pin;
+use std::pin::pin;
 use std::sync::Arc;
 
 use futures::stream::{self, StreamExt};
-use relay_base_schema::project::ProjectKey;
+use relay_base_schema::project::{ParseProjectKeyError, ProjectKey};
 use relay_config::Config;
 use relay_system::{Addr, Controller, FromMessage, Interface, Sender, Service};
 use sqlx::migrate::MigrateError;
@@ -59,7 +59,16 @@ use crate::utils::{BufferGuard, ManagedEnvelope};
 
 mod sql;
 
-/// The set of errors which can happend while working the the buffer.
+/// The predefined batch size for SQL queries when fetching anything from the on-disk spool.
+const BATCH_SIZE: i64 = 200;
+
+/// The low memory watermark for the spool.
+///
+/// This number is used to calculate how much memory the on-disk spool would take if all
+/// the spooled envelopes were moved to the in-memory buffer.
+const LOW_SPOOL_MEMORY_WATERMARK: f64 = 0.3;
+
+/// The set of errors which can happen while working with the buffer.
 #[derive(Debug, thiserror::Error)]
 pub enum BufferError {
     #[error("failed to move envelope from disk to memory")]
@@ -92,6 +101,9 @@ pub enum BufferError {
     #[error("failed to run migrations")]
     MigrationFailed(#[from] MigrateError),
 
+    #[error("failed to extract project key from the row")]
+    ParseProjectKeyFailed(#[from] ParseProjectKeyError),
+
     #[error("on-disk spool is full")]
     SpoolIsFull,
 }
@@ -248,6 +260,29 @@ impl InMemory {
         }
     }
 
+    /// Creates a new [`InMemory`] state from an already provided buffer of envelopes.
+    fn new_with_buffer(
+        max_memory_size: usize,
+        buffer: BTreeMap<QueueKey, Vec<ManagedEnvelope>>,
+    ) -> Self {
+        let (envelope_count, used_memory) =
+            buffer
+                .values()
+                .fold((0, 0), |(envelope_count, used_memory), envelopes| {
+                    (
+                        envelope_count + envelopes.len(),
+                        used_memory + envelopes.iter().map(|e| e.estimated_size()).sum::<usize>(),
+                    )
+                });
+
+        Self {
+            max_memory_size,
+            buffer,
+            used_memory,
+            envelope_count,
+        }
+    }
+
     /// Returns the number of envelopes in the memory buffer.
     fn count(&self) -> usize {
         self.buffer.values().map(|v| v.len()).sum()
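Aside: the fold in `new_with_buffer` recovers both totals in a single pass over the map. A minimal, self-contained sketch of the same accounting pattern, with an illustrative `Item` struct standing in for `ManagedEnvelope` and string keys standing in for `QueueKey`:

```rust
use std::collections::BTreeMap;

/// Illustrative stand-in for a spooled envelope; only its size matters here.
struct Item {
    estimated_size: usize,
}

fn main() {
    let mut buffer: BTreeMap<&str, Vec<Item>> = BTreeMap::new();
    buffer.insert(
        "key-a",
        vec![Item { estimated_size: 100 }, Item { estimated_size: 250 }],
    );
    buffer.insert("key-b", vec![Item { estimated_size: 50 }]);

    // One pass over all values yields both totals, exactly like the fold in
    // `new_with_buffer`: the accumulator is a (count, bytes) pair.
    let (envelope_count, used_memory) =
        buffer.values().fold((0, 0), |(count, used), items| {
            (
                count + items.len(),
                used + items.iter().map(|i| i.estimated_size).sum::<usize>(),
            )
        });

    assert_eq!(envelope_count, 3);
    assert_eq!(used_memory, 400);
}
```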
@@ -387,7 +422,7 @@ impl OnDisk {
         &self,
         row: SqliteRow,
         services: &Services,
-    ) -> Result<Vec<ManagedEnvelope>, BufferError> {
+    ) -> Result<(QueueKey, Vec<ManagedEnvelope>), BufferError> {
         let envelope_row: Vec<u8> = row.try_get("envelope").map_err(BufferError::FetchFailed)?;
         let envelope_bytes = bytes::Bytes::from(envelope_row);
         let mut envelope = Envelope::parse_bytes(envelope_bytes)?;
@@ -396,10 +431,19 @@ impl OnDisk {
             .try_get("received_at")
             .map_err(BufferError::FetchFailed)?;
         let start_time = StartTime::from_timestamp_millis(received_at as u64);
+        let own_key: &str = row.try_get("own_key").map_err(BufferError::FetchFailed)?;
+        let sampling_key: &str = row
+            .try_get("sampling_key")
+            .map_err(BufferError::FetchFailed)?;
+        let queue_key = QueueKey {
+            own_key: ProjectKey::parse(own_key).map_err(BufferError::ParseProjectKeyFailed)?,
+            sampling_key: ProjectKey::parse(sampling_key)
+                .map_err(BufferError::ParseProjectKeyFailed)?,
+        };
         envelope.set_start_time(start_time.into_inner());
 
-        ProcessingGroup::split_envelope(*envelope)
+        let envelopes: Result<Vec<ManagedEnvelope>, BufferError> = ProcessingGroup::split_envelope(*envelope)
             .into_iter()
             .map(|(group, envelope)| {
                 let managed_envelope = self.buffer_guard.enter(
@@ -410,7 +454,13 @@ impl OnDisk {
                 )?;
                 Ok(managed_envelope)
             })
-            .collect()
+            .collect();
+        Ok((queue_key, envelopes?))
+    }
+
+    /// Returns the size of the batch to unspool.
+    fn unspool_batch(&self) -> i64 {
+        BATCH_SIZE.min(self.buffer_guard.available() as i64)
     }
 
     /// Tries to delete the envelopes from the persistent buffer in batches,
@@ -443,21 +493,19 @@ impl OnDisk {
         // 2. Make sure that if we panic and deleted envelopes cannot be read out fully, we do not lose all of them,
         //    but only one batch, and the rest of them will stay on disk for the next iteration
         //    to pick up.
-        //
-        // Right now we use 100 for batch size.
-        let batch_size = 100;
-        let mut envelopes = sql::delete_and_fetch(key, batch_size)
+        let envelopes = sql::delete_and_fetch(key, self.unspool_batch())
            .fetch(&self.db)
            .peekable();
+        let mut envelopes = pin!(envelopes);
         relay_statsd::metric!(counter(RelayCounters::BufferReads) += 1);
 
         // Stream is empty, we can break the loop, since we read everything by now.
-        if Pin::new(&mut envelopes).peek().await.is_none() {
+        if envelopes.as_mut().peek().await.is_none() {
             return Ok(());
         }
 
         let mut count: i64 = 0;
-        while let Some(envelope) = envelopes.next().await {
+        while let Some(envelope) = envelopes.as_mut().next().await {
             count += 1;
             let envelope = match envelope {
                 Ok(envelope) => envelope,
@@ -474,7 +522,7 @@ impl OnDisk {
             };
 
             match self.extract_envelope(envelope, services) {
-                Ok(managed_envelopes) => {
+                Ok((_, managed_envelopes)) => {
                     for managed_envelope in managed_envelopes {
                         sender.send(managed_envelope).ok();
                     }
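Aside: the hunks above swap `Pin::new(&mut ...)` for the `std::pin::pin!` macro (stable since Rust 1.68), which stack-pins the peekable stream once and then hands out `Pin<&mut _>` via `as_mut()`. A minimal sketch of the same peek-then-drain pattern over an in-memory stream (the integer items are placeholders for envelope rows):

```rust
use std::pin::pin;

use futures::executor::block_on;
use futures::stream::{self, StreamExt};

fn main() {
    block_on(async {
        // `peek` takes `self: Pin<&mut Self>`, so the peekable stream must be
        // pinned first; `pin!` does this on the stack in one step.
        let envelopes = stream::iter([1, 2, 3]).peekable();
        let mut envelopes = pin!(envelopes);

        // Peek at the head of the stream without consuming it.
        assert_eq!(envelopes.as_mut().peek().await, Some(&1));

        // The peeked element is still yielded by `next`.
        let mut drained = Vec::new();
        while let Some(item) = envelopes.as_mut().next().await {
            drained.push(item);
        }
        assert_eq!(drained, vec![1, 2, 3]);
    });
}
```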
@@ -490,6 +538,63 @@ impl OnDisk {
         }
     }
 
+    /// Unspools the entire contents of the on-disk spool.
+    async fn delete_and_fetch_all(
+        &mut self,
+        services: &Services,
+    ) -> Result<BTreeMap<QueueKey, Vec<ManagedEnvelope>>, BufferError> {
+        let mut result: BTreeMap<QueueKey, Vec<ManagedEnvelope>> = BTreeMap::new();
+
+        loop {
+            // On each iteration, make sure we are still below the low watermark of
+            // used guard permits.
+            if !self.buffer_guard.is_below_low_watermark() {
+                return Ok(result);
+            }
+            let envelopes = sql::delete_and_fetch_all(self.unspool_batch())
+                .fetch(&self.db)
+                .peekable();
+            let mut envelopes = pin!(envelopes);
+            relay_statsd::metric!(counter(RelayCounters::BufferReads) += 1);
+
+            // Stream is empty, we can break the loop, since we read everything by now.
+            if envelopes.as_mut().peek().await.is_none() {
+                break;
+            }
+
+            let mut count: i64 = 0;
+            while let Some(envelope) = envelopes.as_mut().next().await {
+                count += 1;
+                let envelope = match envelope {
+                    Ok(envelope) => envelope,
+
+                    // Bail if there are errors in the stream.
+                    Err(err) => {
+                        relay_log::error!(
+                            error = &err as &dyn Error,
+                            "failed to read the buffer stream from the disk",
+                        );
+                        continue;
+                    }
+                };
+
+                match self.extract_envelope(envelope, services) {
+                    Ok((key, managed_envelopes)) => {
+                        for managed_envelope in managed_envelopes {
+                            result.entry(key).or_default().push(managed_envelope);
+                        }
+                    }
+                    Err(err) => relay_log::error!(
+                        error = &err as &dyn Error,
+                        "failed to extract envelope from the buffer",
+                    ),
+                }
+            }
+            self.track_count(-count);
+        }
+
+        Ok(result)
+    }
+
     /// Dequeues the envelopes from the on-disk spool and send them to the provided `sender`.
     ///
     /// The keys for which the envelopes could not be fetched, send back to `ProjectCache` to merge
@@ -503,7 +608,7 @@ impl OnDisk {
     ) {
         let mut unused_keys = BTreeSet::new();
         while let Some(key) = keys.pop() {
-            // If the error with a key is returned we must save it for the next iterration.
+            // If the error with a key is returned we must save it for the next iteration.
             if let Err(key) = self.delete_and_fetch(key, &sender, services).await {
                 unused_keys.insert(key);
             };
@@ -625,7 +730,7 @@ impl BufferState {
 
     /// Becomes a different state, depending on the current state and the current conditions of
     /// underlying spool.
-    async fn transition(self, config: &Arc<Config>) -> Self {
+    async fn transition(self, config: &Config, services: &Services) -> Self {
         match self {
             Self::MemoryFileStandby { ram, mut disk }
                 if ram.is_full() || disk.buffer_guard.is_over_high_watermark() =>
@@ -636,17 +741,51 @@ impl BufferState {
                         "failed to spool the in-memory buffer to disk",
                     );
                 }
+                relay_log::trace!(
+                    "Transition to disk spool: # of envelopes = {}",
+                    disk.count.unwrap_or_default()
+                );
+
                 Self::Disk(disk)
             }
-            Self::Disk(disk) if disk.is_empty().await.unwrap_or_default() => {
-                Self::MemoryFileStandby {
-                    ram: InMemory::new(config.spool_envelopes_max_memory_size()),
-                    disk,
+            Self::Disk(mut disk) if Self::is_below_low_mem_watermark(config, &disk).await => {
+                match disk.delete_and_fetch_all(services).await {
+                    Ok(buffer) => {
+                        let ram = InMemory::new_with_buffer(
+                            config.spool_envelopes_max_memory_size(),
+                            buffer,
+                        );
+                        relay_log::trace!(
+                            "Transition to memory spool: # of envelopes = {}",
+                            ram.envelope_count
+                        );
+
+                        Self::MemoryFileStandby { ram, disk }
+                    }
+                    Err(err) => {
+                        relay_log::error!(
+                            error = &err as &dyn Error,
+                            "failed to move data from disk to memory, keeping the on-disk spool",
+                        );
+
+                        Self::Disk(disk)
+                    }
                 }
             }
             Self::Memory(_) | Self::MemoryFileStandby { .. } | Self::Disk(_) => self,
         }
     }
+
+    /// Returns `true` if the on-disk spooled data can fit in memory.
+    ///
+    /// The spooled envelopes must:
+    /// * fit into memory and take no more than 30% of the configured space,
+    /// * the number of used buffer guard permits must also be under the low watermark.
+    async fn is_below_low_mem_watermark(config: &Config, disk: &OnDisk) -> bool {
+        ((config.spool_envelopes_max_memory_size() as f64 * LOW_SPOOL_MEMORY_WATERMARK) as i64)
+            > disk.estimate_spool_size().await.unwrap_or(i64::MAX)
+            && disk.buffer_guard.is_below_low_watermark()
+    }
 }
 
 impl Default for BufferState {
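Aside: the new watermark check boils down to simple arithmetic over the configured capacity. A self-contained sketch of the same comparison, minus the permit check (the 500 MiB capacity below is illustrative, not necessarily Relay's default):

```rust
/// Mirrors `LOW_SPOOL_MEMORY_WATERMARK` from the module above.
const LOW_SPOOL_MEMORY_WATERMARK: f64 = 0.3;

/// Same size comparison as `is_below_low_mem_watermark`: the unspooled
/// envelopes may occupy at most 30% of the in-memory capacity.
fn spool_fits(max_memory_size: usize, estimated_spool_size: i64) -> bool {
    ((max_memory_size as f64 * LOW_SPOOL_MEMORY_WATERMARK) as i64) > estimated_spool_size
}

fn main() {
    // With a 500 MiB in-memory capacity the cutoff sits at 150 MiB.
    let max_memory = 500 * 1024 * 1024;
    assert!(spool_fits(max_memory, 100 * 1024 * 1024)); // 100 MiB spool fits
    assert!(!spool_fits(max_memory, 200 * 1024 * 1024)); // 200 MiB does not
}
```

Note that the real check additionally requires the `BufferGuard` to be below its low watermark, so enough permits remain free to admit the unspooled envelopes.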
@@ -815,7 +954,7 @@ impl BufferService {
         }
 
         let state = std::mem::take(&mut self.state);
-        self.state = state.transition(&self.config).await;
+        self.state = state.transition(&self.config, &self.services).await;
 
         Ok(())
     }
@@ -840,7 +979,7 @@ impl BufferService {
             }
         }
         let state = std::mem::take(&mut self.state);
-        self.state = state.transition(&self.config).await;
+        self.state = state.transition(&self.config, &self.services).await;
 
         Ok(())
     }
@@ -865,7 +1004,7 @@ impl BufferService {
         }
 
         let state = std::mem::take(&mut self.state);
-        self.state = state.transition(&self.config).await;
+        self.state = state.transition(&self.config, &self.services).await;
 
         if count > 0 {
             relay_log::with_scope(
@@ -1187,6 +1326,8 @@ mod tests {
 
     #[test]
     fn metrics_work() {
+        relay_log::init_test!();
+
         let buffer_guard: Arc<_> = BufferGuard::new(999999).into();
         let config: Arc<_> = Config::from_json_value(serde_json::json!({
             "spool": {
@@ -1218,18 +1359,14 @@ mod tests {
             .unwrap();
 
             // Send 5 envelopes
-            for i in 0..5 {
-                let res = service
+            for _ in 0..5 {
+                service
                     .handle_enqueue(Enqueue {
                         key,
                         value: empty_managed_envelope(),
                     })
-                    .await;
-                if i > 2 {
-                    assert!(res.is_err());
-                } else {
-                    assert!(res.is_ok());
-                }
+                    .await
+                    .unwrap();
             }
 
             // Dequeue everything
@@ -1249,7 +1386,7 @@ mod tests {
 
             while rx.recv().await.is_some() {
                 count += 1;
             }
-            assert_eq!(count, 3);
+            assert_eq!(count, 5);
         })
     });
@@ -1271,14 +1408,24 @@ mod tests {
             "buffer.writes:1|c",
             "buffer.envelopes_written:3|c",
             "buffer.envelopes_disk_count:3|g",
-            "buffer.disk_size:24576|h",
-            "buffer.disk_size:24576|h",
+            "buffer.disk_size:1031|h",
+            "buffer.envelopes_written:1|c",
+            "buffer.envelopes_disk_count:4|g",
+            "buffer.writes:1|c",
+            "buffer.disk_size:1372|h",
+            "buffer.disk_size:1372|h",
+            "buffer.envelopes_written:1|c",
+            "buffer.envelopes_disk_count:5|g",
+            "buffer.writes:1|c",
+            "buffer.disk_size:1713|h",
             "buffer.dequeue_attempts:1|h",
             "buffer.reads:1|c",
-            "buffer.envelopes_read:-3|c",
+            "buffer.envelopes_read:-5|c",
             "buffer.envelopes_disk_count:0|g",
             "buffer.dequeue_attempts:1|h",
             "buffer.reads:1|c",
+            "buffer.disk_size:8|h",
+            "buffer.reads:1|c",
         ]
         "#);
     }
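Aside: all three call sites above use the same take-and-transition idiom: `transition` consumes the state by value, so it is swapped out for the `Default` value with `std::mem::take` and the result written back. A reduced sketch of that pattern (the `State` and `Service` types here are illustrative, not the Relay ones):

```rust
use futures::executor::block_on;

/// Illustrative stand-in for `BufferState`; `Default` provides the cheap
/// placeholder value that `std::mem::take` swaps in.
#[derive(Debug, Default, PartialEq)]
enum State {
    #[default]
    Memory,
    Disk,
}

impl State {
    /// Consumes the current state and returns the next one, like `transition`.
    async fn transition(self, spool_fits_in_memory: bool) -> Self {
        match self {
            State::Disk if spool_fits_in_memory => State::Memory,
            other => other,
        }
    }
}

struct Service {
    state: State,
}

impl Service {
    async fn step(&mut self, spool_fits_in_memory: bool) {
        // `transition` takes ownership, so temporarily park the default state
        // in `self.state`, then store the transition result back.
        let state = std::mem::take(&mut self.state);
        self.state = state.transition(spool_fits_in_memory).await;
    }
}

fn main() {
    block_on(async {
        let mut service = Service { state: State::Disk };
        service.step(true).await;
        assert_eq!(service.state, State::Memory);
    });
}
```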
diff --git a/relay-server/src/actors/spooler/sql.rs b/relay-server/src/actors/spooler/sql.rs
index 206b872273..c5eea7b2c4 100644
--- a/relay-server/src/actors/spooler/sql.rs
+++ b/relay-server/src/actors/spooler/sql.rs
@@ -24,20 +24,33 @@ const SQLITE_LIMIT_VARIABLE_NUMBER: usize = 999;
 /// the envelope was received. This will create a prepared statement which is cached and re-used.
 pub fn delete_and_fetch<'a>(
     key: QueueKey,
-    batch_size: u32,
+    batch_size: i64,
 ) -> Query<'a, Sqlite, SqliteArguments<'a>> {
     sqlx::query(
         "DELETE FROM
            envelopes
        WHERE id IN (SELECT id FROM envelopes WHERE own_key = ? AND sampling_key = ? LIMIT ?)
        RETURNING
-           envelope, received_at",
+           received_at, own_key, sampling_key, envelope",
     )
     .bind(key.own_key.to_string())
     .bind(key.sampling_key.to_string())
     .bind(batch_size)
 }
 
+/// Creates a DELETE query which returns the requested batch of envelopes together with their
+/// timestamps and project keys.
+pub fn delete_and_fetch_all<'a>(batch_size: i64) -> Query<'a, Sqlite, SqliteArguments<'a>> {
+    sqlx::query(
+        "DELETE FROM
+           envelopes
+       WHERE id IN (SELECT id FROM envelopes LIMIT ?)
+       RETURNING
+           received_at, own_key, sampling_key, envelope",
+    )
+    .bind(batch_size)
+}
+
 /// Creates a DELETE query, which silently removes the data from the database.
 pub fn delete<'a>(key: QueueKey) -> Query<'a, Sqlite, SqliteArguments<'a>> {
     sqlx::query("DELETE FROM envelopes where own_key = ? AND sampling_key = ?")
@@ -45,18 +58,16 @@ pub fn delete<'a>(key: QueueKey) -> Query<'a, Sqlite, SqliteArguments<'a>> {
         .bind(key.sampling_key.to_string())
 }
 
-/// Creates a query which fetches the current page count and page size.
+/// Creates a query which fetches the `envelopes` table size.
 ///
 /// This info used to calculate the current allocated database size.
 pub fn current_size<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
-    sqlx::query(
-        "SELECT page_count * page_size as size FROM pragma_page_count(), pragma_page_size();",
-    )
+    sqlx::query(r#"SELECT SUM(pgsize - unused) FROM dbstat WHERE name="envelopes""#)
 }
 
 /// Creates the query to select only 1 record's `received_at` from the database.
 ///
-/// It is usefull and very fast for checking if the table is empty.
+/// It is useful and very fast for checking if the table is empty.
 pub fn select_one<'a>() -> Query<'a, Sqlite, SqliteArguments<'a>> {
     sqlx::query("SELECT received_at FROM envelopes LIMIT 1;")
 }
@@ -76,7 +87,7 @@ pub fn insert<'a>(
     .bind(managed_envelope)
 }
 
-/// Descibes the chunk item which is handled by insert statement.
+/// Describes the chunk item which is handled by the insert statement.
 type ChunkItem = (QueueKey, Vec<u8>, i64);
 
 /// Creates an INSERT query for the chunk of provided data.
@@ -104,12 +115,12 @@ pub async fn do_insert(
     stream: impl Stream<Item = ChunkItem> + std::marker::Unpin,
     db: &Pool<Sqlite>,
 ) -> Result<u64, sqlx::Error> {
-    // Since we have 3 variables we have to bind, we devide the SQLite limit by 3
+    // Since we have 3 variables we have to bind, we divide the SQLite limit by 3
     // here to prepare the chunks which will be preparing the batch inserts.
     let mut envelopes = stream.chunks(SQLITE_LIMIT_VARIABLE_NUMBER / 3);
 
     // A builder type for constructing queries at runtime.
-    // This by default creates a prepared sql statement, which is cached and
+    // This by default creates a prepared SQL statement, which is cached and
     // re-used for sequential queries.
     let mut query_builder: QueryBuilder<Sqlite> =
         QueryBuilder::new("INSERT INTO envelopes (received_at, own_key, sampling_key, envelope) ");
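Aside: `delete_and_fetch_all` leans on SQLite's `DELETE ... RETURNING` (available since SQLite 3.35), which drains a batch and streams the removed rows back in a single statement. A sketch of consuming such a query with sqlx, assuming the `envelopes` schema above and an already-connected pool (`drain_batch` is a hypothetical helper, not part of this PR):

```rust
use futures::TryStreamExt;
use sqlx::{Row, SqlitePool};

/// Deletes up to `batch_size` rows and streams the deleted rows back,
/// mirroring the shape of `delete_and_fetch_all` above.
async fn drain_batch(db: &SqlitePool, batch_size: i64) -> Result<usize, sqlx::Error> {
    let mut rows = sqlx::query(
        "DELETE FROM
            envelopes
        WHERE id IN (SELECT id FROM envelopes LIMIT ?)
        RETURNING
            received_at, own_key, sampling_key, envelope",
    )
    .bind(batch_size)
    .fetch(db);

    let mut drained = 0;
    while let Some(row) = rows.try_next().await? {
        // Each row is already deleted from the table but fully readable here.
        let own_key: String = row.try_get("own_key")?;
        let envelope: Vec<u8> = row.try_get("envelope")?;
        drained += 1;
        println!("drained {} bytes for project key {own_key}", envelope.len());
    }
    Ok(drained)
}
```

On the `current_size` change: `dbstat` is a virtual table (it requires SQLite built with `SQLITE_ENABLE_DBSTAT_VTAB`) that reports per-table page usage, so unlike the previous whole-database `page_count * page_size` pragma, the reported size shrinks as batches are deleted, which is what the updated `buffer.disk_size` metrics in the test snapshot reflect.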