Skip to content

Commit

Permalink
Improve
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Jul 30, 2024
1 parent 1729051 commit af36538
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 23 deletions.
12 changes: 6 additions & 6 deletions relay-server/src/services/buffer/envelope_stack/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ mod tests {
#[should_panic]
async fn test_push_with_mismatching_project_keys() {
let db = setup_db(false).await;
let envelope_store = SqliteEnvelopeStore::new(db, 0);
let envelope_store = SqliteEnvelopeStore::new(db);
let mut stack = SqliteEnvelopeStack::new(
envelope_store,
2,
Expand All @@ -298,7 +298,7 @@ mod tests {
#[tokio::test]
async fn test_push_when_db_is_not_valid() {
let db = setup_db(false).await;
let envelope_store = SqliteEnvelopeStore::new(db, 0);
let envelope_store = SqliteEnvelopeStore::new(db);
let mut stack = SqliteEnvelopeStack::new(
envelope_store,
2,
Expand Down Expand Up @@ -350,7 +350,7 @@ mod tests {
#[tokio::test]
async fn test_pop_when_db_is_not_valid() {
let db = setup_db(false).await;
let envelope_store = SqliteEnvelopeStore::new(db, 0);
let envelope_store = SqliteEnvelopeStore::new(db);
let mut stack = SqliteEnvelopeStack::new(
envelope_store,
2,
Expand All @@ -369,7 +369,7 @@ mod tests {
#[tokio::test]
async fn test_pop_when_stack_is_empty() {
let db = setup_db(true).await;
let envelope_store = SqliteEnvelopeStore::new(db, 0);
let envelope_store = SqliteEnvelopeStore::new(db);
let mut stack = SqliteEnvelopeStack::new(
envelope_store,
2,
Expand All @@ -386,7 +386,7 @@ mod tests {
#[tokio::test]
async fn test_push_below_threshold_and_pop() {
let db = setup_db(true).await;
let envelope_store = SqliteEnvelopeStore::new(db, 0);
let envelope_store = SqliteEnvelopeStore::new(db);
let mut stack = SqliteEnvelopeStack::new(
envelope_store,
5,
Expand Down Expand Up @@ -423,7 +423,7 @@ mod tests {
#[tokio::test]
async fn test_push_above_threshold_and_pop() {
let db = setup_db(true).await;
let envelope_store = SqliteEnvelopeStore::new(db, 0);
let envelope_store = SqliteEnvelopeStore::new(db);
let mut stack = SqliteEnvelopeStack::new(
envelope_store,
5,
Expand Down
36 changes: 19 additions & 17 deletions relay-server/src/services/buffer/sqlite_envelope_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::path::Path;
use std::pin::pin;

use futures::stream::StreamExt;
use hashbrown::HashSet;
use sqlx::migrate::MigrateError;
use sqlx::query::Query;
use sqlx::sqlite::{
Expand Down Expand Up @@ -73,12 +74,12 @@ pub enum SqliteEnvelopeStoreError {
#[derive(Debug, Clone)]
pub struct SqliteEnvelopeStore {
db: Pool<Sqlite>,
max_disk_size: usize,
}

impl SqliteEnvelopeStore {
pub fn new(db: Pool<Sqlite>, max_disk_size: usize) -> Self {
Self { db, max_disk_size }
/// Initializes the [`SqliteEnvelopeStore`] with a supplied [`Pool`].
pub fn new(db: Pool<Sqlite>) -> Self {
Self { db }
}

/// Prepares the [`SqliteEnvelopeStore`] by running all the necessary migrations and preparing
Expand All @@ -90,11 +91,6 @@ impl SqliteEnvelopeStore {
};

relay_log::info!("buffer file {}", path.to_string_lossy());
relay_log::info!(
"max memory size {}",
config.spool_envelopes_max_memory_size()
);
relay_log::info!("max disk size {}", config.spool_envelopes_max_disk_size());

Self::setup(&path).await?;

Expand Down Expand Up @@ -131,10 +127,7 @@ impl SqliteEnvelopeStore {
.await
.map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?;

Ok(SqliteEnvelopeStore {
db,
max_disk_size: config.spool_envelopes_max_disk_size(),
})
Ok(SqliteEnvelopeStore { db })
}

/// Set up the database and return the current number of envelopes.
Expand Down Expand Up @@ -180,6 +173,7 @@ impl SqliteEnvelopeStore {
Ok(())
}

/// Inserts one or more [`InsertEnvelope`] into the database.
pub async fn insert_many(
&self,
envelopes: impl IntoIterator<Item = InsertEnvelope>,
Expand All @@ -200,6 +194,7 @@ impl SqliteEnvelopeStore {
Ok(())
}

/// Deletes and returns at most `limit` [`Envelope`]s from the database.
pub async fn delete_many(
&self,
own_key: ProjectKey,
Expand Down Expand Up @@ -261,9 +256,11 @@ impl SqliteEnvelopeStore {
Ok(extracted_envelopes)
}

/// Returns a set of project key pairs, representing all the unique combinations of
/// `own_key` and `project_key` that are found in the database.
pub async fn project_key_pairs(
&self,
) -> Result<Vec<(ProjectKey, ProjectKey)>, SqliteEnvelopeStoreError> {
) -> Result<HashSet<(ProjectKey, ProjectKey)>, SqliteEnvelopeStoreError> {
let project_key_pairs = build_get_project_key_pairs()
.fetch_all(&self.db)
.await
Expand All @@ -278,6 +275,7 @@ impl SqliteEnvelopeStore {
Ok(project_key_pairs)
}

/// Returns an approximate measure of the size of the database.
pub async fn used_size(&self) -> Result<i64, SqliteEnvelopeStoreError> {
build_estimate_size()
.fetch_one(&self.db)
Expand Down Expand Up @@ -327,7 +325,8 @@ fn extract_project_key_pair(
(Ok(own_key), Ok(sampling_key)) => Ok((own_key, sampling_key)),
// Report the first found error.
(Err(err), _) | (_, Err(err)) => {
relay_log::error!("Failed to extract a queue key from the spool record: {err}");
relay_log::error!("failed to extract a queue key from the spool record: {err}");

Err(err)
}
}
Expand Down Expand Up @@ -444,7 +443,7 @@ mod tests {
#[tokio::test]
async fn test_insert_and_delete_envelopes() {
let db = setup_db(true).await;
let envelope_store = SqliteEnvelopeStore::new(db, 0);
let envelope_store = SqliteEnvelopeStore::new(db);

let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();
Expand Down Expand Up @@ -472,7 +471,7 @@ mod tests {
#[tokio::test]
async fn test_insert_and_get_project_keys_pairs() {
let db = setup_db(true).await;
let envelope_store = SqliteEnvelopeStore::new(db, 0);
let envelope_store = SqliteEnvelopeStore::new(db);

let own_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let sampling_key = ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap();
Expand All @@ -488,6 +487,9 @@ mod tests {
// same pair.
let project_key_pairs = envelope_store.project_key_pairs().await.unwrap();
assert_eq!(project_key_pairs.len(), 1);
assert_eq!(project_key_pairs[0], (own_key, sampling_key));
assert_eq!(
project_key_pairs.into_iter().last().unwrap(),
(own_key, sampling_key)
);
}
}

0 comments on commit af36538

Please sign in to comment.