diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index a766febf10..a23ab4446b 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -3,6 +3,7 @@ use std::error::Error; use std::sync::Arc; use std::time::Duration; +use hashbrown::HashSet; use relay_base_schema::project::ProjectKey; use relay_config::{Config, RelayMode}; use relay_dynamic_config::GlobalConfig; @@ -183,14 +184,11 @@ pub struct AddMetricMeta { /// This message is sent from the project buffer in case of the error while fetching the data from /// the persistent buffer, ensuring that we still have the index pointing to the keys, which could be found in the /// persistent storage. -pub struct UpdateSpoolIndex { - project_key: ProjectKey, - keys: BTreeSet, -} +pub struct UpdateSpoolIndex(pub HashSet); impl UpdateSpoolIndex { - pub fn new(project_key: ProjectKey, keys: BTreeSet) -> Self { - Self { project_key, keys } + pub fn new(keys: HashSet) -> Self { + Self(keys) } } @@ -203,7 +201,7 @@ pub struct SpoolHealth; /// This index will be received only once shortly after startup and will trigger refresh for the /// project states for the project keys returned in the message. #[derive(Debug)] -pub struct RefreshIndexCache(pub BTreeMap>); +pub struct RefreshIndexCache(pub HashSet); /// A cache for [`ProjectState`]s. /// @@ -492,7 +490,7 @@ struct ProjectCacheBroker { buffer_tx: mpsc::UnboundedSender, buffer_guard: Arc, /// Index of the buffered project keys. - index: BTreeMap>, + index: HashSet, buffer_unspool_handle: SleepHandle, buffer_unspool_backoff: RetryBackoff, buffer: Addr, @@ -524,8 +522,7 @@ impl ProjectCacheBroker { /// Adds the value to the queue for the provided key. pub fn enqueue(&mut self, key: QueueKey, value: ManagedEnvelope) { - self.index.entry(key.own_key).or_default().insert(key); - self.index.entry(key.sampling_key).or_default().insert(key); + self.index.insert(key); self.buffer.send(Enqueue::new(key, value)); } @@ -533,49 +530,9 @@ impl ProjectCacheBroker { /// /// All the found envelopes will be send back through the `buffer_tx` channel and directly /// forwarded to `handle_processing`. - pub fn dequeue(&mut self, partial_key: ProjectKey) { - // If we don't yet have the global config, we will defer dequeuing until we do. - if let GlobalConfigStatus::Pending = self.global_config { - return; - } - - let mut result = Vec::new(); - let mut queue_keys = self.index.remove(&partial_key).unwrap_or_default(); - let mut index = BTreeSet::new(); - - while let Some(queue_key) = queue_keys.pop_first() { - // We only have to check `other_key`, because we already know that the `partial_key`s `state` - // is valid and loaded. - let other_key = if queue_key.own_key == partial_key { - queue_key.sampling_key - } else { - queue_key.own_key - }; - - if self - .projects - .get(&other_key) - // Make sure we have only cached and valid state. - .and_then(|p| p.valid_state()) - .map_or(false, |s| !s.invalid()) - { - result.push(queue_key); - } else { - index.insert(queue_key); - } - } - - if !index.is_empty() { - self.index.insert(partial_key, index); - } - - if !result.is_empty() { - self.buffer.send(DequeueMany::new( - partial_key, - result, - self.buffer_tx.clone(), - )) - } + pub fn dequeue(&self, keys: HashSet) { + self.buffer + .send(DequeueMany::new(keys, self.buffer_tx.clone())) } /// Evict projects that are over its expiry date. @@ -595,7 +552,12 @@ impl ProjectCacheBroker { // Defer dropping the projects to a dedicated thread: let mut count = 0; for (project_key, project) in expired { - if let Some(keys) = self.index.remove(&project_key) { + let keys = self + .index + .drain_filter(|key| key.own_key == project_key || key.sampling_key == project_key) + .collect::>(); + + if !keys.is_empty() { self.buffer.send(RemoveMany::new(project_key, keys)) } @@ -868,7 +830,7 @@ impl ProjectCacheBroker { } fn handle_buffer_index(&mut self, message: UpdateSpoolIndex) { - self.index.insert(message.project_key, message.keys); + self.index.extend(message.0); } fn handle_spool_health(&mut self, sender: Sender) { @@ -879,10 +841,14 @@ impl ProjectCacheBroker { let RefreshIndexCache(index) = message; let project_cache = self.services.project_cache.clone(); - for (key, values) in index { - self.index.entry(key).or_default().extend(values); - self.get_or_create_project(key) + for key in index { + self.index.insert(key); + self.get_or_create_project(key.own_key) .prefetch(project_cache.clone(), false); + if key.own_key != key.sampling_key { + self.get_or_create_project(key.sampling_key) + .prefetch(project_cache.clone(), false); + } } } @@ -899,6 +865,44 @@ impl ProjectCacheBroker { } } + /// Returns `true` if the project state valid for the [`QueueKey`]. + /// + /// Which includes the own key and the samplig key for the project. + /// Note: this function will trigger [`ProjectState`] refresh if it's already expired or not + /// valid. + fn is_state_valid(&mut self, key: &QueueKey) -> bool { + let QueueKey { + own_key, + sampling_key, + } = key; + + let is_own_state_valid = self.projects.get_mut(own_key).map_or(false, |project| { + // Returns `Some` if the project is cached otherwise None and also triggers refresh + // in background. + project + .get_cached_state(self.services.project_cache.clone(), false) + // Makes sure that the state also is valid. + .map_or(false, |state| !state.invalid()) + }); + + let is_sampling_state_valid = if own_key != sampling_key { + self.projects + .get_mut(sampling_key) + .map_or(false, |project| { + // Returns `Some` if the project is cached otherwise None and also triggers refresh + // in background. + project + .get_cached_state(self.services.project_cache.clone(), false) + // Makes sure that the state also is valid. + .map_or(false, |state| !state.invalid()) + }) + } else { + is_own_state_valid + }; + + is_own_state_valid && is_sampling_state_valid + } + /// Iterates the buffer index and tries to unspool the envelopes for projects with a valid /// state. /// @@ -907,40 +911,33 @@ impl ProjectCacheBroker { fn handle_periodic_unspool(&mut self) { self.buffer_unspool_handle.reset(); + // If we don't yet have the global config, we will defer dequeuing until we do. + if let GlobalConfigStatus::Pending = self.global_config { + self.buffer_unspool_backoff.reset(); + self.schedule_unspool(); + return; + } // If there is nothing spooled, schedule the next check a little bit later. - if self.index.is_empty() { + // And do *not* attempt to unspool if the assigned permits over low watermark. + if self.index.is_empty() || !self.buffer_guard.is_below_low_watermark() { self.schedule_unspool(); return; } - let keys = self.index.keys().cloned().collect::>(); - let mut dequeued = false; - - for project_key in keys.iter() { - if self.projects.get_mut(project_key).map_or(false, |project| { - // Returns `Some` if the project is cache otherwise None and also triggers refresh - // in background. - project - .get_cached_state(self.services.project_cache.clone(), false) - // Makes sure that the state also is valid. - .map_or(false, |state| !state.invalid()) - }) { - // Do *not* attempt to unspool if the assigned permits over low watermark. - if !self.buffer_guard.is_below_low_watermark() { - self.schedule_unspool(); - return; - } + let mut index = std::mem::take(&mut self.index); + let values = index + .drain_filter(|key| self.is_state_valid(key)) + .collect::>(); - self.dequeue(*project_key); - dequeued = true; - } + if !values.is_empty() { + self.dequeue(values); } - // If cannot dequeue for some reason, back off the next retry. - if dequeued { - self.buffer_unspool_backoff.reset(); - } + // Return all the un-used items to the index. + self.index.extend(index); + // Schedule unspool once we are done. + self.buffer_unspool_backoff.reset(); self.schedule_unspool(); } @@ -1073,7 +1070,7 @@ impl Service for ProjectCacheService { state_tx, buffer_tx, buffer_guard, - index: BTreeMap::new(), + index: HashSet::new(), buffer_unspool_handle: SleepHandle::idle(), buffer_unspool_backoff: RetryBackoff::new(config.http_max_retry_interval()), buffer, @@ -1208,7 +1205,7 @@ mod tests { state_tx, buffer_tx, buffer_guard, - index: BTreeMap::new(), + index: HashSet::new(), buffer: buffer.clone(), global_config: GlobalConfigStatus::Pending, buffer_unspool_handle: SleepHandle::idle(), @@ -1250,8 +1247,7 @@ mod tests { // All the messages should have been spooled to disk. assert_eq!(buffer_guard.available(), 5); - assert_eq!(broker.index.keys().len(), 1); - assert_eq!(broker.index.values().count(), 1); + assert_eq!(broker.index.len(), 1); let project_key = ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(); let key = QueueKey { @@ -1261,11 +1257,7 @@ mod tests { let (tx, mut rx) = mpsc::unbounded_channel(); // Check if we can also dequeue from the buffer directly. - buffer_svc.send(spooler::DequeueMany::new( - project_key, - [key].into(), - tx.clone(), - )); + buffer_svc.send(spooler::DequeueMany::new([key].into(), tx.clone())); tokio::time::sleep(Duration::from_millis(100)).await; // We should be able to unspool 5 envelopes since we have 5 permits. @@ -1283,11 +1275,10 @@ mod tests { // Till now we should have enqueued 5 envelopes and dequeued only 1, it means the index is // still populated with same keys and values. - assert_eq!(broker.index.keys().len(), 1); - assert_eq!(broker.index.values().count(), 1); + assert_eq!(broker.index.len(), 1); // Check if we can also dequeue from the buffer directly. - buffer_svc.send(spooler::DequeueMany::new(project_key, [key].into(), tx)); + buffer_svc.send(spooler::DequeueMany::new([key].into(), tx)); tokio::time::sleep(Duration::from_millis(100)).await; // Cannot dequeue anymore, no more available permits. assert!(rx.try_recv().is_err()); @@ -1353,8 +1344,7 @@ mod tests { select! { Some(assert) = rx_assert.recv() => { - assert_eq!(broker.index.keys().len(), assert); - assert_eq!(broker.index.values().count(), assert); + assert_eq!(broker.index.len(), assert); }, Some(update) = rx_update.recv() => broker.merge_state(update), () = &mut broker.buffer_unspool_handle => broker.handle_periodic_unspool(), diff --git a/relay-server/src/services/spooler/mod.rs b/relay-server/src/services/spooler/mod.rs index 68ca84da86..410df841bb 100644 --- a/relay-server/src/services/spooler/mod.rs +++ b/relay-server/src/services/spooler/mod.rs @@ -36,6 +36,8 @@ use std::pin::pin; use std::sync::Arc; use futures::stream::{self, StreamExt}; +use hashbrown::HashSet; +use itertools::Itertools; use relay_base_schema::project::{ParseProjectKeyError, ProjectKey}; use relay_config::Config; use relay_statsd::metric; @@ -145,22 +147,13 @@ impl Enqueue { /// Removes messages from the internal buffer and streams them to the sender. #[derive(Debug)] pub struct DequeueMany { - project_key: ProjectKey, - keys: Vec, + keys: HashSet, sender: mpsc::UnboundedSender, } impl DequeueMany { - pub fn new( - project_key: ProjectKey, - keys: Vec, - sender: mpsc::UnboundedSender, - ) -> Self { - Self { - project_key, - keys, - sender, - } + pub fn new(keys: HashSet, sender: mpsc::UnboundedSender) -> Self { + Self { keys, sender } } } @@ -342,9 +335,9 @@ impl InMemory { } /// Dequeues the envelopes from the in-memory buffer and send them to provided `sender`. - fn dequeue(&mut self, keys: &Vec, sender: mpsc::UnboundedSender) { + fn dequeue(&mut self, keys: HashSet, sender: mpsc::UnboundedSender) { for key in keys { - for envelope in self.buffer.remove(key).unwrap_or_default() { + for envelope in self.buffer.remove(&key).unwrap_or_default() { self.used_memory -= envelope.estimated_size(); self.envelope_count = self.envelope_count.saturating_sub(1); sender.send(envelope).ok(); @@ -505,15 +498,15 @@ impl OnDisk { /// Returns the amount of envelopes deleted from disk. async fn delete_and_fetch( &mut self, - key: QueueKey, - sender: &mpsc::UnboundedSender, + mut keys: Vec, + sender: mpsc::UnboundedSender, services: &Services, - ) -> Result<(), QueueKey> { + ) -> Result<(), Vec> { loop { // Before querying the db, make sure that the buffer guard has enough availability: self.dequeue_attempts += 1; if !self.buffer_guard.is_below_low_watermark() { - return Err(key); + break; } relay_statsd::metric!( histogram(RelayHistograms::BufferDequeueAttempts) = self.dequeue_attempts as u64 @@ -525,7 +518,8 @@ impl OnDisk { // 2. Make sure that if we panic and deleted envelopes cannot be read out fully, we do not lose all of them, // but only one batch, and the rest of them will stay on disk for the next iteration // to pick up. - let envelopes = sql::delete_and_fetch(key, self.unspool_batch()) + let query = sql::prepare_delete_query(std::mem::take(&mut keys)); + let envelopes = sql::delete_and_fetch(&query, self.unspool_batch()) .fetch(&self.db) .peekable(); let mut envelopes = pin!(envelopes); @@ -533,7 +527,7 @@ impl OnDisk { // Stream is empty, we can break the loop, since we read everything by now. if envelopes.as_mut().peek().await.is_none() { - return Ok(()); + break; } let mut count: i64 = 0; @@ -541,15 +535,13 @@ impl OnDisk { count += 1; let envelope = match envelope { Ok(envelope) => envelope, - - // Bail if there are errors in the stream. Err(err) => { relay_log::error!( error = &err as &dyn Error, "failed to read the buffer stream from the disk", ); self.track_count(-count); - return Err(key); + continue; } }; @@ -568,6 +560,12 @@ impl OnDisk { self.track_count(-count); } + + if !keys.is_empty() { + return Err(keys); + } + + Ok(()) } /// Unspools the entire contents of the on-disk spool. @@ -633,22 +631,33 @@ impl OnDisk { /// back into index. async fn dequeue( &mut self, - project_key: ProjectKey, - keys: &mut Vec, + mut keys: HashSet, sender: mpsc::UnboundedSender, services: &Services, ) { - let mut unused_keys = BTreeSet::new(); - while let Some(key) = keys.pop() { + if keys.is_empty() { + return; + } + + let mut unused_keys = HashSet::new(); + // Chunk up the incoming keys in batches of size `BATCH_SIZE`. + let chunks: Vec> = keys + .drain() + .chunks(BATCH_SIZE as usize) + .into_iter() + .map(|chunk| chunk.collect::>()) + .collect(); + + for chunk in chunks.into_iter() { // If the error with a key is returned we must save it for the next iteration. - if let Err(key) = self.delete_and_fetch(key, &sender, services).await { - unused_keys.insert(key); + if let Err(failed_keys) = self.delete_and_fetch(chunk, sender.clone(), services).await { + unused_keys.extend(failed_keys); }; } if !unused_keys.is_empty() { services .project_cache - .send(UpdateSpoolIndex::new(project_key, unused_keys)) + .send(UpdateSpoolIndex::new(unused_keys)) } } @@ -741,9 +750,7 @@ impl OnDisk { } /// Returns the index from the on-disk spool. - async fn get_spooled_index( - db: &Pool, - ) -> Result>, BufferError> { + async fn get_spooled_index(db: &Pool) -> Result, BufferError> { let keys = sql::get_keys() .fetch_all(db) .await @@ -753,15 +760,7 @@ impl OnDisk { .into_iter() // Collect only keys we could extract. .filter_map(Self::extract_key) - // Fold the list into the index format. - .fold( - BTreeMap::new(), - |mut acc: BTreeMap>, key| { - acc.entry(key.own_key).or_default().insert(key); - acc.entry(key.sampling_key).or_default().insert(key); - acc - }, - ); + .collect(); Ok(index) } @@ -1043,20 +1042,15 @@ impl BufferService { /// /// This method removes the envelopes from the buffer and stream them to the sender. async fn handle_dequeue(&mut self, message: DequeueMany) -> Result<(), BufferError> { - let DequeueMany { - project_key, - mut keys, - sender, - } = message; + let DequeueMany { keys, sender } = message; match self.state { BufferState::Memory(ref mut ram) | BufferState::MemoryFileStandby { ref mut ram, .. } => { - ram.dequeue(&keys, sender); + ram.dequeue(keys, sender); } BufferState::Disk(ref mut disk) => { - disk.dequeue(project_key, &mut keys, sender, &self.services) - .await; + disk.dequeue(keys, sender, &self.services).await; } } let state = std::mem::take(&mut self.state); @@ -1351,7 +1345,6 @@ mod tests { tokio::time::sleep(Duration::from_millis(1000 * result)).await; addr.send(DequeueMany { - project_key, keys: [key].into(), sender: tx.clone(), }); @@ -1404,7 +1397,6 @@ mod tests { // Dequeue an envelope: addr.send(DequeueMany { - project_key, keys: [key].into(), sender: tx.clone(), }); @@ -1433,7 +1425,6 @@ mod tests { }); // Request to dequeue: addr.send(DequeueMany { - project_key, keys: [key].into(), sender: tx.clone(), }); @@ -1448,7 +1439,6 @@ mod tests { // Dequeue an envelope: addr.send(DequeueMany { - project_key, keys: [key].into(), sender: tx.clone(), }); @@ -1506,7 +1496,6 @@ mod tests { let (tx, mut rx) = mpsc::unbounded_channel(); service .handle_dequeue(DequeueMany { - project_key, keys: [key].into(), sender: tx, }) @@ -1702,7 +1691,7 @@ mod tests { assert_eq!(result.rows_affected(), 2); - let index = Arc::new(Mutex::new(BTreeMap::new())); + let index = Arc::new(Mutex::new(HashSet::new())); let mut services = services(); let index_in = index.clone(); let (project_cache, _) = mock_service("project_cache", (), move |(), msg: ProjectCache| { @@ -1725,19 +1714,130 @@ mod tests { // Get the index out of the mutex. let index = index.lock().unwrap().clone(); - assert_eq!(index.len(), 3); - assert!(index.contains_key(&ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971f00").unwrap())); - assert!(index.contains_key(&ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971f11").unwrap())); - assert!(index.contains_key(&ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap())); - let result_for_key = BTreeSet::from([QueueKey { + let key1 = QueueKey { own_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), sampling_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), - }]); - assert_eq!( - index - .get(&ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap()) - .unwrap(), - &result_for_key - ); + }; + let key2 = QueueKey { + own_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971f00").unwrap(), + sampling_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971f11").unwrap(), + }; + + assert_eq!(index.len(), 2); + assert!(index.contains(&key1)); + assert!(index.contains(&key2)); + } + + #[tokio::test] + async fn chunked_unspool() { + let db_path = std::env::temp_dir().join(Uuid::new_v4().to_string()); + let buffer_guard: Arc<_> = BufferGuard::new(10000).into(); + let config: Arc<_> = Config::from_json_value(serde_json::json!({ + "spool": { + "envelopes": { + "path": db_path, + "max_memory_size": "10KB", + "max_disk_size": "20MB", + } + } + })) + .unwrap() + .into(); + + let services = services(); + let buffer = BufferService::create(buffer_guard, services, config) + .await + .unwrap(); + let addr = buffer.start(); + + let mut keys = HashSet::new(); + for _ in 1..=300 { + let project_key = uuid::Uuid::new_v4().as_simple().to_string(); + let key = ProjectKey::parse(&project_key).unwrap(); + let index_key = QueueKey { + own_key: key, + sampling_key: key, + }; + keys.insert(index_key); + addr.send(Enqueue::new(index_key, empty_managed_envelope())) + } + + let (tx, mut rx) = mpsc::unbounded_channel(); + // Dequeue all the keys at once. + addr.send(DequeueMany { + keys, + sender: tx.clone(), + }); + drop(tx); + + let mut count = 0; + while rx.recv().await.is_some() { + count += 1; + } + assert_eq!(count, 300); + } + + #[tokio::test] + async fn over_the_low_watermark() { + let db_path = std::env::temp_dir().join(Uuid::new_v4().to_string()); + let buffer_guard: Arc<_> = BufferGuard::new(300).into(); + let config: Arc<_> = Config::from_json_value(serde_json::json!({ + "spool": { + "envelopes": { + "path": db_path, + "max_memory_size": "10KB", + "max_disk_size": "20MB", + } + } + })) + .unwrap() + .into(); + + let index = Arc::new(Mutex::new(HashSet::new())); + let mut services = services(); + let index_in = index.clone(); + let (project_cache, _) = mock_service("project_cache", (), move |(), msg: ProjectCache| { + // First chunk in the unspool will take us over the low watermark, that means we will get + // small portion of the keys back. + let ProjectCache::UpdateSpoolIndex(new_index) = msg else { + return; + }; + index_in.lock().unwrap().extend(new_index.0); + }); + + services.project_cache = project_cache; + let buffer = BufferService::create(buffer_guard, services, config) + .await + .unwrap(); + let addr = buffer.start(); + + let mut keys = HashSet::new(); + for _ in 1..=300 { + let project_key = uuid::Uuid::new_v4().as_simple().to_string(); + let key = ProjectKey::parse(&project_key).unwrap(); + let index_key = QueueKey { + own_key: key, + sampling_key: key, + }; + keys.insert(index_key); + addr.send(Enqueue::new(index_key, empty_managed_envelope())) + } + + let (tx, mut rx) = mpsc::unbounded_channel(); + // Dequeue all the keys at once. + addr.send(DequeueMany { + keys, + sender: tx.clone(), + }); + drop(tx); + + let mut envelopes = Vec::new(); + while let Some(envelope) = rx.recv().await { + envelopes.push(envelope); + } + + assert_eq!(envelopes.len(), 200); + let index = index.lock().unwrap().clone(); + assert_eq!(index.len(), 100); } } diff --git a/relay-server/src/services/spooler/sql.rs b/relay-server/src/services/spooler/sql.rs index 99498c956e..bef4b566a8 100644 --- a/relay-server/src/services/spooler/sql.rs +++ b/relay-server/src/services/spooler/sql.rs @@ -2,6 +2,7 @@ //! the on-disk spool (currently backed by SQLite). use futures::stream::{Stream, StreamExt}; +use itertools::Itertools; use sqlx::query::Query; use sqlx::sqlite::SqliteArguments; use sqlx::{Pool, QueryBuilder, Sqlite}; @@ -17,25 +18,36 @@ use crate::statsd::RelayCounters; /// Keep it on the lower side for now. const SQLITE_LIMIT_VARIABLE_NUMBER: usize = 999; +/// Prepares a DELETE query, by properly genering IN clauses for provided keys. +pub fn prepare_delete_query(keys: Vec) -> String { + let (own_keys, sampling_keys) = keys.iter().fold( + (Vec::new(), Vec::new()), + |(mut own_keys, mut sampling_keys), key| { + own_keys.push(format!(r#""{}""#, key.own_key)); + sampling_keys.push(format!(r#""{}""#, key.sampling_key)); + (own_keys, sampling_keys) + }, + ); + + let own_keys = own_keys.into_iter().join(","); + let sampling_keys = sampling_keys.into_iter().join(","); + + format!( + "DELETE FROM + envelopes + WHERE id IN (SELECT id FROM envelopes WHERE own_key in ({}) AND sampling_key in ({}) LIMIT ?) + RETURNING + received_at, own_key, sampling_key, envelope", own_keys, sampling_keys + ) +} + /// Creates a DELETE query binding to the provided [`QueueKey`] which returns the envelopes and /// timestamp. /// /// The query will perform the delete once executed returning deleted envelope and timestamp when /// the envelope was received. This will create a prepared statement which is cached and re-used. -pub fn delete_and_fetch<'a>( - key: QueueKey, - batch_size: i64, -) -> Query<'a, Sqlite, SqliteArguments<'a>> { - sqlx::query( - "DELETE FROM - envelopes - WHERE id IN (SELECT id FROM envelopes WHERE own_key = ? AND sampling_key = ? LIMIT ?) - RETURNING - received_at, own_key, sampling_key, envelope", - ) - .bind(key.own_key.to_string()) - .bind(key.sampling_key.to_string()) - .bind(batch_size) +pub fn delete_and_fetch(query: &str, batch_size: i64) -> Query<'_, Sqlite, SqliteArguments<'_>> { + sqlx::query(query).bind(batch_size) } /// Creates a DELETE query which returns the requested batch of the envelopes with the timestamp