diff --git a/relay-server/src/services/buffer/envelopebuffer.rs b/relay-server/src/services/buffer/envelopebuffer.rs index a28a7302b6..436723789d 100644 --- a/relay-server/src/services/buffer/envelopebuffer.rs +++ b/relay-server/src/services/buffer/envelopebuffer.rs @@ -9,6 +9,7 @@ use crate::services::buffer::envelopestack::EnvelopeStack; pub trait EnvelopeBuffer { fn push(&mut self, envelope: Box); + fn peek(&mut self) -> Option<&Envelope>; fn pop(&mut self) -> Option>; fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool); } @@ -35,6 +36,16 @@ struct PriorityEnvelopeBuffer { stacks: priority_queue::PriorityQueue, Priority>, } +impl PriorityEnvelopeBuffer { + fn new() -> Self { + Self { + own_keys: Default::default(), + sampling_keys: Default::default(), + stacks: Default::default(), + } + } +} + impl EnvelopeBuffer for PriorityEnvelopeBuffer { fn push(&mut self, envelope: Box) { let received_at = envelope.meta().start_time(); @@ -52,7 +63,7 @@ impl EnvelopeBuffer for PriorityEnvelopeBuffer { self.own_keys .entry(stack_key.own_key) .or_default() - .insert(stack_key.clone()); + .insert(stack_key); self.sampling_keys .entry(stack_key.sampling_key) .or_default() @@ -63,6 +74,17 @@ impl EnvelopeBuffer for PriorityEnvelopeBuffer { }); } + fn peek(&mut self) -> Option<&Envelope> { + let ( + QueueItem { + key: _, + value: stack, + }, + _, + ) = self.stacks.peek_mut()?; + stack.peek() + } + fn pop(&mut self) -> Option> { let (QueueItem { key, value: stack }, _) = self.stacks.peek_mut()?; let stack_key = *key; @@ -184,3 +206,95 @@ impl Ord for Priority { } } } + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use relay_common::Dsn; + + use crate::extractors::RequestMeta; + use crate::services::buffer::envelopestack::InMemoryEnvelopeStack; + + use super::*; + + fn new_envelope(project_key: ProjectKey, sampling_key: Option) -> Box { + let envelope = Envelope::from_request( + None, + RequestMeta::new(Dsn::from_str(&format!("http://{project_key}@localhost/1")).unwrap()), + ); + // TODO: sampling key + envelope + } + + #[test] + fn insert_pop() { + let mut buffer = PriorityEnvelopeBuffer::::new(); + + let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); + let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + let project_key3 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap(); + + assert!(buffer.pop().is_none()); + assert!(buffer.peek().is_none()); + + buffer.push(new_envelope(project_key1, None)); + assert_eq!(buffer.peek().unwrap().meta().public_key(), project_key1); + + buffer.push(new_envelope(project_key2, None)); + // Both projects are not ready, so project 1 is on top (has the oldest envelopes): + assert_eq!(buffer.peek().unwrap().meta().public_key(), project_key1); + + buffer.push(new_envelope(project_key3, None)); + // All projects are not ready, so project 1 is on top (has the oldest envelopes): + assert_eq!(buffer.peek().unwrap().meta().public_key(), project_key1); + + // After marking a project ready, it goes to the top: + buffer.mark_ready(&project_key3, true); + assert_eq!(buffer.peek().unwrap().meta().public_key(), project_key3); + assert_eq!(buffer.pop().unwrap().meta().public_key(), project_key3); + + // After popping, project 1 is on top again: + assert_eq!(buffer.peek().unwrap().meta().public_key(), project_key1); + + // Mark project 1 as ready (still on top): + buffer.mark_ready(&project_key1, true); + assert_eq!(buffer.peek().unwrap().meta().public_key(), project_key1); + + // Mark project 2 as ready as well (now on top because most recent): + buffer.mark_ready(&project_key2, true); + assert_eq!(buffer.peek().unwrap().meta().public_key(), project_key2); + assert_eq!(buffer.pop().unwrap().meta().public_key(), project_key2); + + // Pop last element: + assert_eq!(buffer.pop().unwrap().meta().public_key(), project_key1); + assert!(buffer.pop().is_none()); + assert!(buffer.peek().is_none()); + } + + #[test] + fn project_internal_order() { + let mut buffer = PriorityEnvelopeBuffer::::new(); + + let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); + + let envelope1 = new_envelope(project_key, None); + let instant1 = envelope1.meta().start_time(); + let envelope2 = new_envelope(project_key, None); + let instant2 = envelope2.meta().start_time(); + + assert!(instant2 > instant1); + + buffer.push(envelope1); + buffer.push(envelope2); + + assert_eq!(buffer.pop().unwrap().meta().start_time(), instant2); + assert_eq!(buffer.pop().unwrap().meta().start_time(), instant1); + assert!(buffer.pop().is_none()); + } + + #[test] + fn sampling_projects() { + todo!() + } +} diff --git a/relay-server/src/services/buffer/envelopestack.rs b/relay-server/src/services/buffer/envelopestack.rs index f5f58db230..417ea7b78c 100644 --- a/relay-server/src/services/buffer/envelopestack.rs +++ b/relay-server/src/services/buffer/envelopestack.rs @@ -9,3 +9,23 @@ pub trait EnvelopeStack { fn peek(&self) -> Option<&Envelope>; } + +pub struct InMemoryEnvelopeStack(Vec>); + +impl EnvelopeStack for InMemoryEnvelopeStack { + fn new(envelope: Box) -> Self { + Self(vec![envelope]) + } + + fn push(&mut self, envelope: Box) { + self.0.push(envelope) + } + + fn pop(&mut self) -> Option> { + self.0.pop() + } + + fn peek(&self) -> Option<&Envelope> { + self.0.last().map(Box::as_ref) + } +}