Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Aug 13, 2024
1 parent 803b28a commit 748c6ca
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 12 deletions.
21 changes: 18 additions & 3 deletions relay-server/benches/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ use relay_config::Config;
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
use sqlx::{Pool, Sqlite};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tempfile::TempDir;
use tokio::runtime::Runtime;

use relay_base_schema::project::ProjectKey;
use relay_server::{
Envelope, EnvelopeStack, PolymorphicEnvelopeBuffer, SqliteEnvelopeStack, SqliteEnvelopeStore,
Envelope, EnvelopeStack, MemoryChecker, MemoryStat, PolymorphicEnvelopeBuffer,
SqliteEnvelopeStack, SqliteEnvelopeStore,
};

fn setup_db(path: &PathBuf) -> Pool<Sqlite> {
Expand Down Expand Up @@ -221,6 +223,17 @@ fn benchmark_envelope_buffer(c: &mut Criterion) {
let num_projects = 100000;
let envelopes_per_project = 10;

let config: Arc<Config> = Config::from_json_value(serde_json::json!({
"spool": {
"health": {
"max_memory_percent": 1.0
}
}
}))
.unwrap()
.into();
let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());

group.throughput(Throughput::Elements(
num_projects * envelopes_per_project as u64,
));
Expand All @@ -245,7 +258,8 @@ fn benchmark_envelope_buffer(c: &mut Criterion) {
},
|envelopes| {
runtime.block_on(async {
let mut buffer = PolymorphicEnvelopeBuffer::from_config(&Config::default());
let mut buffer =
PolymorphicEnvelopeBuffer::from_config(&config, memory_checker.clone());
for envelope in envelopes.into_iter() {
buffer.push(envelope).await.unwrap();
}
Expand Down Expand Up @@ -274,7 +288,8 @@ fn benchmark_envelope_buffer(c: &mut Criterion) {
},
|envelopes| {
runtime.block_on(async {
let mut buffer = PolymorphicEnvelopeBuffer::from_config(&Config::default());
let mut buffer =
PolymorphicEnvelopeBuffer::from_config(&config, memory_checker.clone());
let n = envelopes.len();
for envelope in envelopes.into_iter() {
let public_key = envelope.meta().public_key();
Expand Down
1 change: 1 addition & 0 deletions relay-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ pub use self::services::buffer::{
EnvelopeStack, PolymorphicEnvelopeBuffer, SqliteEnvelopeStack, SqliteEnvelopeStore,
}; // pub for benchmarks
pub use self::services::spooler::spool_utils;
pub use self::utils::{MemoryChecker, MemoryStat}; // pub for benchmarks

#[cfg(test)]
mod testutils;
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ where
}

pub fn has_capacity(&self) -> bool {
matches!(self.stack_provider.capacity(), Capacity::FREE)
matches!(self.stack_provider.capacity(), Capacity::Free)
}

fn pop_stack(&mut self, stack_key: StackKey) {
Expand Down
1 change: 1 addition & 0 deletions relay-server/src/services/buffer/envelope_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub trait EnvelopeStore {
limit: i64,
) -> impl Future<Output = Result<Vec<Box<Envelope>>, Self::Error>>;

#[allow(dead_code)]
/// Returns a set of project key pairs, representing all the unique combinations of
/// `own_key` and `project_key` that are found in the store.
fn project_key_pairs(
Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/services/buffer/stacks_manager/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ impl StacksManager for MemoryStacksManager {

fn capacity(&self) -> Capacity {
if self.memory_checker.check_memory().has_capacity() {
Capacity::FREE
Capacity::Free
} else {
Capacity::FULL
Capacity::Full
}
}
}
4 changes: 2 additions & 2 deletions relay-server/src/services/buffer/stacks_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ pub mod sqlite;

/// Enum representing the current capacity of the [`StacksManager`] to accept new [`Envelope`]s.
pub enum Capacity {
FREE,
FULL,
Free,
Full,
}

/// A provider of [`EnvelopeStack`] instances that is responsible for creating them.
Expand Down
9 changes: 6 additions & 3 deletions relay-server/src/services/buffer/stacks_manager/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use relay_config::Config;
use crate::services::buffer::envelope_store::sqlite::{
SqliteEnvelopeStore, SqliteEnvelopeStoreError,
};
use crate::services::buffer::envelope_store::EnvelopeStore;
use crate::services::buffer::stacks_manager::{Capacity, StacksManager};
use crate::{Envelope, SqliteEnvelopeStack};

Expand Down Expand Up @@ -45,8 +46,10 @@ impl StacksManager for SqliteStacksManager {
}

fn capacity(&self) -> Capacity {
// TODO: how to we make the check async or sync.
// self.envelope_store.usage()
Capacity::FREE
if self.envelope_store.usage() < self.max_disk_size {
Capacity::Free
} else {
Capacity::Full
}
}
}
2 changes: 1 addition & 1 deletion relay-server/src/utils/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ impl MemoryCheck {
/// decides how memory readings are interpreted.
#[derive(Clone, Debug)]
pub struct MemoryChecker {
pub memory_stat: MemoryStat,
memory_stat: MemoryStat,
config: Arc<Config>,
}

Expand Down

0 comments on commit 748c6ca

Please sign in to comment.