Skip to content

Commit

Permalink
fix(spool): Treat (key1, key2) and (key2, key1) as distinct stack keys (
Browse files Browse the repository at this point in the history
  • Loading branch information
jjbayer committed Aug 13, 2024
1 parent f595bee commit d5c5ad3
Showing 1 changed file with 60 additions and 38 deletions.
98 changes: 60 additions & 38 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::time::Instant;

use relay_base_schema::project::ProjectKey;
use relay_config::Config;
use stack_key::StackKey;

use crate::envelope::Envelope;
use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError;
Expand Down Expand Up @@ -213,10 +212,13 @@ where
self.priority_queue.change_priority_by(stack_key, |stack| {
let mut found = false;
for (subkey, readiness) in [
(stack_key.lesser(), &mut stack.readiness.0),
(stack_key.greater(), &mut stack.readiness.1),
(stack_key.own_key, &mut stack.readiness.own_project_ready),
(
stack_key.sampling_key,
&mut stack.readiness.sampling_project_ready,
),
] {
if subkey == project {
if subkey == *project {
found = true;
if *readiness != is_ready {
changed = true;
Expand Down Expand Up @@ -261,46 +263,38 @@ where
.remove(&stack_key);
}
self.priority_queue.remove(&stack_key);

relay_statsd::metric!(
gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64
);
}
}

mod stack_key {
use super::*;
/// Sorted stack key.
///
/// Contains a pair of project keys. The lower key is always the first
/// element in the pair, such that `(k1, k2)` and `(k2, k1)` map to the same
/// stack key.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct StackKey(ProjectKey, ProjectKey);

impl StackKey {
pub fn from_envelope(envelope: &Envelope) -> Self {
let own_key = envelope.meta().public_key();
let sampling_key = envelope.sampling_key().unwrap_or(own_key);
Self::new(own_key, sampling_key)
}

pub fn lesser(&self) -> &ProjectKey {
&self.0
}
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct StackKey {
own_key: ProjectKey,
sampling_key: ProjectKey,
}

pub fn greater(&self) -> &ProjectKey {
&self.1
}
impl StackKey {
pub fn from_envelope(envelope: &Envelope) -> Self {
let own_key = envelope.meta().public_key();
let sampling_key = envelope.sampling_key().unwrap_or(own_key);
Self::new(own_key, sampling_key)
}

pub fn iter(&self) -> impl Iterator<Item = ProjectKey> {
std::iter::once(self.0).chain((self.0 != self.1).then_some(self.1))
}
pub fn iter(&self) -> impl Iterator<Item = ProjectKey> {
let Self {
own_key,
sampling_key,
} = self;
std::iter::once(*own_key).chain((own_key != sampling_key).then_some(*sampling_key))
}

fn new(mut key1: ProjectKey, mut key2: ProjectKey) -> Self {
if key2 < key1 {
std::mem::swap(&mut key1, &mut key2);
}
Self(key1, key2)
fn new(own_key: ProjectKey, sampling_key: ProjectKey) -> Self {
Self {
own_key,
sampling_key,
}
}
}
Expand Down Expand Up @@ -374,15 +368,21 @@ impl Ord for Priority {
}

#[derive(Debug)]
struct Readiness(bool, bool);
struct Readiness {
own_project_ready: bool,
sampling_project_ready: bool,
}

impl Readiness {
fn new() -> Self {
Self(false, false)
Self {
own_project_ready: false,
sampling_project_ready: false,
}
}

fn ready(&self) -> bool {
self.0 && self.1
self.own_project_ready && self.sampling_project_ready
}
}

Expand Down Expand Up @@ -594,4 +594,26 @@ mod tests {

assert!(buffer.pop().await.unwrap().is_none());
}

#[tokio::test]
async fn project_keys_distinct() {
let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();

let stack_key1 = StackKey::new(project_key1, project_key2);
let stack_key2 = StackKey::new(project_key2, project_key1);

assert_ne!(stack_key1, stack_key2);

let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new();
buffer
.push(new_envelope(project_key1, Some(project_key2)))
.await
.unwrap();
buffer
.push(new_envelope(project_key2, Some(project_key1)))
.await
.unwrap();
assert_eq!(buffer.priority_queue.len(), 2);
}
}

0 comments on commit d5c5ad3

Please sign in to comment.