Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(buffer): Optimistically initialize stacks as ready #4046

Merged
merged 3 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 38 additions & 81 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ where
},
Priority {
readiness,
next_project_fetch: next_peek,
next_project_fetch,
..
},
)) = self.priority_queue.peek_mut()
Expand All @@ -305,7 +305,7 @@ where
Ok(match (stack.peek().await?, ready) {
(None, _) => Peek::Empty,
(Some(envelope), true) => Peek::Ready(envelope),
(Some(envelope), false) => Peek::NotReady(*stack_key, *next_peek, envelope),
(Some(envelope), false) => Peek::NotReady(*stack_key, *next_project_fetch, envelope),
})
}

Expand Down Expand Up @@ -607,9 +607,11 @@ struct Readiness {

impl Readiness {
fn new() -> Self {
// Optimistically set ready state to true.
// The large majority of stack creations are re-creations after a stack was emptied.
Self {
own_project_ready: false,
sampling_project_ready: false,
own_project_ready: true,
sampling_project_ready: true,
}
}

Expand Down Expand Up @@ -695,6 +697,17 @@ mod tests {
MemoryChecker::new(MemoryStat::default(), mock_config("my/db/path").clone())
}

async fn peek_project_key(buffer: &mut EnvelopeBuffer<MemoryStackProvider>) -> ProjectKey {
buffer
.peek()
.await
.unwrap()
.envelope()
.unwrap()
.meta()
.public_key()
}

#[tokio::test]
async fn test_insert_pop() {
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(mock_memory_checker());
Expand All @@ -710,110 +723,48 @@ mod tests {
.push(new_envelope(project_key1, None, None))
.await
.unwrap();
assert_eq!(
buffer
.peek()
.await
.unwrap()
.envelope()
.unwrap()
.meta()
.public_key(),
project_key1
);

buffer
.push(new_envelope(project_key2, None, None))
.await
.unwrap();

// Both projects are ready, so project 2 is on top (has the newest envelopes):
assert_eq!(peek_project_key(&mut buffer).await, project_key2);

buffer.mark_ready(&project_key1, false);
buffer.mark_ready(&project_key2, false);

// Both projects are not ready, so project 1 is on top (has the oldest envelopes):
assert_eq!(
buffer
.peek()
.await
.unwrap()
.envelope()
.unwrap()
.meta()
.public_key(),
project_key1
);
assert_eq!(peek_project_key(&mut buffer).await, project_key1);

buffer
.push(new_envelope(project_key3, None, None))
.await
.unwrap();
buffer.mark_ready(&project_key3, false);

// All projects are not ready, so project 1 is on top (has the oldest envelopes):
assert_eq!(
buffer
.peek()
.await
.unwrap()
.envelope()
.unwrap()
.meta()
.public_key(),
project_key1
);
assert_eq!(peek_project_key(&mut buffer).await, project_key1);

// After marking a project ready, it goes to the top:
buffer.mark_ready(&project_key3, true);
assert_eq!(
buffer
.peek()
.await
.unwrap()
.envelope()
.unwrap()
.meta()
.public_key(),
project_key3
);
assert_eq!(peek_project_key(&mut buffer).await, project_key3);
assert_eq!(
buffer.pop().await.unwrap().unwrap().meta().public_key(),
project_key3
);

// After popping, project 1 is on top again:
assert_eq!(
buffer
.peek()
.await
.unwrap()
.envelope()
.unwrap()
.meta()
.public_key(),
project_key1
);
assert_eq!(peek_project_key(&mut buffer).await, project_key1);

// Mark project 1 as ready (still on top):
buffer.mark_ready(&project_key1, true);
assert_eq!(
buffer
.peek()
.await
.unwrap()
.envelope()
.unwrap()
.meta()
.public_key(),
project_key1
);
assert_eq!(peek_project_key(&mut buffer).await, project_key1);

// Mark project 2 as ready as well (now on top because most recent):
buffer.mark_ready(&project_key2, true);
assert_eq!(
buffer
.peek()
.await
.unwrap()
.envelope()
.unwrap()
.meta()
.public_key(),
project_key2
);
assert_eq!(peek_project_key(&mut buffer).await, project_key2);
assert_eq!(
buffer.pop().await.unwrap().unwrap().meta().public_key(),
project_key2
Expand Down Expand Up @@ -874,6 +825,9 @@ mod tests {
let instant3 = envelope3.meta().start_time();
buffer.push(envelope3).await.unwrap();

buffer.mark_ready(&project_key1, false);
buffer.mark_ready(&project_key2, false);

// Nothing is ready, instant1 is on top:
assert_eq!(
buffer
Expand Down Expand Up @@ -1015,6 +969,9 @@ mod tests {
buffer.push(envelope1).await.unwrap();
buffer.push(envelope2).await.unwrap();

buffer.mark_ready(&project_key_1, false);
buffer.mark_ready(&project_key_2, false);

// event_id_1 is first element:
let Peek::NotReady(_, _, envelope) = buffer.peek().await.unwrap() else {
panic!();
Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ impl EnvelopeBufferService {
self.services.project_cache.send(DequeuedEnvelope(envelope));
self.sleep = Duration::ZERO; // try next pop immediately
}
Peek::NotReady(stack_key, next_peek, envelope) => {
Peek::NotReady(stack_key, next_project_fetch, envelope) => {
relay_log::trace!("EnvelopeBufferService: project(s) of envelope not ready");
relay_statsd::metric!(
counter(RelayCounters::BufferTryPop) += 1,
Expand All @@ -247,7 +247,7 @@ impl EnvelopeBufferService {
// We want to fetch the configs again, only if some time passed between the last
// peek of this not ready project key pair and the current peek. This is done to
// avoid flooding the project cache with `UpdateProject` messages.
if Instant::now() >= next_peek {
if Instant::now() >= next_project_fetch {
relay_log::trace!("EnvelopeBufferService: requesting project(s) update");
let project_key = envelope.meta().public_key();
self.services.project_cache.send(UpdateProject(project_key));
Expand Down
Loading