Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Aug 13, 2024
1 parent 35f6433 commit 803b28a
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 19 deletions.
30 changes: 22 additions & 8 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ where
}

pub fn has_capacity(&self) -> bool {
matches!(self.stack_provider.capacity(), Capacity::FREE);
matches!(self.stack_provider.capacity(), Capacity::FREE)
}

fn pop_stack(&mut self, stack_key: StackKey) {
Expand Down Expand Up @@ -407,16 +407,16 @@ impl Readiness {
#[cfg(test)]
mod tests {
use std::str::FromStr;

use std::sync::Arc;
use uuid::Uuid;

use relay_common::Dsn;
use relay_sampling::DynamicSamplingContext;

use super::*;
use crate::envelope::{Item, ItemType};
use crate::extractors::RequestMeta;

use super::*;
use crate::utils::MemoryStat;

fn new_envelope(project_key: ProjectKey, sampling_key: Option<ProjectKey>) -> Box<Envelope> {
let mut envelope = Envelope::from_request(
Expand All @@ -441,9 +441,23 @@ mod tests {
envelope
}

fn mock_memory_checker() -> MemoryChecker {
let config: Arc<_> = Config::from_json_value(serde_json::json!({
"spool": {
"health": {
"max_memory_percent": 1.0
}
}
}))
.unwrap()
.into();

MemoryChecker::new(MemoryStat::default(), config.clone())
}

#[tokio::test]
async fn insert_pop() {
let mut buffer = EnvelopeBuffer::<MemoryStacksManager>::new();
let mut buffer = EnvelopeBuffer::<MemoryStacksManager>::new(mock_memory_checker());

let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
Expand Down Expand Up @@ -518,7 +532,7 @@ mod tests {

#[tokio::test]
async fn project_internal_order() {
let mut buffer = EnvelopeBuffer::<MemoryStacksManager>::new();
let mut buffer = EnvelopeBuffer::<MemoryStacksManager>::new(mock_memory_checker());

let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();

Expand All @@ -545,7 +559,7 @@ mod tests {

#[tokio::test]
async fn sampling_projects() {
let mut buffer = EnvelopeBuffer::<MemoryStacksManager>::new();
let mut buffer = EnvelopeBuffer::<MemoryStacksManager>::new(mock_memory_checker());

let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();
Expand Down Expand Up @@ -623,7 +637,7 @@ mod tests {

assert_ne!(stack_key1, stack_key2);

let mut buffer = EnvelopeBuffer::<MemoryStacksManager>::new();
let mut buffer = EnvelopeBuffer::<MemoryStacksManager>::new(mock_memory_checker());
buffer
.push(new_envelope(project_key1, Some(project_key2)))
.await
Expand Down
10 changes: 9 additions & 1 deletion relay-server/src/services/buffer/envelope_store/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ impl EnvelopeStore for SqliteEnvelopeStore {
.is_err()
{
relay_log::error!("failed to update the disk usage");
}
};
});

self.last_known_usage.load(Ordering::Relaxed)
Expand Down Expand Up @@ -558,4 +558,12 @@ mod tests {
(own_key, sampling_key)
);
}

#[tokio::test]
async fn test_estimate_disk_usage() {
let db = setup_db(true).await;
let mut envelope_store = SqliteEnvelopeStore::new(db);

println!("{:?}", envelope_store.usage());
}
}
21 changes: 13 additions & 8 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,23 +215,28 @@ mod tests {

use relay_common::Dsn;

use crate::extractors::RequestMeta;

use super::*;
use crate::extractors::RequestMeta;
use crate::utils::MemoryStat;

fn new_buffer() -> Arc<GuardedEnvelopeBuffer> {
GuardedEnvelopeBuffer::from_config(
&Config::from_json_value(serde_json::json!({
let config = Arc::new(
Config::from_json_value(serde_json::json!({
"spool": {
"envelopes": {
"version": "experimental"
"version": "experimental",
"max_memory_percent": 1.0
}
}
}))
.unwrap(),
)
.unwrap()
.into()
);

let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());

GuardedEnvelopeBuffer::from_config(&config, memory_checker)
.unwrap()
.into()
}

fn new_envelope() -> Box<Envelope> {
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/services/buffer/stacks_manager/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use relay_config::Config;
use crate::services::buffer::envelope_store::sqlite::{
SqliteEnvelopeStore, SqliteEnvelopeStoreError,
};
use crate::services::buffer::envelope_store::EnvelopeStore;
use crate::services::buffer::stacks_manager::{Capacity, StacksManager};
use crate::{Envelope, SqliteEnvelopeStack};

Expand Down Expand Up @@ -48,5 +47,6 @@ impl StacksManager for SqliteStacksManager {
fn capacity(&self) -> Capacity {
// TODO: how to we make the check async or sync.
// self.envelope_store.usage()
Capacity::FREE
}
}
3 changes: 2 additions & 1 deletion relay-server/src/services/project_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1583,7 +1583,8 @@ mod tests {
.unwrap()
.into();
let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());
let envelope_buffer = GuardedEnvelopeBuffer::from_config(&config).map(Arc::new);
let envelope_buffer =
GuardedEnvelopeBuffer::from_config(&config, memory_checker.clone()).map(Arc::new);
let buffer_services = spooler::Services {
outcome_aggregator: services.outcome_aggregator.clone(),
project_cache: services.project_cache.clone(),
Expand Down

0 comments on commit 803b28a

Please sign in to comment.