diff --git a/relay_filter/fn.should_filter.html b/relay_filter/fn.should_filter.html index 7c13760c7d..416cdd25b8 100644 --- a/relay_filter/fn.should_filter.html +++ b/relay_filter/fn.should_filter.html @@ -1,4 +1,4 @@ -should_filter in relay_filter - Rust

Function relay_filter::should_filter

source ·
pub fn should_filter<F: Filterable + Getter>(
+should_filter in relay_filter - Rust

Function relay_filter::should_filter

source ·
pub fn should_filter<F: Filterable + Getter>(
     item: &F,
     client_ip: Option<IpAddr>,
     config: &ProjectFiltersConfig,
diff --git a/relay_filter/generic/index.html b/relay_filter/generic/index.html
index 79512f1e7a..02c733a619 100644
--- a/relay_filter/generic/index.html
+++ b/relay_filter/generic/index.html
@@ -1,4 +1,4 @@
-relay_filter::generic - Rust

Module relay_filter::generic

source ·
Expand description

Implements generic filtering based on the RuleCondition DSL.

+relay_filter::generic - Rust

Module relay_filter::generic

source ·
Expand description

Implements generic filtering based on the [RuleCondition] DSL.

Multiple generic filters can be defined and they are going to be checked in FIFO order. The first one that matches, will result in the event being discarded with a FilterStatKey identifying the matching filter.

diff --git a/relay_filter/index.html b/relay_filter/index.html index 7cc3754254..acda4774e9 100644 --- a/relay_filter/index.html +++ b/relay_filter/index.html @@ -6,4 +6,4 @@
  • web crawlers (filter events sent by user agents known to be web crawlers)
  • legacy browsers (filter events originating from legacy browsers, can be configured)
  • -

    Re-exports§

    Modules§

    • Implements filtering for events caused by problematic browsers extensions.
    • Implements event filtering based on the client ip address.
    • Implements event filtering for events originating from CSP endpoints
    • Implements event filtering based on the error message
    • Implements generic filtering based on the RuleCondition DSL.
    • Implements filtering for events originating from legacy browsers.
    • Implements filtering for events originating from the localhost
    • Implements event filtering based on whether the endpoint called is a healthcheck endpoint.
    • Filters events coming from user agents known to be web crawlers.

    Structs§

    Enums§

    • Identifies which filter dropped an event for which reason.
    • A browser class to be filtered by the legacy browser filter.

    Traits§

    • A data item to which filters can be applied.

    Functions§

    • Checks whether an event should be filtered for a particular configuration.
    \ No newline at end of file +

    Re-exports§

    Modules§

    • Implements filtering for events caused by problematic browsers extensions.
    • Implements event filtering based on the client ip address.
    • Implements event filtering for events originating from CSP endpoints
    • Implements event filtering based on the error message
    • Implements generic filtering based on the [RuleCondition] DSL.
    • Implements filtering for events originating from legacy browsers.
    • Implements filtering for events originating from the localhost
    • Implements event filtering based on whether the endpoint called is a healthcheck endpoint.
    • Filters events coming from user agents known to be web crawlers.

    Structs§

    Enums§

    • Identifies which filter dropped an event for which reason.
    • A browser class to be filtered by the legacy browser filter.

    Traits§

    • A data item to which filters can be applied.

    Functions§

    • Checks whether an event should be filtered for a particular configuration.
    \ No newline at end of file diff --git a/relay_filter/struct.GenericFilterConfig.html b/relay_filter/struct.GenericFilterConfig.html index a7afdff64b..4b3265779c 100644 --- a/relay_filter/struct.GenericFilterConfig.html +++ b/relay_filter/struct.GenericFilterConfig.html @@ -1,11 +1,11 @@ GenericFilterConfig in relay_filter - Rust

    Struct relay_filter::GenericFilterConfig

    source ·
    pub struct GenericFilterConfig {
         pub id: String,
         pub is_enabled: bool,
    -    pub condition: Option<RuleCondition>,
    +    pub condition: Option<RuleCondition>,
     }
    Expand description

    Configuration for a generic filter.

    Fields§

    §id: String

    Unique identifier of the generic filter.

    §is_enabled: bool

    Specifies whether this filter is enabled.

    -
    §condition: Option<RuleCondition>

    The condition for the filter.

    +
    §condition: Option<RuleCondition>

    The condition for the filter.

    Implementations§

    source§

    impl GenericFilterConfig

    source

    pub fn is_empty(&self) -> bool

    Returns true if the filter is not enabled or no condition was supplied.

    Trait Implementations§

    source§

    impl Clone for GenericFilterConfig

    source§

    fn clone(&self) -> GenericFilterConfig

    Returns a copy of the value. Read more
    1.0.0 · source§

    fn clone_from(&mut self, source: &Self)

    Performs copy-assignment from source. Read more
    source§

    impl Debug for GenericFilterConfig

    source§

    fn fmt(&self, f: &mut Formatter<'_>) -> Result

    Formats the value using the given formatter. Read more
    source§

    impl Default for GenericFilterConfig

    source§

    fn default() -> GenericFilterConfig

    Returns the “default value” for a type. Read more
    source§

    impl<'de> Deserialize<'de> for GenericFilterConfig

    source§

    fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>
    where __D: Deserializer<'de>,

    Deserialize this value from the given Serde deserializer. Read more
    source§

    impl PartialEq for GenericFilterConfig

    source§

    fn eq(&self, other: &GenericFilterConfig) -> bool

    This method tests for self and other values to be equal, and is used diff --git a/src/relay_server/services/buffer/mod.rs.html b/src/relay_server/services/buffer/mod.rs.html index a0c2b98934..2b1ec7ae60 100644 --- a/src/relay_server/services/buffer/mod.rs.html +++ b/src/relay_server/services/buffer/mod.rs.html @@ -308,6 +308,14 @@ 308 309 310 +311 +312 +313 +314 +315 +316 +317 +318
    //! Types for buffering envelopes.
     
     use std::sync::atomic::{AtomicU64, Ordering};
    @@ -332,6 +340,16 @@
     mod stack_provider;
     mod testutils;
     
    +/// Struct that wraps the envelope buffer backend with a boolean flag to signal whether
    +/// the contents of the buffer have changed.
    +#[derive(Debug)]
    +struct Inner {
    +    /// The buffer that we are writing to and reading from.
    +    backend: PolymorphicEnvelopeBuffer,
    +    /// Used to notify callers of `peek()` of any changes in the buffer.
    +    should_peek: bool,
    +}
    +
     /// Async envelope buffering interface.
     ///
     /// Access to the buffer is synchronized by a tokio lock.
    @@ -351,7 +369,6 @@
         inner: tokio::sync::Mutex<Inner>,
         /// Used to notify callers of `peek()` of any changes in the buffer.
         notify: tokio::sync::Notify,
    -
         /// Metric that counts how many push operations are waiting.
         inflight_push_count: AtomicU64,
     }
    @@ -366,7 +383,7 @@
                 Some(Self {
                     inner: tokio::sync::Mutex::new(Inner {
                         backend: PolymorphicEnvelopeBuffer::from_config(config),
    -                    changed: true,
    +                    should_peek: true,
                     }),
                     notify: tokio::sync::Notify::new(),
                     inflight_push_count: AtomicU64::new(0),
    @@ -393,23 +410,19 @@
             });
         }
     
    -    pub fn inflight_push_count(&self) -> u64 {
    -        self.inflight_push_count.load(Ordering::Relaxed)
    -    }
    -
         /// Returns a reference to the next-in-line envelope.
         ///
         /// If the buffer is empty or has not changed since the last peek, this function will sleep
         /// until something changes in the buffer.
    -    pub async fn peek(&self) -> Peek {
    +    pub async fn peek(&self) -> EnvelopeBufferGuard {
             loop {
                 let mut guard = self.inner.lock().await;
    -            if guard.changed {
    +            if guard.should_peek {
                     match guard.backend.peek().await {
                         Ok(envelope) => {
                             if envelope.is_some() {
    -                            guard.changed = false;
    -                            return Peek {
    +                            guard.should_peek = false;
    +                            return EnvelopeBufferGuard {
                                     guard,
                                     notify: &self.notify,
                                 };
    @@ -423,14 +436,17 @@
                         }
                     };
                 }
    -            drop(guard); // release the lock
    +            // Release the lock before waiting for new notifications, otherwise we will indefinitely
    +            // block.
    +            drop(guard);
    +            // We wait to get notified for any changes in the buffer.
                 self.notify.notified().await;
             }
         }
     
         /// Marks a project as ready or not ready.
         ///
    -    /// The buffer reprioritizes its envelopes based on this information.
    +    /// The buffer re-prioritizes its envelopes based on this information.
         pub async fn mark_ready(&self, project_key: &ProjectKey, ready: bool) {
             let mut guard = self.inner.lock().await;
             let changed = guard.backend.mark_ready(project_key, ready);
    @@ -439,6 +455,11 @@
             }
         }
     
    +    /// Returns the count of how many pushes are in flight and not been finished.
    +    pub fn inflight_push_count(&self) -> u64 {
    +        self.inflight_push_count.load(Ordering::Relaxed)
    +    }
    +
         /// Adds an envelope to the buffer and wakes any waiting consumers.
         async fn push(&self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
             let mut guard = self.inner.lock().await;
    @@ -447,8 +468,9 @@
             Ok(())
         }
     
    -    fn notify(&self, guard: &mut MutexGuard<Inner>) {
    -        guard.changed = true;
    +    /// Notifies the waiting tasks that a change has happened in the buffer.
    +    fn notify(&self, guard: &mut MutexGuard<Inner>) {
    +        guard.should_peek = true;
             self.notify.notify_waiters();
         }
     }
    @@ -456,12 +478,12 @@
     /// A view onto the next envelope in the buffer.
     ///
     /// Objects of this type can only exist if the buffer is not empty.
    -pub struct Peek<'a> {
    +pub struct EnvelopeBufferGuard<'a> {
         guard: MutexGuard<'a, Inner>,
         notify: &'a tokio::sync::Notify,
     }
     
    -impl Peek<'_> {
    +impl EnvelopeBufferGuard<'_> {
         /// Returns a reference to the next envelope.
         pub async fn get(&mut self) -> Result<&Envelope, EnvelopeBufferError> {
             Ok(self
    @@ -474,7 +496,7 @@
     
         /// Pops the next envelope from the buffer.
         ///
    -    /// This functions consumes the [`Peek`].
    +    /// This functions consumes the [`EnvelopeBufferGuard`].
         pub async fn remove(mut self) -> Result<Box<Envelope>, EnvelopeBufferError> {
             self.notify();
             Ok(self
    @@ -487,7 +509,7 @@
     
         /// Sync version of [`GuardedEnvelopeBuffer::mark_ready`].
         ///
    -    /// Since [`Peek`] already has exclusive access to the buffer, it can mark projects as ready
    +    /// Since [`EnvelopeBufferGuard`] already has exclusive access to the buffer, it can mark projects as ready
         /// without awaiting the lock.
         pub fn mark_ready(&mut self, project_key: &ProjectKey, ready: bool) {
             let changed = self.guard.backend.mark_ready(project_key, ready);
    @@ -496,19 +518,13 @@
             }
         }
     
    -    fn notify(&mut self) {
    -        self.guard.changed = true;
    +    /// Notifies the waiting tasks that a change has happened in the buffer.
    +    fn notify(&mut self) {
    +        self.guard.should_peek = true;
             self.notify.notify_waiters();
         }
     }
     
    -#[derive(Debug)]
    -struct Inner {
    -    backend: PolymorphicEnvelopeBuffer,
    -    /// Used to notify callers of `peek()` of any changes in the buffer.
    -    changed: bool,
    -}
    -
     #[cfg(test)]
     mod tests {
         use std::str::FromStr;
    diff --git a/src/relay_server/services/project_cache.rs.html b/src/relay_server/services/project_cache.rs.html
    index 8aab84d0b6..50f32e5c87 100644
    --- a/src/relay_server/services/project_cache.rs.html
    +++ b/src/relay_server/services/project_cache.rs.html
    @@ -1755,13 +1755,16 @@
     1755
     1756
     1757
    +1758
    +1759
    +1760
     
    use std::collections::{BTreeMap, BTreeSet};
     use std::error::Error;
     use std::sync::Arc;
     use std::time::Duration;
     
     use crate::extractors::RequestMeta;
    -use crate::services::buffer::{EnvelopeBufferError, GuardedEnvelopeBuffer, Peek};
    +use crate::services::buffer::{EnvelopeBufferError, EnvelopeBufferGuard, GuardedEnvelopeBuffer};
     use crate::services::processor::{
         EncodeMetrics, EnvelopeProcessor, MetricData, ProcessEnvelope, ProcessingGroup, ProjectMetrics,
     };
    @@ -2826,7 +2829,10 @@
             }
         }
     
    -    async fn peek_at_envelope(&mut self, mut peek: Peek<'_>) -> Result<(), EnvelopeBufferError> {
    +    async fn peek_at_envelope(
    +        &mut self,
    +        mut peek: EnvelopeBufferGuard<'_>,
    +    ) -> Result<(), EnvelopeBufferError> {
             let envelope = peek.get().await?;
             if envelope.meta().start_time().elapsed() > self.config.spool_envelopes_max_age() {
                 let popped_envelope = peek.remove().await?;
    @@ -3231,7 +3237,7 @@
                         }
                         _ = report_ticker.tick() => {
                             if let Some(envelope_buffer) = &envelope_buffer {
    -                            relay_statsd::metric!(gauge(RelayGauges::BufferPushInFlight) = envelope_buffer.inflight_push_count());
    +                            metric!(gauge(RelayGauges::BufferPushInFlight) = envelope_buffer.inflight_push_count());
                             }
                         }
                         else => break,
    @@ -3243,8 +3249,8 @@
         }
     }
     
    -/// Temporary helper function while V1 spool eixsts.
    -async fn peek_buffer(buffer: &Option<Arc<GuardedEnvelopeBuffer>>) -> Peek {
    +/// Temporary helper function while V1 spool exists.
    +async fn peek_buffer(buffer: &Option<Arc<GuardedEnvelopeBuffer>>) -> EnvelopeBufferGuard {
         match buffer {
             Some(buffer) => buffer.peek().await,
             None => std::future::pending().await,