diff --git a/relay-server/src/services/buffer/envelopebuffer.rs b/relay-server/src/services/buffer/envelopebuffer.rs index 9a1a1e0547..ba62cf352a 100644 --- a/relay-server/src/services/buffer/envelopebuffer.rs +++ b/relay-server/src/services/buffer/envelopebuffer.rs @@ -1,4 +1,5 @@ use std::cmp::Ordering; +use std::collections::BTreeSet; use std::time::Instant; use relay_base_schema::project::ProjectKey; @@ -12,7 +13,7 @@ pub trait EnvelopeBuffer { fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool); } -#[derive(Hash, PartialEq, Eq)] +#[derive(Hash, PartialEq, Eq, PartialOrd, Ord)] struct StackKey { own_key: ProjectKey, sampling_key: ProjectKey, @@ -23,40 +24,68 @@ impl StackKey { } struct PriorityEnvelopeBuffer { - own_keys: hashbrown::HashMap>, - sampling_keys: hashbrown::HashMap>, - stacks: priority_queue::PriorityQueue>, + own_keys: hashbrown::HashMap>, + sampling_keys: hashbrown::HashMap>, + stacks: priority_queue::PriorityQueue, Priority>, } impl EnvelopeBuffer for PriorityEnvelopeBuffer { fn push(&mut self, envelope: Box) { + let received_at = envelope.received_at(); let stack_key = StackKey::from_envelope(&envelope); - let updated = self.stacks.change_priority_by(&stack_key, |stack| {}); - if !updated { - let old = self.stacks.push(stack_key, PrioritizedStack::new(envelope)); - debug_assert!(old.is_none()); + if let Some(qi) = self.stacks.get_mut(&stack_key) { + qi.0.value.push(envelope); + } else { + self.stacks.push( + QueueItem { + key: stack_key, + value: S::new(envelope), + }, + Priority::new(received_at), + ); + self.own_keys + .entry(stack_key.own_key) + .or_default() + .insert(stack_key); + self.sampling_keys + .entry(stack_key.sampling_key) + .or_default() + .insert(stack_key); } - self.own_keys - .entry(stack_key.own_key) - .or_default() - .push(stack_key); - self.sampling_keys - .entry(stack_key.sampling_key) - .or_default() - .push(stack_key); + self.stacks.change_priority_by(stack_key, |prio| { + prio.received_at = received_at; + }); } fn pop(&mut self) -> Option> { - let (stack_key, stack) = self.stacks.peek_mut()?; - let entry = self - .own_keys - .entry(stack_key.own_key) - .or_default() - .push(stack_key); - self.sampling_keys - .entry(stack_key.sampling_key) - .or_default() - .push(stack_key); + let ( + QueueItem { + key: stack_key, + value: stack, + }, + priority, + ) = self.stacks.peek_mut()?; + let envelope = stack.pop(); + debug_assert!(envelope.is_some()); + match stack.peek() { + None => { + self.own_keys + .get_mut(&stack_key.own_key) + .expect("own_keys") + .remove(&stack_key); + self.sampling_keys + .get_mut(&stack_key.sampling_key) + .expect("sampling_keys") + .remove(&stack_key); + self.stacks.remove(stack_key); + } + Some(next_envelope) => { + self.stacks.change_priority_by(stack_key, |prio| { + prio.received_at = next_envelope.received_at(); + }); + } + } + envelope } fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) { @@ -77,45 +106,68 @@ impl EnvelopeBuffer for PriorityEnvelopeBuffer { } } -struct PrioritizedStack { +struct QueueItem { + key: K, + value: V, +} + +impl std::borrow::Borrow for QueueItem { + fn borrow(&self) -> &K { + &self.key + } +} + +impl std::hash::Hash for QueueItem { + fn hash(&self, state: &mut H) { + self.key.hash(state); + } +} + +impl PartialEq for QueueItem { + fn eq(&self, other: &Self) -> bool { + self.key == other.key + } +} + +impl Eq for QueueItem {} + +struct Priority { own_ready: bool, sampling_ready: bool, received_at: Instant, - stack: S, } -impl PrioritizedStack { +impl Priority { fn ready(&self) -> bool { self.own_ready && self.sampling_ready } } -impl PrioritizedStack { +impl Priority { fn new(received_at: Instant) -> Self { Self { own_ready: false, sampling_ready: false, received_at, - stack: S::default(), } } } -impl PartialEq for PrioritizedStack { +impl PartialEq for Priority { fn eq(&self, other: &Self) -> bool { self.ready() == other.ready() && self.received_at == other.received_at } } -impl PartialOrd for PrioritizedStack { +impl PartialOrd for Priority { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } -impl Eq for PrioritizedStack {} +impl Eq for Priority {} -impl Ord for PrioritizedStack { +impl Ord for Priority { fn cmp(&self, other: &Self) -> Ordering { match (self.ready(), other.ready()) { (true, true) => self.received_at.cmp(&other.received_at),