fix(buffer): Optimistically initialize stacks as ready #4046

Merged: 3 commits, Sep 19, 2024
119 changes: 38 additions & 81 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
@@ -292,7 +292,7 @@ where
},
Priority {
readiness,
next_project_fetch: next_peek,
next_project_fetch,
..
},
)) = self.priority_queue.peek_mut()
@@ -305,7 +305,7 @@
Ok(match (stack.peek().await?, ready) {
(None, _) => Peek::Empty,
(Some(envelope), true) => Peek::Ready(envelope),
(Some(envelope), false) => Peek::NotReady(*stack_key, *next_peek, envelope),
(Some(envelope), false) => Peek::NotReady(*stack_key, *next_project_fetch, envelope),
})
}

@@ -607,9 +607,11 @@ struct Readiness {

impl Readiness {
fn new() -> Self {
// Optimistically set ready state to true.
// The large majority of stack creations are re-creations after a stack was emptied.
Self {
own_project_ready: false,
sampling_project_ready: false,
own_project_ready: true,
sampling_project_ready: true,
}
}

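This is the core of the change: `Readiness::new` now starts with both flags set to `true`. The following is a minimal standalone sketch of the intended behavior; the `ready()` accessor and the `main` driver are illustrative assumptions, since only the struct fields and `new()` appear in this diff.

```rust
struct Readiness {
    own_project_ready: bool,
    sampling_project_ready: bool,
}

impl Readiness {
    /// Optimistically assume both projects are ready: most stack creations
    /// are re-creations of a stack that was just emptied.
    fn new() -> Self {
        Self {
            own_project_ready: true,
            sampling_project_ready: true,
        }
    }

    /// Assumed accessor: a stack counts as ready only if both projects are.
    fn ready(&self) -> bool {
        self.own_project_ready && self.sampling_project_ready
    }
}

fn main() {
    let mut readiness = Readiness::new();
    assert!(readiness.ready()); // optimistic default right after (re-)creation

    // A "project not ready" message from the project cache flips a flag back.
    readiness.own_project_ready = false;
    assert!(!readiness.ready());
}
```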
@@ -695,6 +697,17 @@ mod tests {
MemoryChecker::new(MemoryStat::default(), mock_config("my/db/path").clone())
}

async fn peek_project_key(buffer: &mut EnvelopeBuffer<MemoryStackProvider>) -> ProjectKey {
buffer
.peek()
.await
.unwrap()
.envelope()
.unwrap()
.meta()
.public_key()
}

#[tokio::test]
async fn test_insert_pop() {
let mut buffer = EnvelopeBuffer::<MemoryStackProvider>::new(mock_memory_checker());
@@ -710,110 +723,48 @@
.push(new_envelope(project_key1, None, None))
.await
.unwrap();
assert_eq!(
buffer
.peek()
.await
.unwrap()
.envelope()
.unwrap()
.meta()
.public_key(),
project_key1
);

buffer
.push(new_envelope(project_key2, None, None))
.await
.unwrap();

// Both projects are ready, so project 2 is on top (has the newest envelopes):
assert_eq!(peek_project_key(&mut buffer).await, project_key2);

buffer.mark_ready(&project_key1, false);
buffer.mark_ready(&project_key2, false);

// Both projects are not ready, so project 1 is on top (has the oldest envelopes):
assert_eq!(
buffer
.peek()
.await
.unwrap()
.envelope()
.unwrap()
.meta()
.public_key(),
project_key1
);
assert_eq!(peek_project_key(&mut buffer).await, project_key1);

buffer
.push(new_envelope(project_key3, None, None))
.await
.unwrap();
buffer.mark_ready(&project_key3, false);

// All projects are not ready, so project 1 is on top (has the oldest envelopes):
assert_eq!(
buffer
.peek()
.await
.unwrap()
.envelope()
.unwrap()
.meta()
.public_key(),
project_key1
);
assert_eq!(peek_project_key(&mut buffer).await, project_key1);

// After marking a project ready, it goes to the top:
buffer.mark_ready(&project_key3, true);
assert_eq!(
buffer
.peek()
.await
.unwrap()
.envelope()
.unwrap()
.meta()
.public_key(),
project_key3
);
assert_eq!(peek_project_key(&mut buffer).await, project_key3);
assert_eq!(
buffer.pop().await.unwrap().unwrap().meta().public_key(),
project_key3
);

// After popping, project 1 is on top again:
assert_eq!(
buffer
.peek()
.await
.unwrap()
.envelope()
.unwrap()
.meta()
.public_key(),
project_key1
);
assert_eq!(peek_project_key(&mut buffer).await, project_key1);

// Mark project 1 as ready (still on top):
buffer.mark_ready(&project_key1, true);
assert_eq!(
buffer
.peek()
.await
.unwrap()
.envelope()
.unwrap()
.meta()
.public_key(),
project_key1
);
assert_eq!(peek_project_key(&mut buffer).await, project_key1);

// Mark project 2 as ready as well (now on top because most recent):
buffer.mark_ready(&project_key2, true);
assert_eq!(
buffer
.peek()
.await
.unwrap()
.envelope()
.unwrap()
.meta()
.public_key(),
project_key2
);
assert_eq!(peek_project_key(&mut buffer).await, project_key2);
assert_eq!(
buffer.pop().await.unwrap().unwrap().meta().public_key(),
project_key2
@@ -874,6 +825,9 @@ mod tests {
let instant3 = envelope3.meta().start_time();
buffer.push(envelope3).await.unwrap();

buffer.mark_ready(&project_key1, false);
buffer.mark_ready(&project_key2, false);

// Nothing is ready, instant1 is on top:
assert_eq!(
buffer
@@ -1015,6 +969,9 @@
buffer.push(envelope1).await.unwrap();
buffer.push(envelope2).await.unwrap();

buffer.mark_ready(&project_key_1, false);
buffer.mark_ready(&project_key_2, false);

// event_id_1 is first element:
let Peek::NotReady(_, _, envelope) = buffer.peek().await.unwrap() else {
panic!();
30 changes: 24 additions & 6 deletions relay-server/src/services/buffer/mod.rs
@@ -237,7 +237,7 @@ impl EnvelopeBufferService {
self.services.project_cache.send(DequeuedEnvelope(envelope));
self.sleep = Duration::ZERO; // try next pop immediately
}
Peek::NotReady(stack_key, next_peek, envelope) => {
Peek::NotReady(stack_key, next_project_fetch, envelope) => {
relay_log::trace!("EnvelopeBufferService: project(s) of envelope not ready");
relay_statsd::metric!(
counter(RelayCounters::BufferTryPop) += 1,
@@ -247,7 +247,7 @@
// We want to fetch the configs again, only if some time passed between the last
// peek of this not ready project key pair and the current peek. This is done to
// avoid flooding the project cache with `UpdateProject` messages.
if Instant::now() >= next_peek {
if Instant::now() >= next_project_fetch {
relay_log::trace!("EnvelopeBufferService: requesting project(s) update");
let project_key = envelope.meta().public_key();
self.services.project_cache.send(UpdateProject(project_key));
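The check above follows a plain debounce pattern: an `UpdateProject` request is only forwarded once the stored deadline has passed, and the deadline is then pushed forward. A standalone sketch of that pattern follows; the type names and the one-second interval are illustrative assumptions, not taken from this diff.

```rust
use std::time::{Duration, Instant};

/// Illustrative debounce helper: accepts a request only if enough time has
/// passed since the last accepted one, then advances the deadline.
struct Debounce {
    next_allowed: Instant,
    interval: Duration,
}

impl Debounce {
    fn new(interval: Duration) -> Self {
        Self {
            next_allowed: Instant::now(),
            interval,
        }
    }

    fn try_fire(&mut self) -> bool {
        let now = Instant::now();
        if now >= self.next_allowed {
            self.next_allowed = now + self.interval;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut debounce = Debounce::new(Duration::from_secs(1));
    assert!(debounce.try_fire());  // first request goes through
    assert!(!debounce.try_fire()); // an immediate retry is suppressed
}
```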
@@ -306,9 +306,9 @@
"EnvelopeBufferService: received project not ready message for project key {}",
&project_key
);
buffer.mark_ready(&project_key, false);
relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesReturned) += 1);
self.push(buffer, envelope).await;
buffer.mark_ready(&project_key, false);
Inline review comment from the PR author: This was a pre-existing bug: calling `mark_ready` on an empty stack has no effect, so we should call it after putting back the envelope.

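To illustrate the ordering issue described in the comment above, here is a minimal sketch with a hypothetical, simplified buffer (not the actual Relay API): `mark_ready` can only flip a flag on a stack that already exists, so calling it before the envelope is pushed back leaves the re-created stack in its optimistic default state.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for the buffer's priority queue of
/// stacks: maps a project key to its readiness flag.
struct Buffer {
    stacks: HashMap<&'static str, bool>,
}

impl Buffer {
    /// Has no effect if no stack exists for this project yet.
    fn mark_ready(&mut self, project: &'static str, ready: bool) {
        if let Some(flag) = self.stacks.get_mut(project) {
            *flag = ready;
        }
    }

    /// (Re-)creates the stack; with this PR it starts out as ready.
    fn push(&mut self, project: &'static str) {
        self.stacks.entry(project).or_insert(true);
    }
}

fn main() {
    // Buggy order: the stack does not exist yet, so mark_ready is a no-op
    // and the stack created by push keeps its optimistic default.
    let mut buffer = Buffer { stacks: HashMap::new() };
    buffer.mark_ready("proj", false);
    buffer.push("proj");
    assert!(buffer.stacks["proj"]);

    // Fixed order: push first, then mark_ready takes effect.
    let mut buffer = Buffer { stacks: HashMap::new() };
    buffer.push("proj");
    buffer.mark_ready("proj", false);
    assert!(!buffer.stacks["proj"]);
}
```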
}
EnvelopeBuffer::Ready(project_key) => {
relay_log::trace!(
@@ -637,7 +637,7 @@ mod tests {
#[tokio::test]
async fn test_update_project() {
tokio::time::pause();
let (service, global_tx, project_cache_rx, _) = buffer_service();
let (service, global_tx, mut project_cache_rx, _) = buffer_service();

let addr = service.start();

@@ -646,17 +646,35 @@
)));

let envelope = new_envelope(false, "foo");
let project_key = envelope.meta().public_key();

addr.send(EnvelopeBuffer::Push(envelope.clone()));

tokio::time::sleep(Duration::from_secs(1)).await;

// We expect the project update request to be sent.
let Some(ProjectCache::HandleDequeuedEnvelope(envelope, _)) = project_cache_rx.recv().await
else {
panic!();
};

addr.send(EnvelopeBuffer::NotReady(project_key, envelope));

tokio::time::sleep(Duration::from_millis(100)).await;

assert_eq!(project_cache_rx.len(), 1);
let message = project_cache_rx.recv().await;
assert!(matches!(
message,
Some(ProjectCache::UpdateProject(key)) if key == project_key
));

tokio::time::sleep(Duration::from_secs(1)).await;

// We expect the project update request to be sent again because 1 second passed.
assert_eq!(project_cache_rx.len(), 2);
assert_eq!(project_cache_rx.len(), 1);
assert!(matches!(
message,
Some(ProjectCache::UpdateProject(key)) if key == project_key
))
}
}
3 changes: 3 additions & 0 deletions relay-server/src/services/project_cache.rs
@@ -160,6 +160,7 @@ impl ValidateEnvelope {
}
}

#[derive(Debug)]
pub struct UpdateRateLimits {
project_key: ProjectKey,
rate_limits: RateLimits,
@@ -233,6 +234,7 @@ pub struct AddMetricMeta {
/// This message is sent from the project buffer in case of the error while fetching the data from
/// the persistent buffer, ensuring that we still have the index pointing to the keys, which could be found in the
/// persistent storage.
#[derive(Debug)]
pub struct UpdateSpoolIndex(pub HashSet<QueueKey>);

impl UpdateSpoolIndex {
@@ -276,6 +278,7 @@ pub struct UpdateProject(pub ProjectKey);
/// associated with a project.
///
/// See the enumerated variants for a full list of available messages for this service.
#[derive(Debug)]
pub enum ProjectCache {
RequestUpdate(RequestUpdate),
Get(GetProjectState, ProjectSender),