Skip to content

Commit

Permalink
Improve
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Jul 30, 2024
1 parent 8972444 commit a822832
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 71 deletions.
28 changes: 27 additions & 1 deletion relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,33 @@ impl EnvelopesBuffer {
}
}

// TODO: add push, pop, peek
pub async fn push(&mut self, envelope: Box<Envelope>) {
match self {
Self::Sqlite(buffer) => buffer.push(envelope).await,
Self::InMemory(buffer) => buffer.push(envelope).await,
}
}

pub async fn peek(&mut self) -> Option<&Envelope> {
match self {
Self::Sqlite(buffer) => buffer.peek().await,
Self::InMemory(buffer) => buffer.peek().await,
}
}

pub async fn pop(&mut self) -> Option<Box<Envelope>> {
match self {
Self::Sqlite(buffer) => buffer.pop().await,
Self::InMemory(buffer) => buffer.pop().await,
}
}

pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool {
match self {
Self::Sqlite(buffer) => buffer.mark_ready(project, is_ready),
Self::InMemory(buffer) => buffer.mark_ready(project, is_ready),
}
}
}

/// An envelope buffer that holds an individual stack for each project/sampling project combination.
Expand Down
57 changes: 2 additions & 55 deletions relay-server/src/services/buffer/envelope_stack/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ use std::num::NonZeroUsize;
use std::path::Path;

use futures::StreamExt;
use sqlx::query::Query;
use sqlx::sqlite::{SqliteArguments, SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
use sqlx::{Pool, QueryBuilder, Row, Sqlite};
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
use sqlx::{Pool, Row, Sqlite};
use tokio::fs::DirBuilder;

use relay_base_schema::project::ProjectKey;
Expand Down Expand Up @@ -234,58 +233,6 @@ impl EnvelopeStack for SqliteEnvelopeStack {
}
}

/// Struct which contains all the rows that have to be inserted in the database when storing an
/// [`Envelope`].
struct InsertEnvelope {
received_at: i64,
own_key: ProjectKey,
sampling_key: ProjectKey,
encoded_envelope: Vec<u8>,
}

/// Builds a query that inserts many [`Envelope`]s in the database.
fn build_insert_many_envelopes<'a>(
envelopes: impl Iterator<Item = InsertEnvelope>,
) -> QueryBuilder<'a, Sqlite> {
let mut builder: QueryBuilder<Sqlite> =
QueryBuilder::new("INSERT INTO envelopes (received_at, own_key, sampling_key, envelope) ");

builder.push_values(envelopes, |mut b, envelope| {
b.push_bind(envelope.received_at)
.push_bind(envelope.own_key.to_string())
.push_bind(envelope.sampling_key.to_string())
.push_bind(envelope.encoded_envelope);
});

builder
}

/// Builds a query that deletes many [`Envelope`] from the database.
pub fn build_delete_and_fetch_many_envelopes<'a>(
own_key: ProjectKey,
project_key: ProjectKey,
limit: i64,
) -> Query<'a, Sqlite, SqliteArguments<'a>> {
sqlx::query(
"DELETE FROM
envelopes
WHERE id IN (SELECT id FROM envelopes WHERE own_key = ? AND sampling_key = ?
ORDER BY received_at DESC LIMIT ?)
RETURNING
received_at, own_key, sampling_key, envelope",
)
.bind(own_key.to_string())
.bind(project_key.to_string())
.bind(limit)
}

/// Computes the `received_at` timestamps of an [`Envelope`] based on the `start_time` header.
///
/// This method has been copied from the `ManagedEnvelope.received_at()` method.
fn received_at(envelope: &Envelope) -> i64 {
relay_common::time::instant_to_date_time(envelope.meta().start_time()).timestamp_millis()
}

#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
Expand Down
23 changes: 9 additions & 14 deletions relay-server/src/services/buffer/sqlite_envelope_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,22 @@ use std::error::Error;
use std::iter;
use std::path::Path;
use std::pin::pin;
use std::sync::Arc;

use futures::stream::StreamExt;
use sqlx::migrate::MigrateError;
use sqlx::query::Query;
use sqlx::sqlite::{
SqliteArguments, SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions,
SqliteRow, SqliteSynchronous,
};
use sqlx::{Pool, QueryBuilder, Sqlite};
use sqlx::{Pool, QueryBuilder, Row, Sqlite};
use tokio::fs::DirBuilder;

use relay_base_schema::project::ProjectKey;
use relay_config::Config;

use crate::extractors::StartTime;
use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError;
use crate::services::buffer::envelope_store::EnvelopeStore;
use crate::Envelope;

struct InsertEnvelope {
Expand Down Expand Up @@ -171,10 +170,10 @@ impl SqliteEnvelopeStore {
Ok(())
}

async fn insert_many(
pub async fn insert_many(
&mut self,
envelopes: impl Iterator<Item = InsertEnvelope>,
) -> Result<(), Self::Error> {
) -> Result<(), SqliteEnvelopeStoreError> {
if let Err(err) = build_insert_many_envelopes(envelopes)
.build()
.execute(&self.db)
Expand All @@ -191,7 +190,7 @@ impl SqliteEnvelopeStore {
Ok(())
}

async fn delete_many(
pub async fn delete_many(
&mut self,
own_key: ProjectKey,
sampling_key: ProjectKey,
Expand All @@ -206,11 +205,7 @@ impl SqliteEnvelopeStore {
return Ok(vec![]);
}

// We use a sorted vector to order envelopes that are deleted from the database.
// Unfortunately we have to do this because SQLite `DELETE` with `RETURNING` doesn't
// return deleted rows in a specific order.
let mut extracted_envelopes = Vec::with_capacity(limit as usize);
let mut db_error = None;
while let Some(envelope) = envelopes.as_mut().next().await {
let envelope = match envelope {
Ok(envelope) => envelope,
Expand All @@ -219,8 +214,6 @@ impl SqliteEnvelopeStore {
error = &err as &dyn Error,
"failed to unspool the envelopes from the disk",
);
db_error = Some(err);

continue;
}
};
Expand All @@ -239,19 +232,21 @@ impl SqliteEnvelopeStore {
}

// We sort envelopes by `received_at`.
// Unfortunately we have to do this because SQLite `DELETE` with `RETURNING` doesn't
// return deleted rows in a specific order.
extracted_envelopes.sort_by_key(|a| a.received_at());

Ok(extracted_envelopes)
}

async fn project_keys_pairs(
pub async fn project_keys_pairs(
&self,
) -> Result<impl Iterator<Item = (String, String)>, SqliteEnvelopeStoreError> {
// TODO: implement.
Ok(iter::empty())
}

async fn used_size(&self) -> Result<i64, SqliteEnvelopeStoreError> {
pub async fn used_size(&self) -> Result<i64, SqliteEnvelopeStoreError> {
// TODO: implement.
Ok(10)
}
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/services/buffer/stack_provider/sqlite.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use relay_config::Config;

use crate::services::buffer::envelope_stack::StackProvider;
use crate::services::buffer::envelope_store::sqlite::{
use crate::services::buffer::sqlite_envelope_store::{
SqliteEnvelopeStore, SqliteEnvelopeStoreError,
};
use crate::{Envelope, SqliteEnvelopeStack};
Expand Down

0 comments on commit a822832

Please sign in to comment.