From 990bb4e13ebd8c3c7f80cace736ebed20d5d98e9 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Mon, 29 Jul 2024 18:07:43 +0200 Subject: [PATCH] Improve --- relay-server/src/service.rs | 8 ++++---- .../services/buffer/envelope_buffer/mod.rs | 9 +++++---- .../services/buffer/envelope_stack/sqlite.rs | 19 +++++++++++++------ .../services/buffer/envelope_store/sqlite.rs | 6 +++++- relay-server/src/services/buffer/mod.rs | 19 +++++++++---------- relay-server/src/services/project_cache.rs | 12 ++++++------ 6 files changed, 42 insertions(+), 31 deletions(-) diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 078aa5f3af..4288347094 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use std::time::Duration; use crate::metrics::{MetricOutcomes, MetricStats}; -use crate::services::buffer::EnvelopeBuffer; +use crate::services::buffer::EnvelopesBufferManager; use crate::services::stats::RelayStats; use anyhow::{Context, Result}; use axum::extract::FromRequestParts; @@ -139,7 +139,7 @@ fn create_store_pool(config: &Config) -> Result { struct StateInner { config: Arc, memory_checker: MemoryChecker, - envelope_buffer: Option, + envelope_buffer: Option, registry: Registry, } @@ -257,7 +257,7 @@ impl ServiceState { upstream_relay.clone(), global_config.clone(), ); - let envelope_buffer = EnvelopeBuffer::from_config(&config); + let envelope_buffer = EnvelopesBufferManager::from_config(&config); ProjectCacheService::new( config.clone(), MemoryChecker::new(memory_stat.clone(), config.clone()), @@ -324,7 +324,7 @@ impl ServiceState { &self.inner.memory_checker } - pub fn envelope_buffer(&self) -> Option<&EnvelopeBuffer> { + pub fn envelope_buffer(&self) -> Option<&EnvelopesBufferManager> { self.inner.envelope_buffer.as_ref() } diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index c327e49b16..3652b0b2bf 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -15,17 +15,18 @@ use crate::services::buffer::envelope_stack::EnvelopeStack; use crate::services::buffer::stack_provider::memory::MemoryStackProvider; use crate::SqliteEnvelopeStack; -/// Creates a memory or disk based [`EnvelopeBuffer`], depending on the given config. -pub fn create(_config: Arc) -> Arc>> { +/// Creates a memory or disk based [`EnvelopesBuffer`], depending on the given config. +pub fn create(_config: &Config) -> Arc>> { Arc::new(Mutex::new(InnerEnvelopeBuffer::::new())) } -pub enum EnvelopeBuffer { +#[derive(Debug)] +pub enum EnvelopesBuffer { InMemory(InnerEnvelopeBuffer), Sqlite(InnerEnvelopeBuffer), } -impl EnvelopeBuffer { +impl EnvelopesBuffer { pub fn from_config(config: &Config) -> Self { match config.spool_envelopes_path() { Some(path) => Self::Sqlite(InnerEnvelopeBuffer::::new(path)), diff --git a/relay-server/src/services/buffer/envelope_stack/sqlite.rs b/relay-server/src/services/buffer/envelope_stack/sqlite.rs index 262a702f7b..ab7ea5ea4e 100644 --- a/relay-server/src/services/buffer/envelope_stack/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_stack/sqlite.rs @@ -38,6 +38,7 @@ pub enum SqliteEnvelopeStackError { DatabaseError(#[from] sqlx::Error), } +#[derive(Debug)] /// An [`EnvelopeStack`] that is implemented on an SQLite database. /// /// For efficiency reasons, the implementation has an in-memory buffer that is periodically spooled @@ -396,8 +397,9 @@ mod tests { #[should_panic] async fn test_push_with_mismatching_project_keys() { let db = setup_db(false).await; + let envelope_store = SqliteEnvelopeStore::new(db, 0); let mut stack = SqliteEnvelopeStack::new( - db, + envelope_store, 2, 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), @@ -411,8 +413,9 @@ mod tests { #[tokio::test] async fn test_push_when_db_is_not_valid() { let db = setup_db(false).await; + let envelope_store = SqliteEnvelopeStore::new(db, 0); let mut stack = SqliteEnvelopeStack::new( - db, + envelope_store, 2, 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), @@ -462,8 +465,9 @@ mod tests { #[tokio::test] async fn test_pop_when_db_is_not_valid() { let db = setup_db(false).await; + let envelope_store = SqliteEnvelopeStore::new(db, 0); let mut stack = SqliteEnvelopeStack::new( - db, + envelope_store, 2, 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), @@ -480,8 +484,9 @@ mod tests { #[tokio::test] async fn test_pop_when_stack_is_empty() { let db = setup_db(true).await; + let envelope_store = SqliteEnvelopeStore::new(db, 0); let mut stack = SqliteEnvelopeStack::new( - db, + envelope_store, 2, 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), @@ -498,8 +503,9 @@ mod tests { #[tokio::test] async fn test_push_below_threshold_and_pop() { let db = setup_db(true).await; + let envelope_store = SqliteEnvelopeStore::new(db, 0); let mut stack = SqliteEnvelopeStack::new( - db, + envelope_store, 5, 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), @@ -534,8 +540,9 @@ mod tests { #[tokio::test] async fn test_push_above_threshold_and_pop() { let db = setup_db(true).await; + let envelope_store = SqliteEnvelopeStore::new(db, 0); let mut stack = SqliteEnvelopeStack::new( - db, + envelope_store, 5, 2, ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), diff --git a/relay-server/src/services/buffer/envelope_store/sqlite.rs b/relay-server/src/services/buffer/envelope_store/sqlite.rs index ebc15dc4c8..40f9f8a9ba 100644 --- a/relay-server/src/services/buffer/envelope_store/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_store/sqlite.rs @@ -58,13 +58,17 @@ pub enum SqliteEnvelopeStoreError { MigrationError(MigrateError), } -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct SqliteEnvelopeStore { db: Pool, max_disk_size: usize, } impl SqliteEnvelopeStore { + pub fn new(db: Pool, max_disk_size: usize) -> Self { + Self { db, max_disk_size } + } + /// Prepares the [`SqliteEnvelopeStore`] by running all the necessary migrations and preparing /// the folders where data will be stored. pub async fn prepare( diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index b05da89515..b7f4d53501 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -10,8 +10,7 @@ use relay_base_schema::project::ProjectKey; use relay_config::Config; use crate::envelope::Envelope; -use crate::services::buffer::envelope_buffer::priority::PriorityEnvelopeBuffer; -use crate::services::buffer::envelope_stack::memory::MemoryEnvelopeStack; +use crate::services::buffer::envelope_buffer::EnvelopesBuffer; mod envelope_buffer; mod envelope_stack; @@ -22,7 +21,7 @@ mod stack_provider; /// /// Access to the buffer is synchronized by a tokio lock. #[derive(Debug, Clone)] -pub struct EnvelopeBuffer { +pub struct EnvelopesBufferManager { /// TODO: Reconsider synchronization mechanism. /// We can either /// - keep the interface sync and use a std Mutex. In this case, we create a queue of threads. @@ -34,13 +33,13 @@ pub struct EnvelopeBuffer { /// > The primary use case for the async mutex is to provide shared mutable access to IO resources such as a database connection. /// > [...] when you do want shared access to an IO resource, it is often better to spawn a task to manage the IO resource, /// > and to use message passing to communicate with that task. - backend: Arc>>, + backend: Arc>, notify: Arc, changed: Arc, } -impl EnvelopeBuffer { - /// Creates a memory or disk based [`EnvelopeBuffer`], depending on the given config. +impl EnvelopesBufferManager { + /// Creates a memory or disk based [`EnvelopesBufferManager`], depending on the given config. /// /// NOTE: until the V1 spooler implementation is removed, this function returns `None` /// if V2 spooling is not configured. @@ -102,7 +101,7 @@ impl EnvelopeBuffer { /// /// Objects of this type can only exist if the buffer is not empty. pub struct Peek<'a> { - guard: MutexGuard<'a, PriorityEnvelopeBuffer>, + guard: MutexGuard<'a, EnvelopesBuffer>, notify: &'a tokio::sync::Notify, changed: &'a AtomicBool, } @@ -127,7 +126,7 @@ impl Peek<'_> { .expect("element disappeared while holding lock") } - /// Sync version of [`EnvelopeBuffer::mark_ready`]. + /// Sync version of [`EnvelopesBufferManager::mark_ready`]. /// /// Since [`Peek`] already has exclusive access to the buffer, it can mark projects as ready /// without awaiting the lock. @@ -228,8 +227,8 @@ mod tests { assert_eq!(call_count.load(Ordering::Relaxed), 2); } - fn new_buffer() -> EnvelopeBuffer { - EnvelopeBuffer::from_config( + fn new_buffer() -> EnvelopesBufferManager { + EnvelopesBufferManager::from_config( &Config::from_json_value(serde_json::json!({ "spool": { "envelopes": { diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index ad77752941..c184e975d2 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -5,7 +5,7 @@ use std::time::Duration; use crate::extractors::RequestMeta; use crate::metrics::MetricOutcomes; -use crate::services::buffer::{EnvelopeBuffer, Peek}; +use crate::services::buffer::{EnvelopesBufferManager, Peek}; use hashbrown::HashSet; use relay_base_schema::project::ProjectKey; use relay_config::{Config, RelayMode}; @@ -569,7 +569,7 @@ struct ProjectCacheBroker { config: Arc, memory_checker: MemoryChecker, // TODO: Make non-optional when spool_v1 is removed. - envelope_buffer: Option, + envelope_buffer: Option, services: Services, metric_outcomes: MetricOutcomes, // Need hashbrown because extract_if is not stable in std yet. @@ -1265,7 +1265,7 @@ impl ProjectCacheBroker { pub struct ProjectCacheService { config: Arc, memory_checker: MemoryChecker, - envelope_buffer: Option, + envelope_buffer: Option, services: Services, metric_outcomes: MetricOutcomes, redis: Option, @@ -1276,7 +1276,7 @@ impl ProjectCacheService { pub fn new( config: Arc, memory_checker: MemoryChecker, - envelope_buffer: Option, + envelope_buffer: Option, services: Services, metric_outcomes: MetricOutcomes, redis: Option, @@ -1453,7 +1453,7 @@ impl Service for ProjectCacheService { } /// Temporary helper function while V1 spool eixsts. -async fn peek_buffer(buffer: &Option) -> Peek { +async fn peek_buffer(buffer: &Option) -> Peek { match buffer { Some(buffer) => buffer.peek().await, None => std::future::pending().await, @@ -1534,7 +1534,7 @@ mod tests { .unwrap() .into(); let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone()); - let envelope_buffer = EnvelopeBuffer::from_config(&config); + let envelope_buffer = EnvelopesBufferManager::from_config(&config); let buffer_services = spooler::Services { outcome_aggregator: services.outcome_aggregator.clone(), project_cache: services.project_cache.clone(),