diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index 969d9a3565..ebe30bc76f 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -5,7 +5,6 @@ use std::time::Instant; use relay_base_schema::project::ProjectKey; use relay_config::Config; -use stack_key::StackKey; use crate::envelope::Envelope; use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError; @@ -213,10 +212,13 @@ where self.priority_queue.change_priority_by(stack_key, |stack| { let mut found = false; for (subkey, readiness) in [ - (stack_key.lesser(), &mut stack.readiness.0), - (stack_key.greater(), &mut stack.readiness.1), + (stack_key.own_key, &mut stack.readiness.own_project_ready), + ( + stack_key.sampling_key, + &mut stack.readiness.sampling_project_ready, + ), ] { - if subkey == project { + if subkey == *project { found = true; if *readiness != is_ready { changed = true; @@ -261,46 +263,38 @@ where .remove(&stack_key); } self.priority_queue.remove(&stack_key); + relay_statsd::metric!( gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64 ); } } -mod stack_key { - use super::*; - /// Sorted stack key. - /// - /// Contains a pair of project keys. The lower key is always the first - /// element in the pair, such that `(k1, k2)` and `(k2, k1)` map to the same - /// stack key. - #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] - pub struct StackKey(ProjectKey, ProjectKey); - - impl StackKey { - pub fn from_envelope(envelope: &Envelope) -> Self { - let own_key = envelope.meta().public_key(); - let sampling_key = envelope.sampling_key().unwrap_or(own_key); - Self::new(own_key, sampling_key) - } - - pub fn lesser(&self) -> &ProjectKey { - &self.0 - } +#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub struct StackKey { + own_key: ProjectKey, + sampling_key: ProjectKey, +} - pub fn greater(&self) -> &ProjectKey { - &self.1 - } +impl StackKey { + pub fn from_envelope(envelope: &Envelope) -> Self { + let own_key = envelope.meta().public_key(); + let sampling_key = envelope.sampling_key().unwrap_or(own_key); + Self::new(own_key, sampling_key) + } - pub fn iter(&self) -> impl Iterator { - std::iter::once(self.0).chain((self.0 != self.1).then_some(self.1)) - } + pub fn iter(&self) -> impl Iterator { + let Self { + own_key, + sampling_key, + } = self; + std::iter::once(*own_key).chain((own_key != sampling_key).then_some(*sampling_key)) + } - fn new(mut key1: ProjectKey, mut key2: ProjectKey) -> Self { - if key2 < key1 { - std::mem::swap(&mut key1, &mut key2); - } - Self(key1, key2) + fn new(own_key: ProjectKey, sampling_key: ProjectKey) -> Self { + Self { + own_key, + sampling_key, } } } @@ -374,15 +368,21 @@ impl Ord for Priority { } #[derive(Debug)] -struct Readiness(bool, bool); +struct Readiness { + own_project_ready: bool, + sampling_project_ready: bool, +} impl Readiness { fn new() -> Self { - Self(false, false) + Self { + own_project_ready: false, + sampling_project_ready: false, + } } fn ready(&self) -> bool { - self.0 && self.1 + self.own_project_ready && self.sampling_project_ready } } @@ -594,4 +594,26 @@ mod tests { assert!(buffer.pop().await.unwrap().is_none()); } + + #[tokio::test] + async fn project_keys_distinct() { + let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); + let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap(); + + let stack_key1 = StackKey::new(project_key1, project_key2); + let stack_key2 = StackKey::new(project_key2, project_key1); + + assert_ne!(stack_key1, stack_key2); + + let mut buffer = EnvelopeBuffer::::new(); + buffer + .push(new_envelope(project_key1, Some(project_key2))) + .await + .unwrap(); + buffer + .push(new_envelope(project_key2, Some(project_key1))) + .await + .unwrap(); + assert_eq!(buffer.priority_queue.len(), 2); + } }