Skip to content

Commit

Permalink
Improve
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Jul 29, 2024
1 parent 8864e61 commit 990bb4e
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 31 deletions.
8 changes: 4 additions & 4 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;
use std::time::Duration;

use crate::metrics::{MetricOutcomes, MetricStats};
use crate::services::buffer::EnvelopeBuffer;
use crate::services::buffer::EnvelopesBufferManager;
use crate::services::stats::RelayStats;
use anyhow::{Context, Result};
use axum::extract::FromRequestParts;
Expand Down Expand Up @@ -139,7 +139,7 @@ fn create_store_pool(config: &Config) -> Result<ThreadPool> {
struct StateInner {
config: Arc<Config>,
memory_checker: MemoryChecker,
envelope_buffer: Option<EnvelopeBuffer>,
envelope_buffer: Option<EnvelopesBufferManager>,
registry: Registry,
}

Expand Down Expand Up @@ -257,7 +257,7 @@ impl ServiceState {
upstream_relay.clone(),
global_config.clone(),
);
let envelope_buffer = EnvelopeBuffer::from_config(&config);
let envelope_buffer = EnvelopesBufferManager::from_config(&config);
ProjectCacheService::new(
config.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),
Expand Down Expand Up @@ -324,7 +324,7 @@ impl ServiceState {
&self.inner.memory_checker
}

pub fn envelope_buffer(&self) -> Option<&EnvelopeBuffer> {
pub fn envelope_buffer(&self) -> Option<&EnvelopesBufferManager> {
self.inner.envelope_buffer.as_ref()
}

Expand Down
9 changes: 5 additions & 4 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@ use crate::services::buffer::envelope_stack::EnvelopeStack;
use crate::services::buffer::stack_provider::memory::MemoryStackProvider;
use crate::SqliteEnvelopeStack;

/// Creates a memory or disk based [`EnvelopeBuffer`], depending on the given config.
pub fn create(_config: Arc<Config>) -> Arc<Mutex<InnerEnvelopeBuffer<MemoryEnvelopeStack>>> {
/// Creates a memory or disk based [`EnvelopesBuffer`], depending on the given config.
pub fn create(_config: &Config) -> Arc<Mutex<InnerEnvelopeBuffer<MemoryEnvelopeStack>>> {
Arc::new(Mutex::new(InnerEnvelopeBuffer::<MemoryEnvelopeStack>::new()))
}

pub enum EnvelopeBuffer {
#[derive(Debug)]
pub enum EnvelopesBuffer {
InMemory(InnerEnvelopeBuffer<MemoryEnvelopeStack>),
Sqlite(InnerEnvelopeBuffer<SqliteEnvelopeStack>),
}

impl EnvelopeBuffer {
impl EnvelopesBuffer {
pub fn from_config(config: &Config) -> Self {
match config.spool_envelopes_path() {
Some(path) => Self::Sqlite(InnerEnvelopeBuffer::<SqliteEnvelopeStack>::new(path)),
Expand Down
19 changes: 13 additions & 6 deletions relay-server/src/services/buffer/envelope_stack/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub enum SqliteEnvelopeStackError {
DatabaseError(#[from] sqlx::Error),
}

#[derive(Debug)]
/// An [`EnvelopeStack`] that is implemented on an SQLite database.
///
/// For efficiency reasons, the implementation has an in-memory buffer that is periodically spooled
Expand Down Expand Up @@ -396,8 +397,9 @@ mod tests {
#[should_panic]
async fn test_push_with_mismatching_project_keys() {
let db = setup_db(false).await;
let envelope_store = SqliteEnvelopeStore::new(db, 0);
let mut stack = SqliteEnvelopeStack::new(
db,
envelope_store,
2,
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
Expand All @@ -411,8 +413,9 @@ mod tests {
#[tokio::test]
async fn test_push_when_db_is_not_valid() {
let db = setup_db(false).await;
let envelope_store = SqliteEnvelopeStore::new(db, 0);
let mut stack = SqliteEnvelopeStack::new(
db,
envelope_store,
2,
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
Expand Down Expand Up @@ -462,8 +465,9 @@ mod tests {
#[tokio::test]
async fn test_pop_when_db_is_not_valid() {
let db = setup_db(false).await;
let envelope_store = SqliteEnvelopeStore::new(db, 0);
let mut stack = SqliteEnvelopeStack::new(
db,
envelope_store,
2,
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
Expand All @@ -480,8 +484,9 @@ mod tests {
#[tokio::test]
async fn test_pop_when_stack_is_empty() {
let db = setup_db(true).await;
let envelope_store = SqliteEnvelopeStore::new(db, 0);
let mut stack = SqliteEnvelopeStack::new(
db,
envelope_store,
2,
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
Expand All @@ -498,8 +503,9 @@ mod tests {
#[tokio::test]
async fn test_push_below_threshold_and_pop() {
let db = setup_db(true).await;
let envelope_store = SqliteEnvelopeStore::new(db, 0);
let mut stack = SqliteEnvelopeStack::new(
db,
envelope_store,
5,
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
Expand Down Expand Up @@ -534,8 +540,9 @@ mod tests {
#[tokio::test]
async fn test_push_above_threshold_and_pop() {
let db = setup_db(true).await;
let envelope_store = SqliteEnvelopeStore::new(db, 0);
let mut stack = SqliteEnvelopeStack::new(
db,
envelope_store,
5,
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
Expand Down
6 changes: 5 additions & 1 deletion relay-server/src/services/buffer/envelope_store/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,17 @@ pub enum SqliteEnvelopeStoreError {
MigrationError(MigrateError),
}

#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct SqliteEnvelopeStore {
db: Pool<Sqlite>,
max_disk_size: usize,
}

impl SqliteEnvelopeStore {
pub fn new(db: Pool<Sqlite>, max_disk_size: usize) -> Self {
Self { db, max_disk_size }
}

/// Prepares the [`SqliteEnvelopeStore`] by running all the necessary migrations and preparing
/// the folders where data will be stored.
pub async fn prepare(
Expand Down
19 changes: 9 additions & 10 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ use relay_base_schema::project::ProjectKey;
use relay_config::Config;

use crate::envelope::Envelope;
use crate::services::buffer::envelope_buffer::priority::PriorityEnvelopeBuffer;
use crate::services::buffer::envelope_stack::memory::MemoryEnvelopeStack;
use crate::services::buffer::envelope_buffer::EnvelopesBuffer;

mod envelope_buffer;
mod envelope_stack;
Expand All @@ -22,7 +21,7 @@ mod stack_provider;
///
/// Access to the buffer is synchronized by a tokio lock.
#[derive(Debug, Clone)]
pub struct EnvelopeBuffer {
pub struct EnvelopesBufferManager {
/// TODO: Reconsider synchronization mechanism.
/// We can either
/// - keep the interface sync and use a std Mutex. In this case, we create a queue of threads.
Expand All @@ -34,13 +33,13 @@ pub struct EnvelopeBuffer {
/// > The primary use case for the async mutex is to provide shared mutable access to IO resources such as a database connection.
/// > [...] when you do want shared access to an IO resource, it is often better to spawn a task to manage the IO resource,
/// > and to use message passing to communicate with that task.
backend: Arc<tokio::sync::Mutex<PriorityEnvelopeBuffer<MemoryEnvelopeStack>>>,
backend: Arc<tokio::sync::Mutex<EnvelopesBuffer>>,
notify: Arc<tokio::sync::Notify>,
changed: Arc<AtomicBool>,
}

impl EnvelopeBuffer {
/// Creates a memory or disk based [`EnvelopeBuffer`], depending on the given config.
impl EnvelopesBufferManager {
/// Creates a memory or disk based [`EnvelopesBufferManager`], depending on the given config.
///
/// NOTE: until the V1 spooler implementation is removed, this function returns `None`
/// if V2 spooling is not configured.
Expand Down Expand Up @@ -102,7 +101,7 @@ impl EnvelopeBuffer {
///
/// Objects of this type can only exist if the buffer is not empty.
pub struct Peek<'a> {
guard: MutexGuard<'a, PriorityEnvelopeBuffer<MemoryEnvelopeStack>>,
guard: MutexGuard<'a, EnvelopesBuffer>,
notify: &'a tokio::sync::Notify,
changed: &'a AtomicBool,
}
Expand All @@ -127,7 +126,7 @@ impl Peek<'_> {
.expect("element disappeared while holding lock")
}

/// Sync version of [`EnvelopeBuffer::mark_ready`].
/// Sync version of [`EnvelopesBufferManager::mark_ready`].
///
/// Since [`Peek`] already has exclusive access to the buffer, it can mark projects as ready
/// without awaiting the lock.
Expand Down Expand Up @@ -228,8 +227,8 @@ mod tests {
assert_eq!(call_count.load(Ordering::Relaxed), 2);
}

fn new_buffer() -> EnvelopeBuffer {
EnvelopeBuffer::from_config(
fn new_buffer() -> EnvelopesBufferManager {
EnvelopesBufferManager::from_config(
&Config::from_json_value(serde_json::json!({
"spool": {
"envelopes": {
Expand Down
12 changes: 6 additions & 6 deletions relay-server/src/services/project_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::time::Duration;

use crate::extractors::RequestMeta;
use crate::metrics::MetricOutcomes;
use crate::services::buffer::{EnvelopeBuffer, Peek};
use crate::services::buffer::{EnvelopesBufferManager, Peek};
use hashbrown::HashSet;
use relay_base_schema::project::ProjectKey;
use relay_config::{Config, RelayMode};
Expand Down Expand Up @@ -569,7 +569,7 @@ struct ProjectCacheBroker {
config: Arc<Config>,
memory_checker: MemoryChecker,
// TODO: Make non-optional when spool_v1 is removed.
envelope_buffer: Option<EnvelopeBuffer>,
envelope_buffer: Option<EnvelopesBufferManager>,
services: Services,
metric_outcomes: MetricOutcomes,
// Need hashbrown because extract_if is not stable in std yet.
Expand Down Expand Up @@ -1265,7 +1265,7 @@ impl ProjectCacheBroker {
pub struct ProjectCacheService {
config: Arc<Config>,
memory_checker: MemoryChecker,
envelope_buffer: Option<EnvelopeBuffer>,
envelope_buffer: Option<EnvelopesBufferManager>,
services: Services,
metric_outcomes: MetricOutcomes,
redis: Option<RedisPool>,
Expand All @@ -1276,7 +1276,7 @@ impl ProjectCacheService {
pub fn new(
config: Arc<Config>,
memory_checker: MemoryChecker,
envelope_buffer: Option<EnvelopeBuffer>,
envelope_buffer: Option<EnvelopesBufferManager>,
services: Services,
metric_outcomes: MetricOutcomes,
redis: Option<RedisPool>,
Expand Down Expand Up @@ -1453,7 +1453,7 @@ impl Service for ProjectCacheService {
}

/// Temporary helper function while V1 spool eixsts.
async fn peek_buffer(buffer: &Option<EnvelopeBuffer>) -> Peek {
async fn peek_buffer(buffer: &Option<EnvelopesBufferManager>) -> Peek {
match buffer {
Some(buffer) => buffer.peek().await,
None => std::future::pending().await,
Expand Down Expand Up @@ -1534,7 +1534,7 @@ mod tests {
.unwrap()
.into();
let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone());
let envelope_buffer = EnvelopeBuffer::from_config(&config);
let envelope_buffer = EnvelopesBufferManager::from_config(&config);
let buffer_services = spooler::Services {
outcome_aggregator: services.outcome_aggregator.clone(),
project_cache: services.project_cache.clone(),
Expand Down

0 comments on commit 990bb4e

Please sign in to comment.