diff --git a/relay-server/src/services/buffer/envelopebuffer.rs b/relay-server/src/services/buffer/envelopebuffer.rs index ba62cf352a..a28a7302b6 100644 --- a/relay-server/src/services/buffer/envelopebuffer.rs +++ b/relay-server/src/services/buffer/envelopebuffer.rs @@ -13,14 +13,20 @@ pub trait EnvelopeBuffer { fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool); } -#[derive(Hash, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] struct StackKey { own_key: ProjectKey, sampling_key: ProjectKey, } impl StackKey { - fn from_envelope(envelope: &Envelope) -> Self {} + fn from_envelope(envelope: &Envelope) -> Self { + let own_key = envelope.meta().public_key(); + Self { + own_key, + sampling_key: envelope.sampling_key().unwrap_or(own_key), + } + } } struct PriorityEnvelopeBuffer { @@ -31,7 +37,7 @@ struct PriorityEnvelopeBuffer { impl EnvelopeBuffer for PriorityEnvelopeBuffer { fn push(&mut self, envelope: Box) { - let received_at = envelope.received_at(); + let received_at = envelope.meta().start_time(); let stack_key = StackKey::from_envelope(&envelope); if let Some(qi) = self.stacks.get_mut(&stack_key) { qi.0.value.push(envelope); @@ -46,28 +52,27 @@ impl EnvelopeBuffer for PriorityEnvelopeBuffer { self.own_keys .entry(stack_key.own_key) .or_default() - .insert(stack_key); + .insert(stack_key.clone()); self.sampling_keys .entry(stack_key.sampling_key) .or_default() .insert(stack_key); } - self.stacks.change_priority_by(stack_key, |prio| { + self.stacks.change_priority_by(&stack_key, |prio| { prio.received_at = received_at; }); } fn pop(&mut self) -> Option> { - let ( - QueueItem { - key: stack_key, - value: stack, - }, - priority, - ) = self.stacks.peek_mut()?; + let (QueueItem { key, value: stack }, _) = self.stacks.peek_mut()?; + let stack_key = *key; let envelope = stack.pop(); debug_assert!(envelope.is_some()); - match stack.peek() { + + let next_received_at = stack + .peek() + .map(|next_envelope| next_envelope.meta().start_time()); + match next_received_at { None => { self.own_keys .get_mut(&stack_key.own_key) @@ -77,11 +82,11 @@ impl EnvelopeBuffer for PriorityEnvelopeBuffer { .get_mut(&stack_key.sampling_key) .expect("sampling_keys") .remove(&stack_key); - self.stacks.remove(stack_key); + self.stacks.remove(&stack_key); } - Some(next_envelope) => { - self.stacks.change_priority_by(stack_key, |prio| { - prio.received_at = next_envelope.received_at(); + Some(next_received_at) => { + self.stacks.change_priority_by(&stack_key, |prio| { + prio.received_at = next_received_at; }); } } diff --git a/relay-server/src/services/buffer/envelopestack.rs b/relay-server/src/services/buffer/envelopestack.rs index a75c9047de..f5f58db230 100644 --- a/relay-server/src/services/buffer/envelopestack.rs +++ b/relay-server/src/services/buffer/envelopestack.rs @@ -1,6 +1,8 @@ use crate::envelope::Envelope; pub trait EnvelopeStack { + fn new(envelope: Box) -> Self; + fn push(&mut self, envelope: Box); fn pop(&mut self) -> Option>;