From 2cf2aedf4f495298283a104f4e85ba8062b0d878 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 19 Sep 2024 13:05:04 +0200 Subject: [PATCH 1/3] fix(buffer): Optimistically initialize stacks as ready --- .../src/services/buffer/envelope_buffer/mod.rs | 10 ++++++---- relay-server/src/services/buffer/mod.rs | 4 ++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index aa819c11f0..9ae469f2f6 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -292,7 +292,7 @@ where }, Priority { readiness, - next_project_fetch: next_peek, + next_project_fetch, .. }, )) = self.priority_queue.peek_mut() @@ -305,7 +305,7 @@ where Ok(match (stack.peek().await?, ready) { (None, _) => Peek::Empty, (Some(envelope), true) => Peek::Ready(envelope), - (Some(envelope), false) => Peek::NotReady(*stack_key, *next_peek, envelope), + (Some(envelope), false) => Peek::NotReady(*stack_key, *next_project_fetch, envelope), }) } @@ -607,9 +607,11 @@ struct Readiness { impl Readiness { fn new() -> Self { + // Optimistically set ready state to true. + // The large majority of stack creations are re-creations after a stack was emptied. Self { - own_project_ready: false, - sampling_project_ready: false, + own_project_ready: true, + sampling_project_ready: true, } } diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index d02bbf87a3..8a0719e3ce 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -237,7 +237,7 @@ impl EnvelopeBufferService { self.services.project_cache.send(DequeuedEnvelope(envelope)); self.sleep = Duration::ZERO; // try next pop immediately } - Peek::NotReady(stack_key, next_peek, envelope) => { + Peek::NotReady(stack_key, next_project_fetch, envelope) => { relay_log::trace!("EnvelopeBufferService: project(s) of envelope not ready"); relay_statsd::metric!( counter(RelayCounters::BufferTryPop) += 1, @@ -247,7 +247,7 @@ impl EnvelopeBufferService { // We want to fetch the configs again, only if some time passed between the last // peek of this not ready project key pair and the current peek. This is done to // avoid flooding the project cache with `UpdateProject` messages. - if Instant::now() >= next_peek { + if Instant::now() >= next_project_fetch { relay_log::trace!("EnvelopeBufferService: requesting project(s) update"); let project_key = envelope.meta().public_key(); self.services.project_cache.send(UpdateProject(project_key)); From 2bd18b7fa53615f2b2d6a31f44c8978f7db3f1de Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 19 Sep 2024 13:35:04 +0200 Subject: [PATCH 2/3] tests --- .../services/buffer/envelope_buffer/mod.rs | 109 +++++------------- 1 file changed, 32 insertions(+), 77 deletions(-) diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index 9ae469f2f6..d66841ebb6 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -697,6 +697,17 @@ mod tests { MemoryChecker::new(MemoryStat::default(), mock_config("my/db/path").clone()) } + async fn peek_project_key(buffer: &mut EnvelopeBuffer) -> ProjectKey { + buffer + .peek() + .await + .unwrap() + .envelope() + .unwrap() + .meta() + .public_key() + } + #[tokio::test] async fn test_insert_pop() { let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); @@ -712,110 +723,48 @@ mod tests { .push(new_envelope(project_key1, None, None)) .await .unwrap(); - assert_eq!( - buffer - .peek() - .await - .unwrap() - .envelope() - .unwrap() - .meta() - .public_key(), - project_key1 - ); buffer .push(new_envelope(project_key2, None, None)) .await .unwrap(); + + // Both projects are ready, so project 2 is on top (has the newest envelopes): + assert_eq!(peek_project_key(&mut buffer).await, project_key2); + + buffer.mark_ready(&project_key1, false); + buffer.mark_ready(&project_key2, false); + // Both projects are not ready, so project 1 is on top (has the oldest envelopes): - assert_eq!( - buffer - .peek() - .await - .unwrap() - .envelope() - .unwrap() - .meta() - .public_key(), - project_key1 - ); + assert_eq!(peek_project_key(&mut buffer).await, project_key1); buffer .push(new_envelope(project_key3, None, None)) .await .unwrap(); + buffer.mark_ready(&project_key3, false); + // All projects are not ready, so project 1 is on top (has the oldest envelopes): - assert_eq!( - buffer - .peek() - .await - .unwrap() - .envelope() - .unwrap() - .meta() - .public_key(), - project_key1 - ); + assert_eq!(peek_project_key(&mut buffer).await, project_key1); // After marking a project ready, it goes to the top: buffer.mark_ready(&project_key3, true); - assert_eq!( - buffer - .peek() - .await - .unwrap() - .envelope() - .unwrap() - .meta() - .public_key(), - project_key3 - ); + assert_eq!(peek_project_key(&mut buffer).await, project_key3); assert_eq!( buffer.pop().await.unwrap().unwrap().meta().public_key(), project_key3 ); // After popping, project 1 is on top again: - assert_eq!( - buffer - .peek() - .await - .unwrap() - .envelope() - .unwrap() - .meta() - .public_key(), - project_key1 - ); + assert_eq!(peek_project_key(&mut buffer).await, project_key1); // Mark project 1 as ready (still on top): buffer.mark_ready(&project_key1, true); - assert_eq!( - buffer - .peek() - .await - .unwrap() - .envelope() - .unwrap() - .meta() - .public_key(), - project_key1 - ); + assert_eq!(peek_project_key(&mut buffer).await, project_key1); // Mark project 2 as ready as well (now on top because most recent): buffer.mark_ready(&project_key2, true); - assert_eq!( - buffer - .peek() - .await - .unwrap() - .envelope() - .unwrap() - .meta() - .public_key(), - project_key2 - ); + assert_eq!(peek_project_key(&mut buffer).await, project_key2); assert_eq!( buffer.pop().await.unwrap().unwrap().meta().public_key(), project_key2 @@ -876,6 +825,9 @@ mod tests { let instant3 = envelope3.meta().start_time(); buffer.push(envelope3).await.unwrap(); + buffer.mark_ready(&project_key1, false); + buffer.mark_ready(&project_key2, false); + // Nothing is ready, instant1 is on top: assert_eq!( buffer @@ -1017,6 +969,9 @@ mod tests { buffer.push(envelope1).await.unwrap(); buffer.push(envelope2).await.unwrap(); + buffer.mark_ready(&project_key_1, false); + buffer.mark_ready(&project_key_2, false); + // event_id_1 is first element: let Peek::NotReady(_, _, envelope) = buffer.peek().await.unwrap() else { panic!(); From f5bb39be3e99dfdbfd64e4d374971bba3273a812 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Thu, 19 Sep 2024 13:53:15 +0200 Subject: [PATCH 3/3] fix --- relay-server/src/services/buffer/mod.rs | 26 ++++++++++++++++++---- relay-server/src/services/project_cache.rs | 3 +++ 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 8a0719e3ce..cfdccda21d 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -306,9 +306,9 @@ impl EnvelopeBufferService { "EnvelopeBufferService: received project not ready message for project key {}", &project_key ); - buffer.mark_ready(&project_key, false); relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesReturned) += 1); self.push(buffer, envelope).await; + buffer.mark_ready(&project_key, false); } EnvelopeBuffer::Ready(project_key) => { relay_log::trace!( @@ -637,7 +637,7 @@ mod tests { #[tokio::test] async fn test_update_project() { tokio::time::pause(); - let (service, global_tx, project_cache_rx, _) = buffer_service(); + let (service, global_tx, mut project_cache_rx, _) = buffer_service(); let addr = service.start(); @@ -646,17 +646,35 @@ mod tests { ))); let envelope = new_envelope(false, "foo"); + let project_key = envelope.meta().public_key(); addr.send(EnvelopeBuffer::Push(envelope.clone())); tokio::time::sleep(Duration::from_secs(1)).await; // We expect the project update request to be sent. + let Some(ProjectCache::HandleDequeuedEnvelope(envelope, _)) = project_cache_rx.recv().await + else { + panic!(); + }; + + addr.send(EnvelopeBuffer::NotReady(project_key, envelope)); + + tokio::time::sleep(Duration::from_millis(100)).await; + assert_eq!(project_cache_rx.len(), 1); + let message = project_cache_rx.recv().await; + assert!(matches!( + message, + Some(ProjectCache::UpdateProject(key)) if key == project_key + )); tokio::time::sleep(Duration::from_secs(1)).await; - // We expect the project update request to be sent again because 1 second passed. - assert_eq!(project_cache_rx.len(), 2); + assert_eq!(project_cache_rx.len(), 1); + assert!(matches!( + message, + Some(ProjectCache::UpdateProject(key)) if key == project_key + )) } } diff --git a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index 3c6633207e..be595f340a 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -160,6 +160,7 @@ impl ValidateEnvelope { } } +#[derive(Debug)] pub struct UpdateRateLimits { project_key: ProjectKey, rate_limits: RateLimits, @@ -233,6 +234,7 @@ pub struct AddMetricMeta { /// This message is sent from the project buffer in case of the error while fetching the data from /// the persistent buffer, ensuring that we still have the index pointing to the keys, which could be found in the /// persistent storage. +#[derive(Debug)] pub struct UpdateSpoolIndex(pub HashSet); impl UpdateSpoolIndex { @@ -276,6 +278,7 @@ pub struct UpdateProject(pub ProjectKey); /// associated with a project. /// /// See the enumerated variants for a full list of available messages for this service. +#[derive(Debug)] pub enum ProjectCache { RequestUpdate(RequestUpdate), Get(GetProjectState, ProjectSender),