Skip to content

Commit

Permalink
feat(spooler): Add EnvelopeStore trait and capacity check
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Aug 13, 2024
1 parent 6d01e72 commit 35f6433
Show file tree
Hide file tree
Showing 14 changed files with 238 additions and 94 deletions.
2 changes: 1 addition & 1 deletion relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ pub struct EnvelopeSpool {
min_connections: u32,
/// The maximum size of the buffer to keep, in bytes.
///
/// If not set the befault is 524288000 bytes (500MB).
/// If not set the default is 524288000 bytes (500MB).
#[serde(default = "spool_envelopes_max_disk_size")]
max_disk_size: ByteSize,
/// The maximum bytes to keep in the memory buffer before spooling envelopes to disk, in bytes.
Expand Down
13 changes: 4 additions & 9 deletions relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,14 @@ fn queue_envelope(

match state.envelope_buffer() {
Some(buffer) => {
if !buffer.has_capacity() {
return Err(BadStoreRequest::QueueFailed);
}

// NOTE: This assumes that a `prefetch` has already been scheduled for both the
// envelope's projects. See `handle_check_envelope`.
relay_log::trace!("Pushing envelope to V2 buffer");

// TODO: Sync-check whether the buffer has capacity.
// Otherwise return `QueueFailed`.
buffer.defer_push(envelope);
}
None => {
Expand Down Expand Up @@ -347,13 +349,6 @@ pub async fn handle_envelope(
)
}

// TODO(jjbayer): Move this check to spool impl
if state.memory_checker().check_memory().is_exceeded() {
// NOTE: Long-term, we should not reject the envelope here, but spool it to disk instead.
// This will be fixed with the new spool implementation.
return Err(BadStoreRequest::QueueFailed);
};

let mut managed_envelope = ManagedEnvelope::new(
envelope,
state.outcome_aggregator().clone(),
Expand Down
6 changes: 5 additions & 1 deletion relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,11 @@ impl ServiceState {
upstream_relay.clone(),
global_config.clone(),
);
let envelope_buffer = GuardedEnvelopeBuffer::from_config(&config).map(Arc::new);
let envelope_buffer = GuardedEnvelopeBuffer::from_config(
&config,
MemoryChecker::new(memory_stat.clone(), config.clone()),
)
.map(Arc::new);
ProjectCacheService::new(
config.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),
Expand Down
62 changes: 40 additions & 22 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ use relay_config::Config;

use crate::envelope::Envelope;
use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError;
use crate::services::buffer::envelope_stack::{EnvelopeStack, StackProvider};
use crate::services::buffer::sqlite_envelope_store::SqliteEnvelopeStoreError;
use crate::services::buffer::stack_provider::memory::MemoryStackProvider;
use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider;
use crate::services::buffer::envelope_stack::EnvelopeStack;
use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError;
use crate::services::buffer::stacks_manager::memory::MemoryStacksManager;
use crate::services::buffer::stacks_manager::sqlite::SqliteStacksManager;
use crate::services::buffer::stacks_manager::{Capacity, StacksManager};
use crate::statsd::{RelayCounters, RelayGauges};
use crate::utils::MemoryChecker;

/// Polymorphic envelope buffering interface.
///
Expand All @@ -25,20 +27,21 @@ use crate::statsd::{RelayCounters, RelayGauges};
#[allow(private_interfaces)]
pub enum PolymorphicEnvelopeBuffer {
/// An enveloper buffer that uses in-memory envelopes stacks.
InMemory(EnvelopeBuffer<MemoryStackProvider>),
InMemory(EnvelopeBuffer<MemoryStacksManager>),
/// An enveloper buffer that uses sqlite envelopes stacks.
#[allow(dead_code)]
Sqlite(EnvelopeBuffer<SqliteStackProvider>),
Sqlite(EnvelopeBuffer<SqliteStacksManager>),
}

impl PolymorphicEnvelopeBuffer {
/// Creates either a memory-based or a disk-based envelope buffer,
/// depending on the given configuration.
pub fn from_config(config: &Config) -> Self {
pub fn from_config(config: &Config, memory_checker: MemoryChecker) -> Self {
if config.spool_envelopes_path().is_some() {
panic!("Disk backend not yet supported for spool V2");
}
Self::InMemory(EnvelopeBuffer::<MemoryStackProvider>::new())

Self::InMemory(EnvelopeBuffer::<MemoryStacksManager>::new(memory_checker))
}

/// Adds an envelope to the buffer.
Expand Down Expand Up @@ -78,6 +81,14 @@ impl PolymorphicEnvelopeBuffer {
Self::InMemory(buffer) => buffer.mark_ready(project, is_ready),
}
}

/// Returns `true` whether the buffer has capacity to accept new [`Envelope`]s.
pub fn has_capacity(&self) -> bool {
match self {
Self::Sqlite(buffer) => buffer.has_capacity(),
Self::InMemory(buffer) => buffer.has_capacity(),
}
}
}

/// Error that occurs while interacting with the envelope buffer.
Expand All @@ -86,6 +97,9 @@ pub enum EnvelopeBufferError {
#[error("sqlite")]
Sqlite(#[from] SqliteEnvelopeStackError),

#[error("failed to push envelope to the buffer")]
PushFailed,

#[error("impossible")]
Impossible(#[from] Infallible),
}
Expand All @@ -95,7 +109,7 @@ pub enum EnvelopeBufferError {
/// Envelope stacks are organized in a priority queue, and are reprioritized every time an envelope
/// is pushed, popped, or when a project becomes ready.
#[derive(Debug)]
struct EnvelopeBuffer<P: StackProvider> {
struct EnvelopeBuffer<P: StacksManager> {
/// The central priority queue.
priority_queue: priority_queue::PriorityQueue<QueueItem<StackKey, P::Stack>, Priority>,
/// A lookup table to find all stacks involving a project.
Expand All @@ -107,32 +121,32 @@ struct EnvelopeBuffer<P: StackProvider> {
stack_provider: P,
}

impl EnvelopeBuffer<MemoryStackProvider> {
/// Creates an empty buffer.
pub fn new() -> Self {
impl EnvelopeBuffer<MemoryStacksManager> {
/// Creates an empty memory-based buffer.
pub fn new(memory_checker: MemoryChecker) -> Self {
Self {
stacks_by_project: Default::default(),
priority_queue: Default::default(),
stack_provider: MemoryStackProvider,
stack_provider: MemoryStacksManager::new(memory_checker),
}
}
}

#[allow(dead_code)]
impl EnvelopeBuffer<SqliteStackProvider> {
/// Creates an empty buffer.
impl EnvelopeBuffer<SqliteStacksManager> {
/// Creates an empty sqlite-based buffer.
pub async fn new(config: &Config) -> Result<Self, SqliteEnvelopeStoreError> {
Ok(Self {
stacks_by_project: Default::default(),
priority_queue: Default::default(),
stack_provider: SqliteStackProvider::new(config).await?,
stack_provider: SqliteStacksManager::new(config).await?,
})
}
}

impl<P: StackProvider> EnvelopeBuffer<P>
impl<P: StacksManager> EnvelopeBuffer<P>
where
EnvelopeBufferError: std::convert::From<<P::Stack as EnvelopeStack>::Error>,
EnvelopeBufferError: From<<P::Stack as EnvelopeStack>::Error>,
{
/// Pushes an envelope to the appropriate envelope stack and reprioritizes the stack.
///
Expand Down Expand Up @@ -255,6 +269,10 @@ where
);
}

pub fn has_capacity(&self) -> bool {
matches!(self.stack_provider.capacity(), Capacity::FREE);
}

fn pop_stack(&mut self, stack_key: StackKey) {
for project_key in stack_key.iter() {
self.stacks_by_project
Expand Down Expand Up @@ -425,7 +443,7 @@ mod tests {

#[tokio::test]
async fn insert_pop() {
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new();
let mut buffer = EnvelopeBuffer::<MemoryStacksManager>::new();

let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
Expand Down Expand Up @@ -500,7 +518,7 @@ mod tests {

#[tokio::test]
async fn project_internal_order() {
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new();
let mut buffer = EnvelopeBuffer::<MemoryStacksManager>::new();

let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();

Expand All @@ -527,7 +545,7 @@ mod tests {

#[tokio::test]
async fn sampling_projects() {
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new();
let mut buffer = EnvelopeBuffer::<MemoryStacksManager>::new();

let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();
Expand Down Expand Up @@ -605,7 +623,7 @@ mod tests {

assert_ne!(stack_key1, stack_key2);

let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new();
let mut buffer = EnvelopeBuffer::<MemoryStacksManager>::new();
buffer
.push(new_envelope(project_key1, Some(project_key2)))
.await
Expand Down
6 changes: 0 additions & 6 deletions relay-server/src/services/buffer/envelope_stack/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,3 @@ pub trait EnvelopeStack: Send + std::fmt::Debug {
/// Pops the [`Envelope`] on top of the stack.
fn pop(&mut self) -> impl Future<Output = Result<Option<Box<Envelope>>, Self::Error>>;
}

pub trait StackProvider: std::fmt::Debug {
type Stack: EnvelopeStack;

fn create_stack(&self, envelope: Box<Envelope>) -> Self::Stack;
}
3 changes: 2 additions & 1 deletion relay-server/src/services/buffer/envelope_stack/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ use relay_base_schema::project::ProjectKey;

use crate::envelope::Envelope;
use crate::services::buffer::envelope_stack::EnvelopeStack;
use crate::services::buffer::sqlite_envelope_store::{
use crate::services::buffer::envelope_store::sqlite::{
SqliteEnvelopeStore, SqliteEnvelopeStoreError,
};
use crate::services::buffer::envelope_store::EnvelopeStore;
use crate::statsd::RelayCounters;

/// An error returned when doing an operation on [`SqliteEnvelopeStack`].
Expand Down
39 changes: 39 additions & 0 deletions relay-server/src/services/buffer/envelope_store/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use std::future::Future;

use hashbrown::HashSet;
use relay_base_schema::project::ProjectKey;

use crate::Envelope;

pub mod sqlite;

/// Trait that models a store of [`Envelope`]s.
pub trait EnvelopeStore {
type Envelope;

type Error;

/// Inserts one or more envelopes into the store.
fn insert_many(
&mut self,
envelopes: impl IntoIterator<Item = Self::Envelope>,
) -> impl Future<Output = Result<(), Self::Error>>;

/// Deletes one or more envelopes that match `own_key` and `sampling_key` up to `limit` from
/// the store.
fn delete_many(
&mut self,
own_key: ProjectKey,
sampling_key: ProjectKey,
limit: i64,
) -> impl Future<Output = Result<Vec<Box<Envelope>>, Self::Error>>;

/// Returns a set of project key pairs, representing all the unique combinations of
/// `own_key` and `project_key` that are found in the store.
fn project_key_pairs(
&self,
) -> impl Future<Output = Result<HashSet<(ProjectKey, ProjectKey)>, Self::Error>>;

/// Returns the usage of the store where the definition of usage depends on the implementation.
fn usage(&self) -> usize;
}
Loading

0 comments on commit 35f6433

Please sign in to comment.