diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index d77efb72df..aa819c11f0 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -6,12 +6,12 @@ use std::mem; use std::sync::atomic::AtomicI64; use std::sync::atomic::Ordering as AtomicOrdering; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use hashbrown::HashSet; use relay_base_schema::project::ProjectKey; use relay_config::Config; -use tokio::time::timeout; +use tokio::time::{timeout, Instant}; use crate::envelope::Envelope; use crate::services::buffer::common::ProjectKeyPair; @@ -126,10 +126,10 @@ impl PolymorphicEnvelopeBuffer { /// Non-ready stacks are deprioritized when they are marked as seen, such that /// the next call to `.peek()` will look at a different stack. This prevents /// head-of-line blocking. - pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair) { + pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) { match self { - Self::Sqlite(buffer) => buffer.mark_seen(project_key_pair), - Self::InMemory(buffer) => buffer.mark_seen(project_key_pair), + Self::Sqlite(buffer) => buffer.mark_seen(project_key_pair, next_fetch), + Self::InMemory(buffer) => buffer.mark_seen(project_key_pair, next_fetch), } } @@ -251,7 +251,7 @@ where /// If the envelope stack does not exist, a new stack is pushed to the priority queue. /// The priority of the stack is updated with the envelope's received_at time. pub async fn push(&mut self, envelope: Box) -> Result<(), EnvelopeBufferError> { - let received_at = envelope.meta().start_time(); + let received_at = envelope.meta().start_time().into(); let project_key_pair = ProjectKeyPair::from_envelope(&envelope); if let Some(( QueueItem { @@ -290,7 +290,11 @@ where key: stack_key, value: stack, }, - Priority { readiness, .. }, + Priority { + readiness, + next_project_fetch: next_peek, + .. + }, )) = self.priority_queue.peek_mut() else { return Ok(Peek::Empty); @@ -301,7 +305,7 @@ where Ok(match (stack.peek().await?, ready) { (None, _) => Peek::Empty, (Some(envelope), true) => Peek::Ready(envelope), - (Some(envelope), false) => Peek::NotReady(*stack_key, envelope), + (Some(envelope), false) => Peek::NotReady(*stack_key, *next_peek, envelope), }) } @@ -319,7 +323,7 @@ where let next_received_at = stack .peek() .await? - .map(|next_envelope| next_envelope.meta().start_time()); + .map(|next_envelope| next_envelope.meta().start_time().into()); match next_received_at { None => { @@ -348,8 +352,8 @@ where /// Returns `true` if at least one priority was changed. pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool { let mut changed = false; - if let Some(project_key_pair) = self.stacks_by_project.get(project) { - for project_key_pair in project_key_pair { + if let Some(project_key_pairs) = self.stacks_by_project.get(project) { + for project_key_pair in project_key_pairs { self.priority_queue .change_priority_by(project_key_pair, |stack| { let mut found = false; @@ -384,10 +388,12 @@ where /// Non-ready stacks are deprioritized when they are marked as seen, such that /// the next call to `.peek()` will look at a different stack. This prevents /// head-of-line blocking. - pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair) { + pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) { self.priority_queue .change_priority_by(project_key_pair, |stack| { - stack.last_peek = Instant::now(); + // We use the next project fetch to debounce project fetching and avoid head of + // line blocking of non-ready stacks. + stack.next_project_fetch = Instant::now() + next_fetch; }); } @@ -413,7 +419,7 @@ where ) -> Result<(), EnvelopeBufferError> { let received_at = envelope .as_ref() - .map_or(Instant::now(), |e| e.meta().start_time()); + .map_or(Instant::now(), |e| e.meta().start_time().into()); let mut stack = self .stack_provider @@ -513,7 +519,7 @@ where pub enum Peek<'a> { Empty, Ready(&'a Envelope), - NotReady(ProjectKeyPair, &'a Envelope), + NotReady(ProjectKeyPair, Instant, &'a Envelope), } #[derive(Debug)] @@ -546,7 +552,7 @@ impl Eq for QueueItem {} struct Priority { readiness: Readiness, received_at: Instant, - last_peek: Instant, + next_project_fetch: Instant, } impl Priority { @@ -554,7 +560,7 @@ impl Priority { Self { readiness: Readiness::new(), received_at, - last_peek: Instant::now(), + next_project_fetch: Instant::now(), } } } @@ -571,8 +577,8 @@ impl Ord for Priority { // For non-ready stacks, we invert the priority, such that projects that are not // ready and did not receive envelopes recently can be evicted. (false, false) => self - .last_peek - .cmp(&other.last_peek) + .next_project_fetch + .cmp(&other.next_project_fetch) .reverse() .then(self.received_at.cmp(&other.received_at).reverse()), } @@ -638,19 +644,19 @@ mod tests { fn envelope(&self) -> Option<&Envelope> { match self { Peek::Empty => None, - Peek::Ready(envelope) | Peek::NotReady(_, envelope) => Some(envelope), + Peek::Ready(envelope) | Peek::NotReady(_, _, envelope) => Some(envelope), } } } fn new_envelope( - project_key: ProjectKey, + own_key: ProjectKey, sampling_key: Option, event_id: Option, ) -> Box { let mut envelope = Envelope::from_request( None, - RequestMeta::new(Dsn::from_str(&format!("http://{project_key}@localhost/1")).unwrap()), + RequestMeta::new(Dsn::from_str(&format!("http://{own_key}@localhost/1")).unwrap()), ); if let Some(sampling_key) = sampling_key { envelope.set_dsc(DynamicSamplingContext { @@ -984,10 +990,10 @@ mod tests { sampling_project_ready: true, }, received_at: Instant::now(), - last_peek: Instant::now(), + next_project_fetch: Instant::now(), }; let mut p2 = p1.clone(); - p2.last_peek += Duration::from_millis(1); + p2.next_project_fetch += Duration::from_millis(1); // Last peek does not matter because project is ready: assert_eq!(p1.cmp(&p2), Ordering::Equal); @@ -1010,34 +1016,34 @@ mod tests { buffer.push(envelope2).await.unwrap(); // event_id_1 is first element: - let Peek::NotReady(_, envelope) = buffer.peek().await.unwrap() else { + let Peek::NotReady(_, _, envelope) = buffer.peek().await.unwrap() else { panic!(); }; assert_eq!(envelope.event_id(), Some(event_id_1)); // Second peek returns same element: - let Peek::NotReady(stack_key, envelope) = buffer.peek().await.unwrap() else { + let Peek::NotReady(stack_key, _, envelope) = buffer.peek().await.unwrap() else { panic!(); }; assert_eq!(envelope.event_id(), Some(event_id_1)); - buffer.mark_seen(&stack_key); + buffer.mark_seen(&stack_key, Duration::ZERO); // After mark_seen, event 2 is on top: - let Peek::NotReady(_, envelope) = buffer.peek().await.unwrap() else { + let Peek::NotReady(_, _, envelope) = buffer.peek().await.unwrap() else { panic!(); }; assert_eq!(envelope.event_id(), Some(event_id_2)); - let Peek::NotReady(stack_key, envelope) = buffer.peek().await.unwrap() else { + let Peek::NotReady(stack_key, _, envelope) = buffer.peek().await.unwrap() else { panic!(); }; assert_eq!(envelope.event_id(), Some(event_id_2)); - buffer.mark_seen(&stack_key); + buffer.mark_seen(&stack_key, Duration::ZERO); // After another mark_seen, cycle back to event 1: - let Peek::NotReady(_, envelope) = buffer.peek().await.unwrap() else { + let Peek::NotReady(_, _, envelope) = buffer.peek().await.unwrap() else { panic!(); }; assert_eq!(envelope.event_id(), Some(event_id_1)); diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index a21d7bd8c7..d02bbf87a3 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -12,7 +12,7 @@ use relay_system::SendError; use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service}; use relay_system::{Controller, Shutdown}; use tokio::sync::watch; -use tokio::time::timeout; +use tokio::time::{timeout, Instant}; use crate::envelope::Envelope; use crate::services::buffer::envelope_buffer::Peek; @@ -237,25 +237,35 @@ impl EnvelopeBufferService { self.services.project_cache.send(DequeuedEnvelope(envelope)); self.sleep = Duration::ZERO; // try next pop immediately } - Peek::NotReady(stack_key, envelope) => { - relay_log::trace!("EnvelopeBufferService: project(s) of envelope not ready, requesting project update"); + Peek::NotReady(stack_key, next_peek, envelope) => { + relay_log::trace!("EnvelopeBufferService: project(s) of envelope not ready"); relay_statsd::metric!( counter(RelayCounters::BufferTryPop) += 1, peek_result = "not_ready" ); - let project_key = envelope.meta().public_key(); - self.services.project_cache.send(UpdateProject(project_key)); - match envelope.sampling_key() { - None => {} - Some(sampling_key) if sampling_key == project_key => {} // already sent. - Some(sampling_key) => { - self.services - .project_cache - .send(UpdateProject(sampling_key)); + + // We want to fetch the configs again, only if some time passed between the last + // peek of this not ready project key pair and the current peek. This is done to + // avoid flooding the project cache with `UpdateProject` messages. + if Instant::now() >= next_peek { + relay_log::trace!("EnvelopeBufferService: requesting project(s) update"); + let project_key = envelope.meta().public_key(); + self.services.project_cache.send(UpdateProject(project_key)); + match envelope.sampling_key() { + None => {} + Some(sampling_key) if sampling_key == project_key => {} // already sent. + Some(sampling_key) => { + self.services + .project_cache + .send(UpdateProject(sampling_key)); + } } + + // Deprioritize the stack to prevent head-of-line blocking and update the next fetch + // time. + buffer.mark_seen(&stack_key, DEFAULT_SLEEP); } - // deprioritize the stack to prevent head-of-line blocking - buffer.mark_seen(&stack_key); + self.sleep = DEFAULT_SLEEP; } } @@ -623,4 +633,30 @@ mod tests { assert_eq!(outcome.category, DataCategory::TransactionIndexed); assert_eq!(outcome.quantity, 1); } + + #[tokio::test] + async fn test_update_project() { + tokio::time::pause(); + let (service, global_tx, project_cache_rx, _) = buffer_service(); + + let addr = service.start(); + + global_tx.send_replace(global_config::Status::Ready(Arc::new( + GlobalConfig::default(), + ))); + + let envelope = new_envelope(false, "foo"); + + addr.send(EnvelopeBuffer::Push(envelope.clone())); + + tokio::time::sleep(Duration::from_secs(1)).await; + + // We expect the project update request to be sent. + assert_eq!(project_cache_rx.len(), 1); + + tokio::time::sleep(Duration::from_secs(1)).await; + + // We expect the project update request to be sent again because 1 second passed. + assert_eq!(project_cache_rx.len(), 2); + } }