Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jjbayer committed Oct 30, 2024
1 parent 23f3bde commit 253ee99
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 21 deletions.
7 changes: 3 additions & 4 deletions relay-server/src/services/buffer/envelope_stack/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tokio::time::Instant;
use crate::envelope::Envelope;
use crate::services::buffer::envelope_stack::EnvelopeStack;
use crate::services::buffer::envelope_store::sqlite::{
InsertEnvelope, InsertEnvelopeError, SqliteEnvelopeStore, SqliteEnvelopeStoreError,
DatabaseEnvelope, InsertEnvelopeError, SqliteEnvelopeStore, SqliteEnvelopeStoreError,
};
use crate::statsd::{RelayCounters, RelayTimers};

Expand Down Expand Up @@ -38,7 +38,7 @@ pub struct SqliteEnvelopeStack {
sampling_key: ProjectKey,
/// In-memory stack containing a batch of envelopes that either have not been written to disk yet, or have been read from disk recently.
#[allow(clippy::vec_box)]
batch: Vec<InsertEnvelope>,
batch: Vec<DatabaseEnvelope>,
/// Boolean representing whether calls to `push()` and `peek()` check disk in case not enough
/// elements are available in the `batches_buffer`.
check_disk: bool,
Expand Down Expand Up @@ -151,7 +151,7 @@ impl EnvelopeStack for SqliteEnvelopeStack {

let encoded_envelope =
relay_statsd::metric!(timer(RelayTimers::BufferEnvelopesSerialization), {
InsertEnvelope::try_from(envelope.as_ref())?
DatabaseEnvelope::try_from(envelope.as_ref())?
});
self.batch.push(encoded_envelope);

Expand All @@ -174,7 +174,6 @@ impl EnvelopeStack for SqliteEnvelopeStack {
self.unspool_from_disk().await?
}

// FIXME: inefficient peek
let Some(envelope) = self.batch.pop() else {
return Ok(None);
};
Expand Down
37 changes: 22 additions & 15 deletions relay-server/src/services/buffer/envelope_store/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ use sqlx::{Pool, QueryBuilder, Row, Sqlite};
use tokio::fs::DirBuilder;
use tokio::time::sleep;

/// Fixed first 4 bytes for zstd compressed envelopes.
///
/// Used for backward compatibility to check whether an envelope on disk is zstd-encoded.
const ZSTD_MAGIC_WORD: &[u8] = &[40, 181, 47, 253];

/// Struct that contains all the fields of an [`Envelope`] that are mapped to the database columns.
#[derive(Clone, Debug)]
pub struct InsertEnvelope {
pub struct DatabaseEnvelope {
received_at: i64,
own_key: ProjectKey,
sampling_key: ProjectKey,
Expand All @@ -44,8 +47,10 @@ pub enum InsertEnvelopeError {
Zstd(#[from] std::io::Error),
}

impl InsertEnvelope {
// TODO: explain why 1
impl DatabaseEnvelope {
// Use the lowest level of compression.
//
// Experiments showed no big difference between 1, 2, and 3 in either run time or compression ratio.
const COMPRESSION_LEVEL: i32 = 1;

pub fn len(&self) -> usize {
Expand All @@ -57,19 +62,21 @@ impl InsertEnvelope {
}
}

impl TryFrom<InsertEnvelope> for Box<Envelope> {
impl TryFrom<DatabaseEnvelope> for Box<Envelope> {
type Error = InsertEnvelopeError;

fn try_from(value: InsertEnvelope) -> Result<Self, Self::Error> {
let InsertEnvelope {
fn try_from(value: DatabaseEnvelope) -> Result<Self, Self::Error> {
let DatabaseEnvelope {
received_at,
own_key,
sampling_key,
mut encoded_envelope,
} = value;

if encoded_envelope.starts_with(ZSTD_MAGIC_WORD) {
encoded_envelope = zstd::decode_all(encoded_envelope.as_slice())?;
relay_statsd::metric!(timer(RelayTimers::BufferEnvelopeDecompression), {
encoded_envelope = zstd::decode_all(encoded_envelope.as_slice())?;
});
}

let mut envelope = Envelope::parse_bytes(Bytes::from(encoded_envelope))?;
Expand All @@ -84,7 +91,7 @@ impl TryFrom<InsertEnvelope> for Box<Envelope> {
}
}

impl<'a> TryFrom<&'a Envelope> for InsertEnvelope {
impl<'a> TryFrom<&'a Envelope> for DatabaseEnvelope {
type Error = InsertEnvelopeError;

fn try_from(value: &'a Envelope) -> Result<Self, Self::Error> {
Expand All @@ -97,7 +104,7 @@ impl<'a> TryFrom<&'a Envelope> for InsertEnvelope {
relay_statsd::metric!(timer(RelayTimers::BufferEnvelopeCompression), {
zstd::encode_all(serialized_envelope.as_slice(), Self::COMPRESSION_LEVEL)?
});
Ok(InsertEnvelope {
Ok(DatabaseEnvelope {
received_at: value.received_at().timestamp_millis(),
own_key,
sampling_key,
Expand Down Expand Up @@ -341,7 +348,7 @@ impl SqliteEnvelopeStore {
/// Inserts one or more envelopes into the database.
pub async fn insert_many(
&mut self,
envelopes: impl IntoIterator<Item = InsertEnvelope>,
envelopes: impl IntoIterator<Item = DatabaseEnvelope>,
) -> Result<(), SqliteEnvelopeStoreError> {
if let Err(err) = build_insert_many_envelopes(envelopes.into_iter())
.build()
Expand All @@ -365,7 +372,7 @@ impl SqliteEnvelopeStore {
own_key: ProjectKey,
sampling_key: ProjectKey,
limit: i64,
) -> Result<Vec<InsertEnvelope>, SqliteEnvelopeStoreError> {
) -> Result<Vec<DatabaseEnvelope>, SqliteEnvelopeStoreError> {
let envelopes = build_delete_and_fetch_many_envelopes(own_key, sampling_key, limit)
.fetch(&self.db)
.peekable();
Expand Down Expand Up @@ -458,12 +465,12 @@ impl SqliteEnvelopeStore {
}
}

/// Deserializes an [`Envelope`] from a database row.
/// Deserializes an [`EncodedEnvelope`] from a database row.
fn extract_envelope(
own_key: ProjectKey,
sampling_key: ProjectKey,
row: SqliteRow,
) -> Result<InsertEnvelope, SqliteEnvelopeStoreError> {
) -> Result<DatabaseEnvelope, SqliteEnvelopeStoreError> {
let encoded_envelope: Vec<u8> = row
.try_get("envelope")
.map_err(SqliteEnvelopeStoreError::FetchError)?;
Expand All @@ -472,7 +479,7 @@ fn extract_envelope(
.try_get("received_at")
.map_err(SqliteEnvelopeStoreError::FetchError)?;

Ok(InsertEnvelope {
Ok(DatabaseEnvelope {
received_at,
own_key,
sampling_key,
Expand Down Expand Up @@ -508,7 +515,7 @@ fn extract_project_key_pair(row: SqliteRow) -> Result<ProjectKeyPair, SqliteEnve

/// Builds a query that inserts many [`Envelope`]s in the database.
fn build_insert_many_envelopes<'a>(
envelopes: impl Iterator<Item = InsertEnvelope>,
envelopes: impl Iterator<Item = DatabaseEnvelope>,
) -> QueryBuilder<'a, Sqlite> {
let mut builder: QueryBuilder<Sqlite> =
QueryBuilder::new("INSERT INTO envelopes (received_at, own_key, sampling_key, envelope) ");
Expand Down
7 changes: 5 additions & 2 deletions relay-server/src/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,10 +544,12 @@ pub enum RelayTimers {
BufferPop,
/// Timing in milliseconds for the time it takes for the buffer to drain its envelopes.
BufferDrain,
/// Timing in milliseconds for the time it takes for the envelopes to be serialized.
/// Timing in milliseconds for the time it takes for an envelope to be serialized.
BufferEnvelopesSerialization,
/// Timing in milliseconds for the time it takes for the envelopes to be compressed.
/// Timing in milliseconds for the time it takes for an envelope to be compressed.
BufferEnvelopeCompression,
/// Timing in milliseconds for the time it takes for an envelope to be decompressed.
BufferEnvelopeDecompression,
}

impl TimerMetric for RelayTimers {
Expand Down Expand Up @@ -598,6 +600,7 @@ impl TimerMetric for RelayTimers {
RelayTimers::BufferDrain => "buffer.drain.duration",
RelayTimers::BufferEnvelopesSerialization => "buffer.envelopes_serialization",
RelayTimers::BufferEnvelopeCompression => "buffer.envelopes_compression",
RelayTimers::BufferEnvelopeDecompression => "buffer.envelopes_decompression",
}
}
}
Expand Down

0 comments on commit 253ee99

Please sign in to comment.