From 71df0c2ddd86f08e61d986c3bb3cf20ed552fac9 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Wed, 18 Sep 2024 15:49:51 +0200 Subject: [PATCH 1/3] feat(spooler): Implement next fetch behavior in the spooler v2 --- relay-server/src/services/buffer/common.rs | 28 +++++ .../services/buffer/envelope_buffer/mod.rs | 117 +++++++++++++++--- relay-server/src/services/buffer/mod.rs | 18 ++- 3 files changed, 143 insertions(+), 20 deletions(-) diff --git a/relay-server/src/services/buffer/common.rs b/relay-server/src/services/buffer/common.rs index 924f8aa1c8..974df541cf 100644 --- a/relay-server/src/services/buffer/common.rs +++ b/relay-server/src/services/buffer/common.rs @@ -1,4 +1,5 @@ use relay_base_schema::project::ProjectKey; +use std::time::Instant; use crate::Envelope; @@ -31,3 +32,30 @@ impl ProjectKeyPair { std::iter::once(*own_key).chain((own_key != sampling_key).then_some(*sampling_key)) } } + +pub struct ProjectKeyPairFetch { + own_project_next_fetch: Option, + sampling_project_next_fetch: Option, +} + +impl ProjectKeyPairFetch { + pub fn new( + own_project_next_fetch: Option, + sampling_project_next_fetch: Option, + ) -> Self { + Self { + own_project_next_fetch, + sampling_project_next_fetch, + } + } + + pub fn fetch_own_project_key(&self) -> bool { + self.own_project_next_fetch + .map_or(false, |n| Instant::now() >= n) + } + + pub fn fetch_sampling_project_key(&self) -> bool { + self.sampling_project_next_fetch + .map_or(false, |n| Instant::now() >= n) + } +} diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index d77efb72df..ac6f5a99b1 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -14,7 +14,7 @@ use relay_config::Config; use tokio::time::timeout; use crate::envelope::Envelope; -use crate::services::buffer::common::ProjectKeyPair; +use crate::services::buffer::common::{ProjectKeyPair, ProjectKeyPairFetch}; use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError; use crate::services::buffer::envelope_stack::EnvelopeStack; use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError; @@ -301,7 +301,9 @@ where Ok(match (stack.peek().await?, ready) { (None, _) => Peek::Empty, (Some(envelope), true) => Peek::Ready(envelope), - (Some(envelope), false) => Peek::NotReady(*stack_key, envelope), + (Some(envelope), false) => { + Peek::NotReady(*stack_key, readiness.should_fetch(), envelope) + } }) } @@ -348,19 +350,25 @@ where /// Returns `true` if at least one priority was changed. pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool { let mut changed = false; - if let Some(project_key_pair) = self.stacks_by_project.get(project) { - for project_key_pair in project_key_pair { + if let Some(project_key_pairs) = self.stacks_by_project.get(project) { + // If we are marking a project as ready, we don't want to have a next fetch time but + // if we mark it as not ready, we want to fetch it as soon as possible. 
+ let next_fetch_time = if is_ready { None } else { Some(Instant::now()) }; + + for project_key_pair in project_key_pairs { self.priority_queue .change_priority_by(project_key_pair, |stack| { let mut found = false; - for (subkey, readiness) in [ + for (subkey, readiness, next_fetch) in [ ( project_key_pair.own_key, &mut stack.readiness.own_project_ready, + &mut stack.readiness.own_project_next_fetch, ), ( project_key_pair.sampling_key, &mut stack.readiness.sampling_project_ready, + &mut stack.readiness.sampling_project_next_fetch, ), ] { if subkey == *project { @@ -369,6 +377,7 @@ where changed = true; *readiness = is_ready; } + *next_fetch = next_fetch_time; } } debug_assert!(found); @@ -387,7 +396,15 @@ where pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair) { self.priority_queue .change_priority_by(project_key_pair, |stack| { + // We update the last peek to prevent ahead of line blocking by prioritizing stacks + // with an older timestamp. stack.last_peek = Instant::now(); + // We schedule the next fetch for the individual project configurations to avoid + // overloading the project cache with constant requests. {} + // TODO: use a configuration parameter for the delay. + let next_fetch = Instant::now() + Duration::from_secs(30); + stack.readiness.own_project_next_fetch = Some(next_fetch); + stack.readiness.sampling_project_next_fetch = Some(next_fetch); }); } @@ -513,7 +530,7 @@ where pub enum Peek<'a> { Empty, Ready(&'a Envelope), - NotReady(ProjectKeyPair, &'a Envelope), + NotReady(ProjectKeyPair, ProjectKeyPairFetch, &'a Envelope), } #[derive(Debug)] @@ -596,17 +613,35 @@ impl Eq for Priority {} #[derive(Debug, Clone, Copy)] struct Readiness { own_project_ready: bool, + own_project_next_fetch: Option, sampling_project_ready: bool, + sampling_project_next_fetch: Option, } impl Readiness { fn new() -> Self { + // The initial fetch is set to now, since we want to fetch the configurations immediately. + let now = Instant::now(); Self { own_project_ready: false, + own_project_next_fetch: Some(now), sampling_project_ready: false, + sampling_project_next_fetch: Some(now), } } + fn should_fetch(&self) -> ProjectKeyPairFetch { + // If both projects are ready, we return a fetch state which will never trigger fetching. 
+ if self.ready() { + return ProjectKeyPairFetch::new(None, None); + } + + ProjectKeyPairFetch::new( + self.own_project_next_fetch, + self.sampling_project_next_fetch, + ) + } + fn ready(&self) -> bool { self.own_project_ready && self.sampling_project_ready } @@ -638,19 +673,19 @@ mod tests { fn envelope(&self) -> Option<&Envelope> { match self { Peek::Empty => None, - Peek::Ready(envelope) | Peek::NotReady(_, envelope) => Some(envelope), + Peek::Ready(envelope) | Peek::NotReady(_, _, envelope) => Some(envelope), } } } fn new_envelope( - project_key: ProjectKey, + own_key: ProjectKey, sampling_key: Option, event_id: Option, ) -> Box { let mut envelope = Envelope::from_request( None, - RequestMeta::new(Dsn::from_str(&format!("http://{project_key}@localhost/1")).unwrap()), + RequestMeta::new(Dsn::from_str(&format!("http://{own_key}@localhost/1")).unwrap()), ); if let Some(sampling_key) = sampling_key { envelope.set_dsc(DynamicSamplingContext { @@ -981,7 +1016,9 @@ mod tests { let p1 = Priority { readiness: Readiness { own_project_ready: true, + own_project_next_fetch: None, sampling_project_ready: true, + sampling_project_next_fetch: None, }, received_at: Instant::now(), last_peek: Instant::now(), @@ -1010,13 +1047,13 @@ mod tests { buffer.push(envelope2).await.unwrap(); // event_id_1 is first element: - let Peek::NotReady(_, envelope) = buffer.peek().await.unwrap() else { + let Peek::NotReady(_, _, envelope) = buffer.peek().await.unwrap() else { panic!(); }; assert_eq!(envelope.event_id(), Some(event_id_1)); // Second peek returns same element: - let Peek::NotReady(stack_key, envelope) = buffer.peek().await.unwrap() else { + let Peek::NotReady(stack_key, _, envelope) = buffer.peek().await.unwrap() else { panic!(); }; assert_eq!(envelope.event_id(), Some(event_id_1)); @@ -1024,12 +1061,12 @@ mod tests { buffer.mark_seen(&stack_key); // After mark_seen, event 2 is on top: - let Peek::NotReady(_, envelope) = buffer.peek().await.unwrap() else { + let Peek::NotReady(_, _, envelope) = buffer.peek().await.unwrap() else { panic!(); }; assert_eq!(envelope.event_id(), Some(event_id_2)); - let Peek::NotReady(stack_key, envelope) = buffer.peek().await.unwrap() else { + let Peek::NotReady(stack_key, _, envelope) = buffer.peek().await.unwrap() else { panic!(); }; assert_eq!(envelope.event_id(), Some(event_id_2)); @@ -1037,7 +1074,7 @@ mod tests { buffer.mark_seen(&stack_key); // After another mark_seen, cycle back to event 1: - let Peek::NotReady(_, envelope) = buffer.peek().await.unwrap() else { + let Peek::NotReady(_, _, envelope) = buffer.peek().await.unwrap() else { panic!(); }; assert_eq!(envelope.event_id(), Some(event_id_1)); @@ -1077,4 +1114,56 @@ mod tests { // should be 2. assert_eq!(buffer.stacks_by_project.len(), 2); } + + #[tokio::test] + async fn test_next_fetch() { + let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); + + let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); + let event_id_1 = EventId::new(); + let envelope1 = new_envelope(project_key_1, None, Some(event_id_1)); + + buffer.push(envelope1).await.unwrap(); + + // We expect the top envelope to have the next fetch be immediately available since it just + // started. 
+ let peek = buffer.peek().await.unwrap(); + let Peek::NotReady(stack_key, project_key_pair_fetch, envelope) = peek else { + panic!(); + }; + assert_eq!(envelope.event_id(), Some(event_id_1)); + assert!(project_key_pair_fetch.fetch_own_project_key()); + assert!(project_key_pair_fetch.fetch_sampling_project_key()); + + buffer.mark_seen(&stack_key); + + // After mark seen, we expect the next fetch time to be in the future. + let peek = buffer.peek().await.unwrap(); + let Peek::NotReady(stack_key, project_key_pair_fetch, envelope) = peek else { + panic!(); + }; + assert_eq!(envelope.event_id(), Some(event_id_1)); + assert!(!project_key_pair_fetch.fetch_own_project_key()); + assert!(!project_key_pair_fetch.fetch_sampling_project_key()); + + buffer.mark_ready(&stack_key.own_key, true); + + // After marking the project as ready, it means we fetched it, so we don't need to fetch it. + let peek = buffer.peek().await.unwrap(); + let Peek::Ready(envelope) = peek else { + panic!(); + }; + assert_eq!(envelope.event_id(), Some(event_id_1)); + + buffer.mark_ready(&stack_key.own_key, false); + + // After marking the project as not ready, we want to immediately schedule a next fetch. + let peek = buffer.peek().await.unwrap(); + let Peek::NotReady(_, project_key_pair_fetch, envelope) = peek else { + panic!(); + }; + assert_eq!(envelope.event_id(), Some(event_id_1)); + assert!(project_key_pair_fetch.fetch_own_project_key()); + assert!(project_key_pair_fetch.fetch_sampling_project_key()); + } } diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index a21d7bd8c7..90f8d8ba07 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -237,24 +237,30 @@ impl EnvelopeBufferService { self.services.project_cache.send(DequeuedEnvelope(envelope)); self.sleep = Duration::ZERO; // try next pop immediately } - Peek::NotReady(stack_key, envelope) => { + Peek::NotReady(stack_key, project_key_pair_fetch, envelope) => { relay_log::trace!("EnvelopeBufferService: project(s) of envelope not ready, requesting project update"); relay_statsd::metric!( counter(RelayCounters::BufferTryPop) += 1, peek_result = "not_ready" ); let project_key = envelope.meta().public_key(); - self.services.project_cache.send(UpdateProject(project_key)); + if project_key_pair_fetch.fetch_own_project_key() { + self.services.project_cache.send(UpdateProject(project_key)); + } match envelope.sampling_key() { None => {} Some(sampling_key) if sampling_key == project_key => {} // already sent. Some(sampling_key) => { - self.services - .project_cache - .send(UpdateProject(sampling_key)); + if project_key_pair_fetch.fetch_sampling_project_key() { + self.services + .project_cache + .send(UpdateProject(sampling_key)); + } } } - // deprioritize the stack to prevent head-of-line blocking + + // Deprioritize the stack to prevent head-of-line blocking and update the next fetch + // time. 
buffer.mark_seen(&stack_key); self.sleep = DEFAULT_SLEEP; } From 7e5bdec37ad07dfdee10b2820a0f8aabe0a68047 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Thu, 19 Sep 2024 09:29:15 +0200 Subject: [PATCH 2/3] FIx --- relay-server/src/services/buffer/common.rs | 28 ---- .../services/buffer/envelope_buffer/mod.rs | 141 ++++-------------- relay-server/src/services/buffer/mod.rs | 53 +++++-- 3 files changed, 70 insertions(+), 152 deletions(-) diff --git a/relay-server/src/services/buffer/common.rs b/relay-server/src/services/buffer/common.rs index 974df541cf..924f8aa1c8 100644 --- a/relay-server/src/services/buffer/common.rs +++ b/relay-server/src/services/buffer/common.rs @@ -1,5 +1,4 @@ use relay_base_schema::project::ProjectKey; -use std::time::Instant; use crate::Envelope; @@ -32,30 +31,3 @@ impl ProjectKeyPair { std::iter::once(*own_key).chain((own_key != sampling_key).then_some(*sampling_key)) } } - -pub struct ProjectKeyPairFetch { - own_project_next_fetch: Option, - sampling_project_next_fetch: Option, -} - -impl ProjectKeyPairFetch { - pub fn new( - own_project_next_fetch: Option, - sampling_project_next_fetch: Option, - ) -> Self { - Self { - own_project_next_fetch, - sampling_project_next_fetch, - } - } - - pub fn fetch_own_project_key(&self) -> bool { - self.own_project_next_fetch - .map_or(false, |n| Instant::now() >= n) - } - - pub fn fetch_sampling_project_key(&self) -> bool { - self.sampling_project_next_fetch - .map_or(false, |n| Instant::now() >= n) - } -} diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index ac6f5a99b1..aa819c11f0 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -6,15 +6,15 @@ use std::mem; use std::sync::atomic::AtomicI64; use std::sync::atomic::Ordering as AtomicOrdering; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use hashbrown::HashSet; use relay_base_schema::project::ProjectKey; use relay_config::Config; -use tokio::time::timeout; +use tokio::time::{timeout, Instant}; use crate::envelope::Envelope; -use crate::services::buffer::common::{ProjectKeyPair, ProjectKeyPairFetch}; +use crate::services::buffer::common::ProjectKeyPair; use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError; use crate::services::buffer::envelope_stack::EnvelopeStack; use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError; @@ -126,10 +126,10 @@ impl PolymorphicEnvelopeBuffer { /// Non-ready stacks are deprioritized when they are marked as seen, such that /// the next call to `.peek()` will look at a different stack. This prevents /// head-of-line blocking. - pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair) { + pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) { match self { - Self::Sqlite(buffer) => buffer.mark_seen(project_key_pair), - Self::InMemory(buffer) => buffer.mark_seen(project_key_pair), + Self::Sqlite(buffer) => buffer.mark_seen(project_key_pair, next_fetch), + Self::InMemory(buffer) => buffer.mark_seen(project_key_pair, next_fetch), } } @@ -251,7 +251,7 @@ where /// If the envelope stack does not exist, a new stack is pushed to the priority queue. /// The priority of the stack is updated with the envelope's received_at time. 
pub async fn push(&mut self, envelope: Box) -> Result<(), EnvelopeBufferError> { - let received_at = envelope.meta().start_time(); + let received_at = envelope.meta().start_time().into(); let project_key_pair = ProjectKeyPair::from_envelope(&envelope); if let Some(( QueueItem { @@ -290,7 +290,11 @@ where key: stack_key, value: stack, }, - Priority { readiness, .. }, + Priority { + readiness, + next_project_fetch: next_peek, + .. + }, )) = self.priority_queue.peek_mut() else { return Ok(Peek::Empty); @@ -301,9 +305,7 @@ where Ok(match (stack.peek().await?, ready) { (None, _) => Peek::Empty, (Some(envelope), true) => Peek::Ready(envelope), - (Some(envelope), false) => { - Peek::NotReady(*stack_key, readiness.should_fetch(), envelope) - } + (Some(envelope), false) => Peek::NotReady(*stack_key, *next_peek, envelope), }) } @@ -321,7 +323,7 @@ where let next_received_at = stack .peek() .await? - .map(|next_envelope| next_envelope.meta().start_time()); + .map(|next_envelope| next_envelope.meta().start_time().into()); match next_received_at { None => { @@ -351,24 +353,18 @@ where pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool { let mut changed = false; if let Some(project_key_pairs) = self.stacks_by_project.get(project) { - // If we are marking a project as ready, we don't want to have a next fetch time but - // if we mark it as not ready, we want to fetch it as soon as possible. - let next_fetch_time = if is_ready { None } else { Some(Instant::now()) }; - for project_key_pair in project_key_pairs { self.priority_queue .change_priority_by(project_key_pair, |stack| { let mut found = false; - for (subkey, readiness, next_fetch) in [ + for (subkey, readiness) in [ ( project_key_pair.own_key, &mut stack.readiness.own_project_ready, - &mut stack.readiness.own_project_next_fetch, ), ( project_key_pair.sampling_key, &mut stack.readiness.sampling_project_ready, - &mut stack.readiness.sampling_project_next_fetch, ), ] { if subkey == *project { @@ -377,7 +373,6 @@ where changed = true; *readiness = is_ready; } - *next_fetch = next_fetch_time; } } debug_assert!(found); @@ -393,18 +388,12 @@ where /// Non-ready stacks are deprioritized when they are marked as seen, such that /// the next call to `.peek()` will look at a different stack. This prevents /// head-of-line blocking. - pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair) { + pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) { self.priority_queue .change_priority_by(project_key_pair, |stack| { - // We update the last peek to prevent ahead of line blocking by prioritizing stacks - // with an older timestamp. - stack.last_peek = Instant::now(); - // We schedule the next fetch for the individual project configurations to avoid - // overloading the project cache with constant requests. {} - // TODO: use a configuration parameter for the delay. - let next_fetch = Instant::now() + Duration::from_secs(30); - stack.readiness.own_project_next_fetch = Some(next_fetch); - stack.readiness.sampling_project_next_fetch = Some(next_fetch); + // We use the next project fetch to debounce project fetching and avoid head of + // line blocking of non-ready stacks. 
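+                // The deadline is surfaced to callers through `Peek::NotReady`; they only request
+                // another project config update once it has passed.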
+ stack.next_project_fetch = Instant::now() + next_fetch; }); } @@ -430,7 +419,7 @@ where ) -> Result<(), EnvelopeBufferError> { let received_at = envelope .as_ref() - .map_or(Instant::now(), |e| e.meta().start_time()); + .map_or(Instant::now(), |e| e.meta().start_time().into()); let mut stack = self .stack_provider @@ -530,7 +519,7 @@ where pub enum Peek<'a> { Empty, Ready(&'a Envelope), - NotReady(ProjectKeyPair, ProjectKeyPairFetch, &'a Envelope), + NotReady(ProjectKeyPair, Instant, &'a Envelope), } #[derive(Debug)] @@ -563,7 +552,7 @@ impl Eq for QueueItem {} struct Priority { readiness: Readiness, received_at: Instant, - last_peek: Instant, + next_project_fetch: Instant, } impl Priority { @@ -571,7 +560,7 @@ impl Priority { Self { readiness: Readiness::new(), received_at, - last_peek: Instant::now(), + next_project_fetch: Instant::now(), } } } @@ -588,8 +577,8 @@ impl Ord for Priority { // For non-ready stacks, we invert the priority, such that projects that are not // ready and did not receive envelopes recently can be evicted. (false, false) => self - .last_peek - .cmp(&other.last_peek) + .next_project_fetch + .cmp(&other.next_project_fetch) .reverse() .then(self.received_at.cmp(&other.received_at).reverse()), } @@ -613,35 +602,17 @@ impl Eq for Priority {} #[derive(Debug, Clone, Copy)] struct Readiness { own_project_ready: bool, - own_project_next_fetch: Option, sampling_project_ready: bool, - sampling_project_next_fetch: Option, } impl Readiness { fn new() -> Self { - // The initial fetch is set to now, since we want to fetch the configurations immediately. - let now = Instant::now(); Self { own_project_ready: false, - own_project_next_fetch: Some(now), sampling_project_ready: false, - sampling_project_next_fetch: Some(now), } } - fn should_fetch(&self) -> ProjectKeyPairFetch { - // If both projects are ready, we return a fetch state which will never trigger fetching. - if self.ready() { - return ProjectKeyPairFetch::new(None, None); - } - - ProjectKeyPairFetch::new( - self.own_project_next_fetch, - self.sampling_project_next_fetch, - ) - } - fn ready(&self) -> bool { self.own_project_ready && self.sampling_project_ready } @@ -1016,15 +987,13 @@ mod tests { let p1 = Priority { readiness: Readiness { own_project_ready: true, - own_project_next_fetch: None, sampling_project_ready: true, - sampling_project_next_fetch: None, }, received_at: Instant::now(), - last_peek: Instant::now(), + next_project_fetch: Instant::now(), }; let mut p2 = p1.clone(); - p2.last_peek += Duration::from_millis(1); + p2.next_project_fetch += Duration::from_millis(1); // Last peek does not matter because project is ready: assert_eq!(p1.cmp(&p2), Ordering::Equal); @@ -1058,7 +1027,7 @@ mod tests { }; assert_eq!(envelope.event_id(), Some(event_id_1)); - buffer.mark_seen(&stack_key); + buffer.mark_seen(&stack_key, Duration::ZERO); // After mark_seen, event 2 is on top: let Peek::NotReady(_, _, envelope) = buffer.peek().await.unwrap() else { @@ -1071,7 +1040,7 @@ mod tests { }; assert_eq!(envelope.event_id(), Some(event_id_2)); - buffer.mark_seen(&stack_key); + buffer.mark_seen(&stack_key, Duration::ZERO); // After another mark_seen, cycle back to event 1: let Peek::NotReady(_, _, envelope) = buffer.peek().await.unwrap() else { @@ -1114,56 +1083,4 @@ mod tests { // should be 2. 
assert_eq!(buffer.stacks_by_project.len(), 2); } - - #[tokio::test] - async fn test_next_fetch() { - let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); - - let project_key_1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); - let event_id_1 = EventId::new(); - let envelope1 = new_envelope(project_key_1, None, Some(event_id_1)); - - buffer.push(envelope1).await.unwrap(); - - // We expect the top envelope to have the next fetch be immediately available since it just - // started. - let peek = buffer.peek().await.unwrap(); - let Peek::NotReady(stack_key, project_key_pair_fetch, envelope) = peek else { - panic!(); - }; - assert_eq!(envelope.event_id(), Some(event_id_1)); - assert!(project_key_pair_fetch.fetch_own_project_key()); - assert!(project_key_pair_fetch.fetch_sampling_project_key()); - - buffer.mark_seen(&stack_key); - - // After mark seen, we expect the next fetch time to be in the future. - let peek = buffer.peek().await.unwrap(); - let Peek::NotReady(stack_key, project_key_pair_fetch, envelope) = peek else { - panic!(); - }; - assert_eq!(envelope.event_id(), Some(event_id_1)); - assert!(!project_key_pair_fetch.fetch_own_project_key()); - assert!(!project_key_pair_fetch.fetch_sampling_project_key()); - - buffer.mark_ready(&stack_key.own_key, true); - - // After marking the project as ready, it means we fetched it, so we don't need to fetch it. - let peek = buffer.peek().await.unwrap(); - let Peek::Ready(envelope) = peek else { - panic!(); - }; - assert_eq!(envelope.event_id(), Some(event_id_1)); - - buffer.mark_ready(&stack_key.own_key, false); - - // After marking the project as not ready, we want to immediately schedule a next fetch. - let peek = buffer.peek().await.unwrap(); - let Peek::NotReady(_, project_key_pair_fetch, envelope) = peek else { - panic!(); - }; - assert_eq!(envelope.event_id(), Some(event_id_1)); - assert!(project_key_pair_fetch.fetch_own_project_key()); - assert!(project_key_pair_fetch.fetch_sampling_project_key()); - } } diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 90f8d8ba07..f1d562aaf5 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -12,7 +12,7 @@ use relay_system::SendError; use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service}; use relay_system::{Controller, Shutdown}; use tokio::sync::watch; -use tokio::time::timeout; +use tokio::time::{timeout, Instant}; use crate::envelope::Envelope; use crate::services::buffer::envelope_buffer::Peek; @@ -237,21 +237,24 @@ impl EnvelopeBufferService { self.services.project_cache.send(DequeuedEnvelope(envelope)); self.sleep = Duration::ZERO; // try next pop immediately } - Peek::NotReady(stack_key, project_key_pair_fetch, envelope) => { - relay_log::trace!("EnvelopeBufferService: project(s) of envelope not ready, requesting project update"); + Peek::NotReady(stack_key, next_peek, envelope) => { + relay_log::trace!("EnvelopeBufferService: project(s) of envelope not ready"); relay_statsd::metric!( counter(RelayCounters::BufferTryPop) += 1, peek_result = "not_ready" ); - let project_key = envelope.meta().public_key(); - if project_key_pair_fetch.fetch_own_project_key() { + + // We want to fetch the configs again, only if some time passed between the last + // peek of this not ready project key pair and the current peek. This is done to + // avoid flooding the project cache with `UpdateProject` messages. 
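+                    // `next_peek` is advanced by `mark_seen` below, so at most one update per
+                    // not-ready stack is requested per debounce interval.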
+ if Instant::now() >= next_peek { + relay_log::trace!("EnvelopeBufferService: requesting project(s) update"); + let project_key = envelope.meta().public_key(); self.services.project_cache.send(UpdateProject(project_key)); - } - match envelope.sampling_key() { - None => {} - Some(sampling_key) if sampling_key == project_key => {} // already sent. - Some(sampling_key) => { - if project_key_pair_fetch.fetch_sampling_project_key() { + match envelope.sampling_key() { + None => {} + Some(sampling_key) if sampling_key == project_key => {} // already sent. + Some(sampling_key) => { self.services .project_cache .send(UpdateProject(sampling_key)); @@ -261,7 +264,7 @@ impl EnvelopeBufferService { // Deprioritize the stack to prevent head-of-line blocking and update the next fetch // time. - buffer.mark_seen(&stack_key); + buffer.mark_seen(&stack_key, DEFAULT_SLEEP); self.sleep = DEFAULT_SLEEP; } } @@ -629,4 +632,30 @@ mod tests { assert_eq!(outcome.category, DataCategory::TransactionIndexed); assert_eq!(outcome.quantity, 1); } + + #[tokio::test] + async fn test_update_project() { + tokio::time::pause(); + let (service, global_tx, project_cache_rx, _) = buffer_service(); + + let addr = service.start(); + + global_tx.send_replace(global_config::Status::Ready(Arc::new( + GlobalConfig::default(), + ))); + + let envelope = new_envelope(false, "foo"); + + addr.send(EnvelopeBuffer::Push(envelope.clone())); + + tokio::time::sleep(Duration::from_secs(1)).await; + + // We expect the project update request to be sent. + assert_eq!(project_cache_rx.len(), 1); + + tokio::time::sleep(Duration::from_secs(1)).await; + + // We expect the project update request to be sent again because 1 second passed. + assert_eq!(project_cache_rx.len(), 2); + } } From d1619cf842c735040dc01b96b03a19d163fa387f Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Thu, 19 Sep 2024 09:44:53 +0200 Subject: [PATCH 3/3] FIx --- relay-server/src/services/buffer/mod.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index f1d562aaf5..d02bbf87a3 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -260,11 +260,12 @@ impl EnvelopeBufferService { .send(UpdateProject(sampling_key)); } } + + // Deprioritize the stack to prevent head-of-line blocking and update the next fetch + // time. + buffer.mark_seen(&stack_key, DEFAULT_SLEEP); } - // Deprioritize the stack to prevent head-of-line blocking and update the next fetch - // time. - buffer.mark_seen(&stack_key, DEFAULT_SLEEP); self.sleep = DEFAULT_SLEEP; } }
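
The net effect of the series: `mark_seen` now takes a debounce interval and pushes the
stack's `next_project_fetch` deadline into the future, `peek` exposes that deadline via
`Peek::NotReady`, and the buffer service only re-sends `UpdateProject` for a not-ready
stack once the deadline has passed. Below is a minimal, self-contained sketch of the same
pattern using only std; the `FetchDebounce` name, its API, and the 100ms interval are
illustrative only and do not appear in the patches.

    use std::time::{Duration, Instant};

    /// Minimal debounce helper mirroring the `next_project_fetch` idea: a fetch is
    /// allowed immediately after creation and then at most once per `interval`.
    struct FetchDebounce {
        next_fetch: Instant,
        interval: Duration,
    }

    impl FetchDebounce {
        fn new(interval: Duration) -> Self {
            // Like `Priority::new`, the first fetch is allowed right away.
            Self {
                next_fetch: Instant::now(),
                interval,
            }
        }

        /// Counterpart of the `Instant::now() >= next_peek` check in the buffer service.
        fn should_fetch(&self) -> bool {
            Instant::now() >= self.next_fetch
        }

        /// Counterpart of `mark_seen`: push the deadline into the future.
        fn mark_seen(&mut self) {
            self.next_fetch = Instant::now() + self.interval;
        }
    }

    fn main() {
        let mut debounce = FetchDebounce::new(Duration::from_millis(100));
        assert!(debounce.should_fetch()); // first peek may trigger a fetch
        debounce.mark_seen();
        assert!(!debounce.should_fetch()); // peeks within the interval do not
    }

Compared to patch 1, this keeps a single deadline per stack in `Priority` instead of two
per-project `Option<Instant>` fields in `Readiness`, and the same field replaces
`last_peek` in the ordering of non-ready stacks.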