Skip to content

Commit

Permalink
wip: Borrow<K>
Browse files Browse the repository at this point in the history
  • Loading branch information
jjbayer committed Jul 25, 2024
1 parent 35805f3 commit 06ff0ac
Showing 1 changed file with 87 additions and 35 deletions.
122 changes: 87 additions & 35 deletions relay-server/src/services/buffer/envelopebuffer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::time::Instant;

use relay_base_schema::project::ProjectKey;
Expand All @@ -12,7 +13,7 @@ pub trait EnvelopeBuffer {
fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool);
}

#[derive(Hash, PartialEq, Eq)]
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord)]
struct StackKey {
own_key: ProjectKey,
sampling_key: ProjectKey,
Expand All @@ -23,40 +24,68 @@ impl StackKey {
}

struct PriorityEnvelopeBuffer<S: EnvelopeStack> {
own_keys: hashbrown::HashMap<ProjectKey, Vec<StackKey>>,
sampling_keys: hashbrown::HashMap<ProjectKey, Vec<StackKey>>,
stacks: priority_queue::PriorityQueue<StackKey, PrioritizedStack<S>>,
own_keys: hashbrown::HashMap<ProjectKey, BTreeSet<StackKey>>,
sampling_keys: hashbrown::HashMap<ProjectKey, BTreeSet<StackKey>>,
stacks: priority_queue::PriorityQueue<QueueItem<StackKey, S>, Priority>,
}

impl<S: EnvelopeStack> EnvelopeBuffer for PriorityEnvelopeBuffer<S> {
fn push(&mut self, envelope: Box<Envelope>) {
let received_at = envelope.received_at();
let stack_key = StackKey::from_envelope(&envelope);
let updated = self.stacks.change_priority_by(&stack_key, |stack| {});
if !updated {
let old = self.stacks.push(stack_key, PrioritizedStack::new(envelope));
debug_assert!(old.is_none());
if let Some(qi) = self.stacks.get_mut(&stack_key) {
qi.0.value.push(envelope);
} else {
self.stacks.push(
QueueItem {
key: stack_key,
value: S::new(envelope),
},
Priority::new(received_at),
);
self.own_keys
.entry(stack_key.own_key)
.or_default()
.insert(stack_key);
self.sampling_keys
.entry(stack_key.sampling_key)
.or_default()
.insert(stack_key);
}
self.own_keys
.entry(stack_key.own_key)
.or_default()
.push(stack_key);
self.sampling_keys
.entry(stack_key.sampling_key)
.or_default()
.push(stack_key);
self.stacks.change_priority_by(stack_key, |prio| {
prio.received_at = received_at;
});
}

fn pop(&mut self) -> Option<Box<Envelope>> {
let (stack_key, stack) = self.stacks.peek_mut()?;
let entry = self
.own_keys
.entry(stack_key.own_key)
.or_default()
.push(stack_key);
self.sampling_keys
.entry(stack_key.sampling_key)
.or_default()
.push(stack_key);
let (
QueueItem {
key: stack_key,
value: stack,
},
priority,
) = self.stacks.peek_mut()?;
let envelope = stack.pop();
debug_assert!(envelope.is_some());
match stack.peek() {
None => {
self.own_keys
.get_mut(&stack_key.own_key)
.expect("own_keys")
.remove(&stack_key);
self.sampling_keys
.get_mut(&stack_key.sampling_key)
.expect("sampling_keys")
.remove(&stack_key);
self.stacks.remove(stack_key);
}
Some(next_envelope) => {
self.stacks.change_priority_by(stack_key, |prio| {
prio.received_at = next_envelope.received_at();
});
}
}
envelope
}

fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) {
Expand All @@ -77,45 +106,68 @@ impl<S: EnvelopeStack> EnvelopeBuffer for PriorityEnvelopeBuffer<S> {
}
}

struct PrioritizedStack<S> {
struct QueueItem<K, V> {
key: K,
value: V,
}

impl<K, V> std::borrow::Borrow<K> for QueueItem<K, V> {
fn borrow(&self) -> &K {
&self.key
}
}

impl<K: std::hash::Hash, V> std::hash::Hash for QueueItem<K, V> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.key.hash(state);
}
}

impl<K: PartialEq, V> PartialEq for QueueItem<K, V> {
fn eq(&self, other: &Self) -> bool {
self.key == other.key
}
}

impl<K: PartialEq, V> Eq for QueueItem<K, V> {}

struct Priority {
own_ready: bool,
sampling_ready: bool,
received_at: Instant,
stack: S,
}

impl<S> PrioritizedStack<S> {
impl Priority {
fn ready(&self) -> bool {
self.own_ready && self.sampling_ready
}
}

impl<S: Default> PrioritizedStack<S> {
impl Priority {
fn new(received_at: Instant) -> Self {
Self {
own_ready: false,
sampling_ready: false,
received_at,
stack: S::default(),
}
}
}

impl<S> PartialEq for PrioritizedStack<S> {
impl PartialEq for Priority {
fn eq(&self, other: &Self) -> bool {
self.ready() == other.ready() && self.received_at == other.received_at
}
}

impl<S> PartialOrd for PrioritizedStack<S> {
impl PartialOrd for Priority {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl<S> Eq for PrioritizedStack<S> {}
impl Eq for Priority {}

impl<S> Ord for PrioritizedStack<S> {
impl Ord for Priority {
fn cmp(&self, other: &Self) -> Ordering {
match (self.ready(), other.ready()) {
(true, true) => self.received_at.cmp(&other.received_at),
Expand Down

0 comments on commit 06ff0ac

Please sign in to comment.