Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(spool): Treat (key1, key2) and (key2, key1) as distinct stack keys #3881

Merged
merged 67 commits into from
Aug 13, 2024
Merged
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
35805f3
wip
jjbayer Jul 25, 2024
06ff0ac
wip: Borrow<K>
jjbayer Jul 25, 2024
cb71cce
it compiles
jjbayer Jul 25, 2024
b60cb8b
some tests
jjbayer Jul 25, 2024
ed56465
test again
jjbayer Jul 25, 2024
005c093
Push to envelope buffer
jjbayer Jul 25, 2024
9350674
async interface
jjbayer Jul 25, 2024
5942fff
Merge remote-tracking branch 'origin/master' into feat/envelope-buffer
jjbayer Jul 26, 2024
9428e5c
files
jjbayer Jul 26, 2024
26eb70b
return ready
jjbayer Jul 26, 2024
b7abdad
prefetch
jjbayer Jul 26, 2024
d1b8bea
wip project cache
jjbayer Jul 26, 2024
faf72fe
borrow checker
jjbayer Jul 26, 2024
d0dae29
v1 vs v2
jjbayer Jul 26, 2024
0fb8830
make envelope buffer optional for now
jjbayer Jul 26, 2024
a7d1cf8
fix: health check
jjbayer Jul 26, 2024
82f3a95
awaiting pop
jjbayer Jul 27, 2024
db1ec70
mark_ready on updated state
jjbayer Jul 27, 2024
8c5167e
test: peek
jjbayer Jul 28, 2024
472cecd
notify on change
jjbayer Jul 28, 2024
6abed12
Notify on ready
jjbayer Jul 28, 2024
9eb1fcc
fix: only one envelope buffer
jjbayer Jul 28, 2024
3d21861
fix: derive envelope group
jjbayer Jul 28, 2024
29fe242
Notify only if changed
jjbayer Jul 28, 2024
c686d93
ref: Eliminate duplicate stack keys
jjbayer Jul 28, 2024
d9f7895
Merge remote-tracking branch 'origin/master' into feat/envelope-buffer
jjbayer Jul 29, 2024
b50e19d
Merge
jjbayer Jul 29, 2024
054778a
wip: On-disk creation
jjbayer Jul 29, 2024
761b6c1
Improve
iambriccardo Jul 29, 2024
956b249
Improve
iambriccardo Jul 29, 2024
8864e61
Improve
iambriccardo Jul 29, 2024
990bb4e
Improve
iambriccardo Jul 29, 2024
c5cd261
Improve
iambriccardo Jul 29, 2024
8972444
Improve
iambriccardo Jul 30, 2024
a822832
Improve
iambriccardo Jul 30, 2024
8325990
Improve
iambriccardo Jul 30, 2024
8de1010
Improve
iambriccardo Jul 30, 2024
fc5a0e9
Improve
iambriccardo Jul 30, 2024
1729051
Improve
iambriccardo Jul 30, 2024
af36538
Improve
iambriccardo Jul 30, 2024
5f5228a
Improve
iambriccardo Jul 30, 2024
6c7cd9f
little fixes
jjbayer Jul 30, 2024
2ccd34d
error handling
jjbayer Jul 30, 2024
b6262f2
metrics
jjbayer Jul 30, 2024
3a12589
Improve
iambriccardo Jul 30, 2024
be9e4e5
Improve
iambriccardo Jul 30, 2024
4f9030d
doc & config
jjbayer Jul 30, 2024
0e0ab80
Improve
iambriccardo Jul 30, 2024
1d57809
Improve
iambriccardo Jul 30, 2024
07ec7e9
Improve
iambriccardo Jul 30, 2024
e3e62ec
Improve
iambriccardo Jul 30, 2024
f5c8df0
Improve
iambriccardo Jul 30, 2024
71da426
fix test_query
jjbayer Jul 30, 2024
8e57db9
changelog
jjbayer Jul 30, 2024
675b29f
count inflight
jjbayer Jul 30, 2024
107ec5b
metric for in flight pushes
jjbayer Jul 30, 2024
007bd36
bench
jjbayer Jul 30, 2024
3586303
bench: More projects
jjbayer Jul 31, 2024
ece3f32
Merge branch 'master' into feat/envelope-buffer
jjbayer Jul 31, 2024
67aea56
fix: org_id check
jjbayer Jul 31, 2024
2de15b0
ref: Variables are your friend
jjbayer Jul 31, 2024
09d0b24
ref: review comments
jjbayer Jul 31, 2024
d58c64d
fix: guard changed
jjbayer Jul 31, 2024
8ff1f43
fix(spool): Treat (key1, key2) and (key2, key1) as distinct stack keys
jjbayer Jul 31, 2024
0b6108a
Merge branch 'master' into feat/envelope-buffer-own-key
jjbayer Aug 1, 2024
4c1a7f2
format
jjbayer Aug 1, 2024
3106b79
Empty commit
iambriccardo Aug 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 60 additions & 38 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::time::Instant;

use relay_base_schema::project::ProjectKey;
use relay_config::Config;
use stack_key::StackKey;

use crate::envelope::Envelope;
use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError;
Expand Down Expand Up @@ -213,10 +212,13 @@ where
self.priority_queue.change_priority_by(stack_key, |stack| {
let mut found = false;
for (subkey, readiness) in [
(stack_key.lesser(), &mut stack.readiness.0),
(stack_key.greater(), &mut stack.readiness.1),
(stack_key.own_key, &mut stack.readiness.own_project_ready),
(
stack_key.sampling_key,
&mut stack.readiness.sampling_project_ready,
),
] {
if subkey == project {
if subkey == *project {
found = true;
if *readiness != is_ready {
changed = true;
Expand Down Expand Up @@ -261,46 +263,38 @@ where
.remove(&stack_key);
}
self.priority_queue.remove(&stack_key);

relay_statsd::metric!(
gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64
);
}
}

mod stack_key {
use super::*;
/// Sorted stack key.
///
/// Contains a pair of project keys. The lower key is always the first
/// element in the pair, such that `(k1, k2)` and `(k2, k1)` map to the same
/// stack key.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct StackKey(ProjectKey, ProjectKey);

impl StackKey {
pub fn from_envelope(envelope: &Envelope) -> Self {
let own_key = envelope.meta().public_key();
let sampling_key = envelope.sampling_key().unwrap_or(own_key);
Self::new(own_key, sampling_key)
}

pub fn lesser(&self) -> &ProjectKey {
&self.0
}
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct StackKey {
own_key: ProjectKey,
sampling_key: ProjectKey,
}

pub fn greater(&self) -> &ProjectKey {
&self.1
}
impl StackKey {
pub fn from_envelope(envelope: &Envelope) -> Self {
let own_key = envelope.meta().public_key();
let sampling_key = envelope.sampling_key().unwrap_or(own_key);
Self::new(own_key, sampling_key)
}

pub fn iter(&self) -> impl Iterator<Item = ProjectKey> {
std::iter::once(self.0).chain((self.0 != self.1).then_some(self.1))
}
pub fn iter(&self) -> impl Iterator<Item = ProjectKey> {
let Self {
own_key,
sampling_key,
} = self;
std::iter::once(*own_key).chain((own_key != sampling_key).then_some(*sampling_key))
}

fn new(mut key1: ProjectKey, mut key2: ProjectKey) -> Self {
if key2 < key1 {
std::mem::swap(&mut key1, &mut key2);
}
Self(key1, key2)
fn new(own_key: ProjectKey, sampling_key: ProjectKey) -> Self {
Self {
own_key,
sampling_key,
}
}
}
Expand Down Expand Up @@ -374,15 +368,21 @@ impl Ord for Priority {
}

#[derive(Debug)]
struct Readiness(bool, bool);
struct Readiness {
own_project_ready: bool,
sampling_project_ready: bool,
}

impl Readiness {
fn new() -> Self {
Self(false, false)
Self {
own_project_ready: false,
sampling_project_ready: false,
}
}

fn ready(&self) -> bool {
self.0 && self.1
self.own_project_ready && self.sampling_project_ready
}
}

Expand Down Expand Up @@ -594,4 +594,26 @@ mod tests {

assert!(buffer.pop().await.unwrap().is_none());
}

#[tokio::test]
async fn project_keys_distinct() {
let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();

let stack_key1 = StackKey::new(project_key1, project_key2);
let stack_key2 = StackKey::new(project_key2, project_key1);

assert_ne!(stack_key1, stack_key2);

let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new();
buffer
.push(new_envelope(project_key1, Some(project_key2)))
.await
.unwrap();
buffer
.push(new_envelope(project_key2, Some(project_key1)))
.await
.unwrap();
assert_eq!(buffer.priority_queue.len(), 2);
}
}
Loading