From 4e57b1fb1ba81e511b0f20a99cc2101c3e11cb77 Mon Sep 17 00:00:00 2001
From: Oleksandr <1931331+olksdr@users.noreply.github.com>
Date: Mon, 12 Feb 2024 07:23:50 +0100
Subject: [PATCH] chore(spool): Improve unspool strategy (#3047)

Recent stats show that when there are many records with only one envelope per key, unspooling is very slow: the service has to go to the DB and fetch one key at a time, which can block the `Buffer`, and the messages accumulating in its internal queue can also use up more memory.

This change introduces batching across all the valid keys, so that many records are unspooled at once. This decreases the number of messages sent between the services and also reads more data from disk in one go.

Make sure we can unspool faster from the buffer:
* request all the envelopes for all the valid projects
* batch those requests for all the incoming keys
* make sure we return keys to the index when we are over the low watermark

---
 relay-server/src/services/project_cache.rs | 184 ++++++++--------
 relay-server/src/services/spooler/mod.rs | 236 +++++++++++++++------
 relay-server/src/services/spooler/sql.rs | 40 ++--
 3 files changed, 281 insertions(+), 179 deletions(-)

diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index a766febf10..a23ab4446b 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -3,6 +3,7 @@ use std::error::Error; use std::sync::Arc; use std::time::Duration; +use hashbrown::HashSet; use relay_base_schema::project::ProjectKey; use relay_config::{Config, RelayMode}; use relay_dynamic_config::GlobalConfig; @@ -183,14 +184,11 @@ pub struct AddMetricMeta { /// This message is sent from the project buffer in case of the error while fetching the data from /// the persistent buffer, ensuring that we still have the index pointing to the keys, which could be found in the /// persistent storage. -pub struct UpdateSpoolIndex { - project_key: ProjectKey, - keys: BTreeSet, -} +pub struct UpdateSpoolIndex(pub HashSet); impl UpdateSpoolIndex { - pub fn new(project_key: ProjectKey, keys: BTreeSet) -> Self { - Self { project_key, keys } + pub fn new(keys: HashSet) -> Self { + Self(keys) } } @@ -203,7 +201,7 @@ pub struct SpoolHealth; /// This index will be received only once shortly after startup and will trigger refresh for the /// project states for the project keys returned in the message. #[derive(Debug)] -pub struct RefreshIndexCache(pub BTreeMap>); +pub struct RefreshIndexCache(pub HashSet); /// A cache for [`ProjectState`]s. /// @@ -492,7 +490,7 @@ struct ProjectCacheBroker { buffer_tx: mpsc::UnboundedSender, buffer_guard: Arc, /// Index of the buffered project keys. - index: BTreeMap>, + index: HashSet, buffer_unspool_handle: SleepHandle, buffer_unspool_backoff: RetryBackoff, buffer: Addr, @@ -524,8 +522,7 @@ impl ProjectCacheBroker { /// Adds the value to the queue for the provided key. pub fn enqueue(&mut self, key: QueueKey, value: ManagedEnvelope) { - self.index.entry(key.own_key).or_default().insert(key); - self.index.entry(key.sampling_key).or_default().insert(key); + self.index.insert(key); self.buffer.send(Enqueue::new(key, value)); } @@ -533,49 +530,9 @@ impl ProjectCacheBroker { /// /// All the found envelopes will be send back through the `buffer_tx` channel and directly /// forwarded to `handle_processing`. 
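Editor's note on the new index shape: the hunks above replace the per-project map of queue-key sets with one flat `HashSet` of `QueueKey` pairs, so enqueueing records a single entry and eviction filters the whole set (see the eviction hunk further below). The following is a minimal sketch, separate from the diff, using `u64` stand-ins for `ProjectKey`, the standard-library `HashSet`, and `Iterator::partition`, whereas the patch itself uses `hashbrown::HashSet` and `drain_filter`.

    use std::collections::HashSet;

    // `u64` stands in for `ProjectKey`; the real index stores
    // `QueueKey { own_key, sampling_key }` pairs.
    #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
    struct QueueKey {
        own_key: u64,
        sampling_key: u64,
    }

    // Enqueue now records one entry per envelope instead of one entry under
    // each of the two project keys.
    fn enqueue(index: &mut HashSet<QueueKey>, key: QueueKey) {
        index.insert(key);
    }

    // Eviction keeps every key that does not reference the expired project and
    // returns the rest, mirroring the `drain_filter` call in the patch.
    fn evict(index: &mut HashSet<QueueKey>, project: u64) -> HashSet<QueueKey> {
        let (evicted, kept): (HashSet<QueueKey>, HashSet<QueueKey>) = index
            .drain()
            .partition(|key| key.own_key == project || key.sampling_key == project);
        *index = kept;
        evicted
    }

In the patch, the evicted keys are then handed to the buffer via `RemoveMany`, as the eviction hunk below shows.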
- pub fn dequeue(&mut self, partial_key: ProjectKey) { - // If we don't yet have the global config, we will defer dequeuing until we do. - if let GlobalConfigStatus::Pending = self.global_config { - return; - } - - let mut result = Vec::new(); - let mut queue_keys = self.index.remove(&partial_key).unwrap_or_default(); - let mut index = BTreeSet::new(); - - while let Some(queue_key) = queue_keys.pop_first() { - // We only have to check `other_key`, because we already know that the `partial_key`s `state` - // is valid and loaded. - let other_key = if queue_key.own_key == partial_key { - queue_key.sampling_key - } else { - queue_key.own_key - }; - - if self - .projects - .get(&other_key) - // Make sure we have only cached and valid state. - .and_then(|p| p.valid_state()) - .map_or(false, |s| !s.invalid()) - { - result.push(queue_key); - } else { - index.insert(queue_key); - } - } - - if !index.is_empty() { - self.index.insert(partial_key, index); - } - - if !result.is_empty() { - self.buffer.send(DequeueMany::new( - partial_key, - result, - self.buffer_tx.clone(), - )) - } + pub fn dequeue(&self, keys: HashSet) { + self.buffer + .send(DequeueMany::new(keys, self.buffer_tx.clone())) } /// Evict projects that are over its expiry date. @@ -595,7 +552,12 @@ impl ProjectCacheBroker { // Defer dropping the projects to a dedicated thread: let mut count = 0; for (project_key, project) in expired { - if let Some(keys) = self.index.remove(&project_key) { + let keys = self + .index + .drain_filter(|key| key.own_key == project_key || key.sampling_key == project_key) + .collect::>(); + + if !keys.is_empty() { self.buffer.send(RemoveMany::new(project_key, keys)) } @@ -868,7 +830,7 @@ impl ProjectCacheBroker { } fn handle_buffer_index(&mut self, message: UpdateSpoolIndex) { - self.index.insert(message.project_key, message.keys); + self.index.extend(message.0); } fn handle_spool_health(&mut self, sender: Sender) { @@ -879,10 +841,14 @@ impl ProjectCacheBroker { let RefreshIndexCache(index) = message; let project_cache = self.services.project_cache.clone(); - for (key, values) in index { - self.index.entry(key).or_default().extend(values); - self.get_or_create_project(key) + for key in index { + self.index.insert(key); + self.get_or_create_project(key.own_key) .prefetch(project_cache.clone(), false); + if key.own_key != key.sampling_key { + self.get_or_create_project(key.sampling_key) + .prefetch(project_cache.clone(), false); + } } } @@ -899,6 +865,44 @@ impl ProjectCacheBroker { } } + /// Returns `true` if the project state valid for the [`QueueKey`]. + /// + /// Which includes the own key and the samplig key for the project. + /// Note: this function will trigger [`ProjectState`] refresh if it's already expired or not + /// valid. + fn is_state_valid(&mut self, key: &QueueKey) -> bool { + let QueueKey { + own_key, + sampling_key, + } = key; + + let is_own_state_valid = self.projects.get_mut(own_key).map_or(false, |project| { + // Returns `Some` if the project is cached otherwise None and also triggers refresh + // in background. + project + .get_cached_state(self.services.project_cache.clone(), false) + // Makes sure that the state also is valid. + .map_or(false, |state| !state.invalid()) + }); + + let is_sampling_state_valid = if own_key != sampling_key { + self.projects + .get_mut(sampling_key) + .map_or(false, |project| { + // Returns `Some` if the project is cached otherwise None and also triggers refresh + // in background. 
+ project + .get_cached_state(self.services.project_cache.clone(), false) + // Makes sure that the state also is valid. + .map_or(false, |state| !state.invalid()) + }) + } else { + is_own_state_valid + }; + + is_own_state_valid && is_sampling_state_valid + } + /// Iterates the buffer index and tries to unspool the envelopes for projects with a valid /// state. /// @@ -907,40 +911,33 @@ impl ProjectCacheBroker { fn handle_periodic_unspool(&mut self) { self.buffer_unspool_handle.reset(); + // If we don't yet have the global config, we will defer dequeuing until we do. + if let GlobalConfigStatus::Pending = self.global_config { + self.buffer_unspool_backoff.reset(); + self.schedule_unspool(); + return; + } // If there is nothing spooled, schedule the next check a little bit later. - if self.index.is_empty() { + // And do *not* attempt to unspool if the assigned permits over low watermark. + if self.index.is_empty() || !self.buffer_guard.is_below_low_watermark() { self.schedule_unspool(); return; } - let keys = self.index.keys().cloned().collect::>(); - let mut dequeued = false; - - for project_key in keys.iter() { - if self.projects.get_mut(project_key).map_or(false, |project| { - // Returns `Some` if the project is cache otherwise None and also triggers refresh - // in background. - project - .get_cached_state(self.services.project_cache.clone(), false) - // Makes sure that the state also is valid. - .map_or(false, |state| !state.invalid()) - }) { - // Do *not* attempt to unspool if the assigned permits over low watermark. - if !self.buffer_guard.is_below_low_watermark() { - self.schedule_unspool(); - return; - } + let mut index = std::mem::take(&mut self.index); + let values = index + .drain_filter(|key| self.is_state_valid(key)) + .collect::>(); - self.dequeue(*project_key); - dequeued = true; - } + if !values.is_empty() { + self.dequeue(values); } - // If cannot dequeue for some reason, back off the next retry. - if dequeued { - self.buffer_unspool_backoff.reset(); - } + // Return all the un-used items to the index. + self.index.extend(index); + // Schedule unspool once we are done. + self.buffer_unspool_backoff.reset(); self.schedule_unspool(); } @@ -1073,7 +1070,7 @@ impl Service for ProjectCacheService { state_tx, buffer_tx, buffer_guard, - index: BTreeMap::new(), + index: HashSet::new(), buffer_unspool_handle: SleepHandle::idle(), buffer_unspool_backoff: RetryBackoff::new(config.http_max_retry_interval()), buffer, @@ -1208,7 +1205,7 @@ mod tests { state_tx, buffer_tx, buffer_guard, - index: BTreeMap::new(), + index: HashSet::new(), buffer: buffer.clone(), global_config: GlobalConfigStatus::Pending, buffer_unspool_handle: SleepHandle::idle(), @@ -1250,8 +1247,7 @@ mod tests { // All the messages should have been spooled to disk. assert_eq!(buffer_guard.available(), 5); - assert_eq!(broker.index.keys().len(), 1); - assert_eq!(broker.index.values().count(), 1); + assert_eq!(broker.index.len(), 1); let project_key = ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(); let key = QueueKey { @@ -1261,11 +1257,7 @@ mod tests { let (tx, mut rx) = mpsc::unbounded_channel(); // Check if we can also dequeue from the buffer directly. - buffer_svc.send(spooler::DequeueMany::new( - project_key, - [key].into(), - tx.clone(), - )); + buffer_svc.send(spooler::DequeueMany::new([key].into(), tx.clone())); tokio::time::sleep(Duration::from_millis(100)).await; // We should be able to unspool 5 envelopes since we have 5 permits. 
@@ -1283,11 +1275,10 @@ mod tests { // Till now we should have enqueued 5 envelopes and dequeued only 1, it means the index is // still populated with same keys and values. - assert_eq!(broker.index.keys().len(), 1); - assert_eq!(broker.index.values().count(), 1); + assert_eq!(broker.index.len(), 1); // Check if we can also dequeue from the buffer directly. - buffer_svc.send(spooler::DequeueMany::new(project_key, [key].into(), tx)); + buffer_svc.send(spooler::DequeueMany::new([key].into(), tx)); tokio::time::sleep(Duration::from_millis(100)).await; // Cannot dequeue anymore, no more available permits. assert!(rx.try_recv().is_err()); @@ -1353,8 +1344,7 @@ mod tests { select! { Some(assert) = rx_assert.recv() => { - assert_eq!(broker.index.keys().len(), assert); - assert_eq!(broker.index.values().count(), assert); + assert_eq!(broker.index.len(), assert); }, Some(update) = rx_update.recv() => broker.merge_state(update), () = &mut broker.buffer_unspool_handle => broker.handle_periodic_unspool(), diff --git a/relay-server/src/services/spooler/mod.rs b/relay-server/src/services/spooler/mod.rs index 68ca84da86..410df841bb 100644 --- a/relay-server/src/services/spooler/mod.rs +++ b/relay-server/src/services/spooler/mod.rs @@ -36,6 +36,8 @@ use std::pin::pin; use std::sync::Arc; use futures::stream::{self, StreamExt}; +use hashbrown::HashSet; +use itertools::Itertools; use relay_base_schema::project::{ParseProjectKeyError, ProjectKey}; use relay_config::Config; use relay_statsd::metric; @@ -145,22 +147,13 @@ impl Enqueue { /// Removes messages from the internal buffer and streams them to the sender. #[derive(Debug)] pub struct DequeueMany { - project_key: ProjectKey, - keys: Vec, + keys: HashSet, sender: mpsc::UnboundedSender, } impl DequeueMany { - pub fn new( - project_key: ProjectKey, - keys: Vec, - sender: mpsc::UnboundedSender, - ) -> Self { - Self { - project_key, - keys, - sender, - } + pub fn new(keys: HashSet, sender: mpsc::UnboundedSender) -> Self { + Self { keys, sender } } } @@ -342,9 +335,9 @@ impl InMemory { } /// Dequeues the envelopes from the in-memory buffer and send them to provided `sender`. - fn dequeue(&mut self, keys: &Vec, sender: mpsc::UnboundedSender) { + fn dequeue(&mut self, keys: HashSet, sender: mpsc::UnboundedSender) { for key in keys { - for envelope in self.buffer.remove(key).unwrap_or_default() { + for envelope in self.buffer.remove(&key).unwrap_or_default() { self.used_memory -= envelope.estimated_size(); self.envelope_count = self.envelope_count.saturating_sub(1); sender.send(envelope).ok(); @@ -505,15 +498,15 @@ impl OnDisk { /// Returns the amount of envelopes deleted from disk. async fn delete_and_fetch( &mut self, - key: QueueKey, - sender: &mpsc::UnboundedSender, + mut keys: Vec, + sender: mpsc::UnboundedSender, services: &Services, - ) -> Result<(), QueueKey> { + ) -> Result<(), Vec> { loop { // Before querying the db, make sure that the buffer guard has enough availability: self.dequeue_attempts += 1; if !self.buffer_guard.is_below_low_watermark() { - return Err(key); + break; } relay_statsd::metric!( histogram(RelayHistograms::BufferDequeueAttempts) = self.dequeue_attempts as u64 @@ -525,7 +518,8 @@ impl OnDisk { // 2. Make sure that if we panic and deleted envelopes cannot be read out fully, we do not lose all of them, // but only one batch, and the rest of them will stay on disk for the next iteration // to pick up. 
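Editor's note: the batched `delete_and_fetch` above no longer fails one key at a time; it stops once the buffer guard leaves the low watermark and, as the rest of the function just below shows, returns any remaining keys with `Err(keys)` so the caller can restore them. A minimal sketch of that contract, with a plain permit counter standing in for the `BufferGuard` check and `u64` standing in for `QueueKey`:

    // Returns Ok(()) once every key has been processed, or Err with the keys
    // that could not be handled so the caller can restore them to its index.
    fn drain_batch(mut keys: Vec<u64>, mut permits: usize) -> Result<(), Vec<u64>> {
        while let Some(key) = keys.last().copied() {
            if permits == 0 {
                // No capacity left below the low watermark: return the leftovers.
                return Err(keys);
            }
            // ... delete and forward the spooled envelopes for `key` here ...
            let _ = key;
            keys.pop();
            permits -= 1;
        }
        Ok(())
    }

In the patch, `OnDisk::dequeue` collects these leftover keys and sends them back to the project cache with `UpdateSpoolIndex`, so the index stays accurate even when unspooling stops early.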
- let envelopes = sql::delete_and_fetch(key, self.unspool_batch()) + let query = sql::prepare_delete_query(std::mem::take(&mut keys)); + let envelopes = sql::delete_and_fetch(&query, self.unspool_batch()) .fetch(&self.db) .peekable(); let mut envelopes = pin!(envelopes); @@ -533,7 +527,7 @@ impl OnDisk { // Stream is empty, we can break the loop, since we read everything by now. if envelopes.as_mut().peek().await.is_none() { - return Ok(()); + break; } let mut count: i64 = 0; @@ -541,15 +535,13 @@ impl OnDisk { count += 1; let envelope = match envelope { Ok(envelope) => envelope, - - // Bail if there are errors in the stream. Err(err) => { relay_log::error!( error = &err as &dyn Error, "failed to read the buffer stream from the disk", ); self.track_count(-count); - return Err(key); + continue; } }; @@ -568,6 +560,12 @@ impl OnDisk { self.track_count(-count); } + + if !keys.is_empty() { + return Err(keys); + } + + Ok(()) } /// Unspools the entire contents of the on-disk spool. @@ -633,22 +631,33 @@ impl OnDisk { /// back into index. async fn dequeue( &mut self, - project_key: ProjectKey, - keys: &mut Vec, + mut keys: HashSet, sender: mpsc::UnboundedSender, services: &Services, ) { - let mut unused_keys = BTreeSet::new(); - while let Some(key) = keys.pop() { + if keys.is_empty() { + return; + } + + let mut unused_keys = HashSet::new(); + // Chunk up the incoming keys in batches of size `BATCH_SIZE`. + let chunks: Vec> = keys + .drain() + .chunks(BATCH_SIZE as usize) + .into_iter() + .map(|chunk| chunk.collect::>()) + .collect(); + + for chunk in chunks.into_iter() { // If the error with a key is returned we must save it for the next iteration. - if let Err(key) = self.delete_and_fetch(key, &sender, services).await { - unused_keys.insert(key); + if let Err(failed_keys) = self.delete_and_fetch(chunk, sender.clone(), services).await { + unused_keys.extend(failed_keys); }; } if !unused_keys.is_empty() { services .project_cache - .send(UpdateSpoolIndex::new(project_key, unused_keys)) + .send(UpdateSpoolIndex::new(unused_keys)) } } @@ -741,9 +750,7 @@ impl OnDisk { } /// Returns the index from the on-disk spool. - async fn get_spooled_index( - db: &Pool, - ) -> Result>, BufferError> { + async fn get_spooled_index(db: &Pool) -> Result, BufferError> { let keys = sql::get_keys() .fetch_all(db) .await @@ -753,15 +760,7 @@ impl OnDisk { .into_iter() // Collect only keys we could extract. .filter_map(Self::extract_key) - // Fold the list into the index format. - .fold( - BTreeMap::new(), - |mut acc: BTreeMap>, key| { - acc.entry(key.own_key).or_default().insert(key); - acc.entry(key.sampling_key).or_default().insert(key); - acc - }, - ); + .collect(); Ok(index) } @@ -1043,20 +1042,15 @@ impl BufferService { /// /// This method removes the envelopes from the buffer and stream them to the sender. async fn handle_dequeue(&mut self, message: DequeueMany) -> Result<(), BufferError> { - let DequeueMany { - project_key, - mut keys, - sender, - } = message; + let DequeueMany { keys, sender } = message; match self.state { BufferState::Memory(ref mut ram) | BufferState::MemoryFileStandby { ref mut ram, .. 
} => { - ram.dequeue(&keys, sender); + ram.dequeue(keys, sender); } BufferState::Disk(ref mut disk) => { - disk.dequeue(project_key, &mut keys, sender, &self.services) - .await; + disk.dequeue(keys, sender, &self.services).await; } } let state = std::mem::take(&mut self.state); @@ -1351,7 +1345,6 @@ mod tests { tokio::time::sleep(Duration::from_millis(1000 * result)).await; addr.send(DequeueMany { - project_key, keys: [key].into(), sender: tx.clone(), }); @@ -1404,7 +1397,6 @@ mod tests { // Dequeue an envelope: addr.send(DequeueMany { - project_key, keys: [key].into(), sender: tx.clone(), }); @@ -1433,7 +1425,6 @@ mod tests { }); // Request to dequeue: addr.send(DequeueMany { - project_key, keys: [key].into(), sender: tx.clone(), }); @@ -1448,7 +1439,6 @@ mod tests { // Dequeue an envelope: addr.send(DequeueMany { - project_key, keys: [key].into(), sender: tx.clone(), }); @@ -1506,7 +1496,6 @@ mod tests { let (tx, mut rx) = mpsc::unbounded_channel(); service .handle_dequeue(DequeueMany { - project_key, keys: [key].into(), sender: tx, }) @@ -1702,7 +1691,7 @@ mod tests { assert_eq!(result.rows_affected(), 2); - let index = Arc::new(Mutex::new(BTreeMap::new())); + let index = Arc::new(Mutex::new(HashSet::new())); let mut services = services(); let index_in = index.clone(); let (project_cache, _) = mock_service("project_cache", (), move |(), msg: ProjectCache| { @@ -1725,19 +1714,130 @@ mod tests { // Get the index out of the mutex. let index = index.lock().unwrap().clone(); - assert_eq!(index.len(), 3); - assert!(index.contains_key(&ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971f00").unwrap())); - assert!(index.contains_key(&ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971f11").unwrap())); - assert!(index.contains_key(&ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap())); - let result_for_key = BTreeSet::from([QueueKey { + let key1 = QueueKey { own_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), sampling_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), - }]); - assert_eq!( - index - .get(&ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap()) - .unwrap(), - &result_for_key - ); + }; + let key2 = QueueKey { + own_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971f00").unwrap(), + sampling_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971f11").unwrap(), + }; + + assert_eq!(index.len(), 2); + assert!(index.contains(&key1)); + assert!(index.contains(&key2)); + } + + #[tokio::test] + async fn chunked_unspool() { + let db_path = std::env::temp_dir().join(Uuid::new_v4().to_string()); + let buffer_guard: Arc<_> = BufferGuard::new(10000).into(); + let config: Arc<_> = Config::from_json_value(serde_json::json!({ + "spool": { + "envelopes": { + "path": db_path, + "max_memory_size": "10KB", + "max_disk_size": "20MB", + } + } + })) + .unwrap() + .into(); + + let services = services(); + let buffer = BufferService::create(buffer_guard, services, config) + .await + .unwrap(); + let addr = buffer.start(); + + let mut keys = HashSet::new(); + for _ in 1..=300 { + let project_key = uuid::Uuid::new_v4().as_simple().to_string(); + let key = ProjectKey::parse(&project_key).unwrap(); + let index_key = QueueKey { + own_key: key, + sampling_key: key, + }; + keys.insert(index_key); + addr.send(Enqueue::new(index_key, empty_managed_envelope())) + } + + let (tx, mut rx) = mpsc::unbounded_channel(); + // Dequeue all the keys at once. 
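Editor's note: the reworked `OnDisk::dequeue` earlier in this hunk splits the incoming keys into `BATCH_SIZE`-sized chunks with itertools before issuing the batched deletes, and the `chunked_unspool` test around this point exercises that with 300 distinct keys. A minimal sketch of the same batching using the standard library's slice `chunks`; the constant here is purely illustrative, the real `BATCH_SIZE` is the spooler's existing constant referenced above.

    use std::collections::HashSet;

    // Illustrative only; the real `BATCH_SIZE` is defined by the spooler.
    const BATCH_SIZE: usize = 100;

    // Splits the drained keys into bounded batches, one SQL statement per batch.
    fn batch_keys(keys: HashSet<u64>) -> Vec<Vec<u64>> {
        let keys: Vec<u64> = keys.into_iter().collect();
        keys.chunks(BATCH_SIZE).map(|chunk| chunk.to_vec()).collect()
    }

Each batch then becomes one `DELETE ... RETURNING` query, so the keys spooled in the test translate into a few bounded queries instead of one query per key.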
+ addr.send(DequeueMany { + keys, + sender: tx.clone(), + }); + drop(tx); + + let mut count = 0; + while rx.recv().await.is_some() { + count += 1; + } + assert_eq!(count, 300); + } + + #[tokio::test] + async fn over_the_low_watermark() { + let db_path = std::env::temp_dir().join(Uuid::new_v4().to_string()); + let buffer_guard: Arc<_> = BufferGuard::new(300).into(); + let config: Arc<_> = Config::from_json_value(serde_json::json!({ + "spool": { + "envelopes": { + "path": db_path, + "max_memory_size": "10KB", + "max_disk_size": "20MB", + } + } + })) + .unwrap() + .into(); + + let index = Arc::new(Mutex::new(HashSet::new())); + let mut services = services(); + let index_in = index.clone(); + let (project_cache, _) = mock_service("project_cache", (), move |(), msg: ProjectCache| { + // First chunk in the unspool will take us over the low watermark, that means we will get + // small portion of the keys back. + let ProjectCache::UpdateSpoolIndex(new_index) = msg else { + return; + }; + index_in.lock().unwrap().extend(new_index.0); + }); + + services.project_cache = project_cache; + let buffer = BufferService::create(buffer_guard, services, config) + .await + .unwrap(); + let addr = buffer.start(); + + let mut keys = HashSet::new(); + for _ in 1..=300 { + let project_key = uuid::Uuid::new_v4().as_simple().to_string(); + let key = ProjectKey::parse(&project_key).unwrap(); + let index_key = QueueKey { + own_key: key, + sampling_key: key, + }; + keys.insert(index_key); + addr.send(Enqueue::new(index_key, empty_managed_envelope())) + } + + let (tx, mut rx) = mpsc::unbounded_channel(); + // Dequeue all the keys at once. + addr.send(DequeueMany { + keys, + sender: tx.clone(), + }); + drop(tx); + + let mut envelopes = Vec::new(); + while let Some(envelope) = rx.recv().await { + envelopes.push(envelope); + } + + assert_eq!(envelopes.len(), 200); + let index = index.lock().unwrap().clone(); + assert_eq!(index.len(), 100); } } diff --git a/relay-server/src/services/spooler/sql.rs b/relay-server/src/services/spooler/sql.rs index 99498c956e..bef4b566a8 100644 --- a/relay-server/src/services/spooler/sql.rs +++ b/relay-server/src/services/spooler/sql.rs @@ -2,6 +2,7 @@ //! the on-disk spool (currently backed by SQLite). use futures::stream::{Stream, StreamExt}; +use itertools::Itertools; use sqlx::query::Query; use sqlx::sqlite::SqliteArguments; use sqlx::{Pool, QueryBuilder, Sqlite}; @@ -17,25 +18,36 @@ use crate::statsd::RelayCounters; /// Keep it on the lower side for now. const SQLITE_LIMIT_VARIABLE_NUMBER: usize = 999; +/// Prepares a DELETE query, by properly genering IN clauses for provided keys. +pub fn prepare_delete_query(keys: Vec) -> String { + let (own_keys, sampling_keys) = keys.iter().fold( + (Vec::new(), Vec::new()), + |(mut own_keys, mut sampling_keys), key| { + own_keys.push(format!(r#""{}""#, key.own_key)); + sampling_keys.push(format!(r#""{}""#, key.sampling_key)); + (own_keys, sampling_keys) + }, + ); + + let own_keys = own_keys.into_iter().join(","); + let sampling_keys = sampling_keys.into_iter().join(","); + + format!( + "DELETE FROM + envelopes + WHERE id IN (SELECT id FROM envelopes WHERE own_key in ({}) AND sampling_key in ({}) LIMIT ?) + RETURNING + received_at, own_key, sampling_key, envelope", own_keys, sampling_keys + ) +} + /// Creates a DELETE query binding to the provided [`QueueKey`] which returns the envelopes and /// timestamp. /// /// The query will perform the delete once executed returning deleted envelope and timestamp when /// the envelope was received. 
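Editor's note: `prepare_delete_query` above splices the project keys directly into the SQL text instead of binding them, presumably to stay clear of the bound-parameter limit tracked by `SQLITE_LIMIT_VARIABLE_NUMBER` in this module; only the `LIMIT` remains a bound parameter. A minimal sketch of the same IN-clause construction over plain string pairs; it joins with `Vec::join` and single-quoted SQL string literals, while the patch uses itertools and double quotes.

    // Builds the two comma-separated lists used in `own_key IN (...)` and
    // `sampling_key IN (...)`. Assumes the keys are already-validated hex
    // project keys, never raw user input, since they are spliced into the SQL.
    fn in_clause_lists(keys: &[(String, String)]) -> (String, String) {
        let own: Vec<String> = keys.iter().map(|(own, _)| format!("'{own}'")).collect();
        let sampling: Vec<String> = keys.iter().map(|(_, s)| format!("'{s}'")).collect();
        (own.join(","), sampling.join(","))
    }

The new `delete_and_fetch` in the hunk just below then takes the prepared SQL string and binds only the batch size.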
This will create a prepared statement which is cached and re-used. -pub fn delete_and_fetch<'a>( - key: QueueKey, - batch_size: i64, -) -> Query<'a, Sqlite, SqliteArguments<'a>> { - sqlx::query( - "DELETE FROM - envelopes - WHERE id IN (SELECT id FROM envelopes WHERE own_key = ? AND sampling_key = ? LIMIT ?) - RETURNING - received_at, own_key, sampling_key, envelope", - ) - .bind(key.own_key.to_string()) - .bind(key.sampling_key.to_string()) - .bind(batch_size) +pub fn delete_and_fetch(query: &str, batch_size: i64) -> Query<'_, Sqlite, SqliteArguments<'_>> { + sqlx::query(query).bind(batch_size) } /// Creates a DELETE query which returns the requested batch of the envelopes with the timestamp