diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 89679c8d9b..16fdc5d416 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -259,7 +259,7 @@ pub fn event_id_from_items(items: &Items) -> Result, BadStoreReq /// /// Queueing can fail if the queue exceeds `envelope_buffer_size`. In this case, `Err` is /// returned and the envelope is not queued. -async fn queue_envelope( +fn queue_envelope( state: &ServiceState, mut managed_envelope: ManagedEnvelope, ) -> Result<(), BadStoreRequest> { @@ -307,7 +307,7 @@ async fn queue_envelope( match state.envelope_buffer() { Some(buffer) => { - if !buffer.has_capacity().await { + if !buffer.has_capacity() { return Err(BadStoreRequest::QueueFailed); } @@ -388,7 +388,7 @@ pub async fn handle_envelope( return Err(BadStoreRequest::Overflow(offender)); } - queue_envelope(state, managed_envelope).await?; + queue_envelope(state, managed_envelope)?; if checked.rate_limits.is_limited() { // Even if some envelope items have been queued, there might be active rate limits on diff --git a/relay-server/src/services/buffer/envelope_stack/sqlite.rs b/relay-server/src/services/buffer/envelope_stack/sqlite.rs index a7274121be..2117ae3d53 100644 --- a/relay-server/src/services/buffer/envelope_stack/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_stack/sqlite.rs @@ -9,7 +9,6 @@ use crate::services::buffer::envelope_stack::EnvelopeStack; use crate::services::buffer::envelope_store::sqlite::{ SqliteEnvelopeStore, SqliteEnvelopeStoreError, }; -use crate::services::buffer::envelope_store::EnvelopeStore; use crate::statsd::RelayCounters; /// An error returned when doing an operation on [`SqliteEnvelopeStack`]. diff --git a/relay-server/src/services/buffer/envelope_store/mod.rs b/relay-server/src/services/buffer/envelope_store/mod.rs index 12c1619cd1..6b1c1083e3 100644 --- a/relay-server/src/services/buffer/envelope_store/mod.rs +++ b/relay-server/src/services/buffer/envelope_store/mod.rs @@ -1,42 +1 @@ -use std::future::Future; - -use hashbrown::HashSet; -use relay_base_schema::project::ProjectKey; - -use crate::Envelope; - pub mod sqlite; - -/// Trait that models a store of [`Envelope`]s. -pub trait EnvelopeStore { - /// The type that is inserted in the store. - type Envelope; - - /// The error type that is returned when an error occurs in the store. - type Error; - - /// Inserts one or more envelopes into the store. - fn insert_many( - &mut self, - envelopes: impl IntoIterator, - ) -> impl Future>; - - /// Deletes one or more envelopes that match `own_key` and `sampling_key` up to `limit` from - /// the store. - fn delete_many( - &mut self, - own_key: ProjectKey, - sampling_key: ProjectKey, - limit: i64, - ) -> impl Future>, Self::Error>>; - - /// Returns a set of project key pairs, representing all the unique combinations of - /// `own_key` and `project_key` that are found in the store. - #[allow(dead_code)] - fn project_key_pairs( - &self, - ) -> impl Future, Self::Error>>; - - /// Returns the usage of the store where the definition of usage depends on the implementation. - fn usage(&self) -> u64; -} diff --git a/relay-server/src/services/buffer/envelope_store/sqlite.rs b/relay-server/src/services/buffer/envelope_store/sqlite.rs index 551c028a0e..3969d54075 100644 --- a/relay-server/src/services/buffer/envelope_store/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_store/sqlite.rs @@ -7,7 +7,6 @@ use std::time::Duration; use crate::envelope::EnvelopeError; use crate::extractors::StartTime; -use crate::services::buffer::envelope_store::EnvelopeStore; use crate::statsd::RelayGauges; use crate::Envelope; use futures::stream::StreamExt; @@ -294,18 +293,12 @@ impl SqliteEnvelopeStore { Ok(()) } -} - -impl EnvelopeStore for SqliteEnvelopeStore { - type Envelope = InsertEnvelope; - - type Error = SqliteEnvelopeStoreError; /// Inserts one or more envelopes into the database. - async fn insert_many( + pub async fn insert_many( &mut self, - envelopes: impl IntoIterator, - ) -> Result<(), Self::Error> { + envelopes: impl IntoIterator, + ) -> Result<(), SqliteEnvelopeStoreError> { if let Err(err) = build_insert_many_envelopes(envelopes.into_iter()) .build() .execute(&self.db) @@ -323,12 +316,12 @@ impl EnvelopeStore for SqliteEnvelopeStore { } /// Deletes and returns at most `limit` [`Envelope`]s from the database. - async fn delete_many( + pub async fn delete_many( &mut self, own_key: ProjectKey, sampling_key: ProjectKey, limit: i64, - ) -> Result>, Self::Error> { + ) -> Result>, SqliteEnvelopeStoreError> { let envelopes = build_delete_and_fetch_many_envelopes(own_key, sampling_key, limit) .fetch(&self.db) .peekable(); @@ -386,7 +379,9 @@ impl EnvelopeStore for SqliteEnvelopeStore { /// Returns a set of project key pairs, representing all the unique combinations of /// `own_key` and `project_key` that are found in the database. - async fn project_key_pairs(&self) -> Result, Self::Error> { + pub async fn project_key_pairs( + &self, + ) -> Result, SqliteEnvelopeStoreError> { let project_key_pairs = build_get_project_key_pairs() .fetch_all(&self.db) .await @@ -402,7 +397,7 @@ impl EnvelopeStore for SqliteEnvelopeStore { } /// Returns an approximate measure of the used size of the database. - fn usage(&self) -> u64 { + pub fn usage(&self) -> u64 { self.disk_usage.usage() } } diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 416945cf5b..2d54195bcc 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -1,6 +1,6 @@ //! Types for buffering envelopes. -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::Arc; use relay_base_schema::project::ProjectKey; @@ -10,11 +10,13 @@ use tokio::sync::MutexGuard; use crate::envelope::Envelope; use crate::utils::{ManagedEnvelope, MemoryChecker}; +use crate::statsd::RelayCounters; pub use envelope_buffer::EnvelopeBufferError; pub use envelope_buffer::PolymorphicEnvelopeBuffer; pub use envelope_stack::sqlite::SqliteEnvelopeStack; // pub for benchmarks pub use envelope_stack::EnvelopeStack; // pub for benchmarks -pub use envelope_store::sqlite::SqliteEnvelopeStore; // pub for benchmarks // pub for benchmarks +pub use envelope_store::sqlite::SqliteEnvelopeStore; +// pub for benchmarks // pub for benchmarks mod envelope_buffer; mod envelope_stack; @@ -53,6 +55,8 @@ pub struct GuardedEnvelopeBuffer { notify: tokio::sync::Notify, /// Metric that counts how many push operations are waiting. inflight_push_count: AtomicU64, + /// Last known capacity check result. + cached_capacity: AtomicBool, } impl GuardedEnvelopeBuffer { @@ -69,6 +73,7 @@ impl GuardedEnvelopeBuffer { }), notify: tokio::sync::Notify::new(), inflight_push_count: AtomicU64::new(0), + cached_capacity: AtomicBool::new(true), }) } else { None @@ -138,9 +143,31 @@ impl GuardedEnvelopeBuffer { } /// Returns `true` if the buffer has capacity to accept more [`Envelope`]s. - pub async fn has_capacity(&self) -> bool { - let guard = self.inner.lock().await; - guard.backend.has_capacity() + /// + /// This method tries to acquire the lock and read the latest capacity, but doesn't + /// guarantee that the returned value will be up to date, since lock contention could lead to + /// this method never acquiring the lock, thus returning the last known capacity value. + pub fn has_capacity(&self) -> bool { + match self.inner.try_lock() { + Ok(guard) => { + relay_statsd::metric!( + counter(RelayCounters::BufferCapacityCheck) += 1, + lock_aquired = "true" + ); + + let has_capacity = guard.backend.has_capacity(); + self.cached_capacity.store(has_capacity, Ordering::Relaxed); + has_capacity + } + Err(_) => { + relay_statsd::metric!( + counter(RelayCounters::BufferCapacityCheck) += 1, + lock_aquired = "false" + ); + + self.cached_capacity.load(Ordering::Relaxed) + } + } } /// Returns the count of how many pushes are in flight and not been finished. diff --git a/relay-server/src/services/buffer/stack_provider/sqlite.rs b/relay-server/src/services/buffer/stack_provider/sqlite.rs index 3269f36bc4..a2d49c62d1 100644 --- a/relay-server/src/services/buffer/stack_provider/sqlite.rs +++ b/relay-server/src/services/buffer/stack_provider/sqlite.rs @@ -3,7 +3,6 @@ use relay_config::Config; use crate::services::buffer::envelope_store::sqlite::{ SqliteEnvelopeStore, SqliteEnvelopeStoreError, }; -use crate::services::buffer::envelope_store::EnvelopeStore; use crate::services::buffer::stack_provider::StackProvider; use crate::{Envelope, SqliteEnvelopeStack}; diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 26e10a2720..00b0773df7 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -603,6 +603,12 @@ pub enum RelayCounters { /// - `state_out`: The new state. `memory`, `memory_file_standby`, or `disk`. /// - `reason`: Why a transition was made (or not made). BufferStateTransition, + /// Number of times the capacity is of the buffer is checked. + /// + /// This metric is tagged with: + /// - `lock_acquired`: Whether the capacity check was done by acquiring the lock or using the + /// old value. + BufferCapacityCheck, /// /// Number of outcomes and reasons for rejected Envelopes. /// @@ -817,6 +823,7 @@ impl CounterMetric for RelayCounters { RelayCounters::BufferEnvelopesWritten => "buffer.envelopes_written", RelayCounters::BufferEnvelopesRead => "buffer.envelopes_read", RelayCounters::BufferStateTransition => "buffer.state.transition", + RelayCounters::BufferCapacityCheck => "buffer.capacity_check", RelayCounters::Outcomes => "events.outcomes", RelayCounters::ProjectStateGet => "project_state.get", RelayCounters::ProjectStateRequest => "project_state.request",