Skip to content

Commit

Permalink
feat(spooler): Implement next fetch behavior in the spooler v2 (#4044)
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo authored Sep 19, 2024
1 parent 7075a7b commit 84fb625
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 45 deletions.
68 changes: 37 additions & 31 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use std::mem;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Duration;

use hashbrown::HashSet;
use relay_base_schema::project::ProjectKey;
use relay_config::Config;
use tokio::time::timeout;
use tokio::time::{timeout, Instant};

use crate::envelope::Envelope;
use crate::services::buffer::common::ProjectKeyPair;
Expand Down Expand Up @@ -126,10 +126,10 @@ impl PolymorphicEnvelopeBuffer {
/// Non-ready stacks are deprioritized when they are marked as seen, such that
/// the next call to `.peek()` will look at a different stack. This prevents
/// head-of-line blocking.
pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair) {
pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) {
match self {
Self::Sqlite(buffer) => buffer.mark_seen(project_key_pair),
Self::InMemory(buffer) => buffer.mark_seen(project_key_pair),
Self::Sqlite(buffer) => buffer.mark_seen(project_key_pair, next_fetch),
Self::InMemory(buffer) => buffer.mark_seen(project_key_pair, next_fetch),
}
}

Expand Down Expand Up @@ -251,7 +251,7 @@ where
/// If the envelope stack does not exist, a new stack is pushed to the priority queue.
/// The priority of the stack is updated with the envelope's received_at time.
pub async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
let received_at = envelope.meta().start_time();
let received_at = envelope.meta().start_time().into();
let project_key_pair = ProjectKeyPair::from_envelope(&envelope);
if let Some((
QueueItem {
Expand Down Expand Up @@ -290,7 +290,11 @@ where
key: stack_key,
value: stack,
},
Priority { readiness, .. },
Priority {
readiness,
next_project_fetch: next_peek,
..
},
)) = self.priority_queue.peek_mut()
else {
return Ok(Peek::Empty);
Expand All @@ -301,7 +305,7 @@ where
Ok(match (stack.peek().await?, ready) {
(None, _) => Peek::Empty,
(Some(envelope), true) => Peek::Ready(envelope),
(Some(envelope), false) => Peek::NotReady(*stack_key, envelope),
(Some(envelope), false) => Peek::NotReady(*stack_key, *next_peek, envelope),
})
}

Expand All @@ -319,7 +323,7 @@ where
let next_received_at = stack
.peek()
.await?
.map(|next_envelope| next_envelope.meta().start_time());
.map(|next_envelope| next_envelope.meta().start_time().into());

match next_received_at {
None => {
Expand Down Expand Up @@ -348,8 +352,8 @@ where
/// Returns `true` if at least one priority was changed.
pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool {
let mut changed = false;
if let Some(project_key_pair) = self.stacks_by_project.get(project) {
for project_key_pair in project_key_pair {
if let Some(project_key_pairs) = self.stacks_by_project.get(project) {
for project_key_pair in project_key_pairs {
self.priority_queue
.change_priority_by(project_key_pair, |stack| {
let mut found = false;
Expand Down Expand Up @@ -384,10 +388,12 @@ where
/// Non-ready stacks are deprioritized when they are marked as seen, such that
/// the next call to `.peek()` will look at a different stack. This prevents
/// head-of-line blocking.
pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair) {
pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) {
self.priority_queue
.change_priority_by(project_key_pair, |stack| {
stack.last_peek = Instant::now();
// We use the next project fetch to debounce project fetching and avoid head of
// line blocking of non-ready stacks.
stack.next_project_fetch = Instant::now() + next_fetch;
});
}

Expand All @@ -413,7 +419,7 @@ where
) -> Result<(), EnvelopeBufferError> {
let received_at = envelope
.as_ref()
.map_or(Instant::now(), |e| e.meta().start_time());
.map_or(Instant::now(), |e| e.meta().start_time().into());

let mut stack = self
.stack_provider
Expand Down Expand Up @@ -513,7 +519,7 @@ where
pub enum Peek<'a> {
Empty,
Ready(&'a Envelope),
NotReady(ProjectKeyPair, &'a Envelope),
NotReady(ProjectKeyPair, Instant, &'a Envelope),
}

#[derive(Debug)]
Expand Down Expand Up @@ -546,15 +552,15 @@ impl<K: PartialEq, V> Eq for QueueItem<K, V> {}
struct Priority {
readiness: Readiness,
received_at: Instant,
last_peek: Instant,
next_project_fetch: Instant,
}

impl Priority {
fn new(received_at: Instant) -> Self {
Self {
readiness: Readiness::new(),
received_at,
last_peek: Instant::now(),
next_project_fetch: Instant::now(),
}
}
}
Expand All @@ -571,8 +577,8 @@ impl Ord for Priority {
// For non-ready stacks, we invert the priority, such that projects that are not
// ready and did not receive envelopes recently can be evicted.
(false, false) => self
.last_peek
.cmp(&other.last_peek)
.next_project_fetch
.cmp(&other.next_project_fetch)
.reverse()
.then(self.received_at.cmp(&other.received_at).reverse()),
}
Expand Down Expand Up @@ -638,19 +644,19 @@ mod tests {
fn envelope(&self) -> Option<&Envelope> {
match self {
Peek::Empty => None,
Peek::Ready(envelope) | Peek::NotReady(_, envelope) => Some(envelope),
Peek::Ready(envelope) | Peek::NotReady(_, _, envelope) => Some(envelope),
}
}
}

fn new_envelope(
project_key: ProjectKey,
own_key: ProjectKey,
sampling_key: Option<ProjectKey>,
event_id: Option<EventId>,
) -> Box<Envelope> {
let mut envelope = Envelope::from_request(
None,
RequestMeta::new(Dsn::from_str(&format!("http://{project_key}@localhost/1")).unwrap()),
RequestMeta::new(Dsn::from_str(&format!("http://{own_key}@localhost/1")).unwrap()),
);
if let Some(sampling_key) = sampling_key {
envelope.set_dsc(DynamicSamplingContext {
Expand Down Expand Up @@ -984,10 +990,10 @@ mod tests {
sampling_project_ready: true,
},
received_at: Instant::now(),
last_peek: Instant::now(),
next_project_fetch: Instant::now(),
};
let mut p2 = p1.clone();
p2.last_peek += Duration::from_millis(1);
p2.next_project_fetch += Duration::from_millis(1);

// Last peek does not matter because project is ready:
assert_eq!(p1.cmp(&p2), Ordering::Equal);
Expand All @@ -1010,34 +1016,34 @@ mod tests {
buffer.push(envelope2).await.unwrap();

// event_id_1 is first element:
let Peek::NotReady(_, envelope) = buffer.peek().await.unwrap() else {
let Peek::NotReady(_, _, envelope) = buffer.peek().await.unwrap() else {
panic!();
};
assert_eq!(envelope.event_id(), Some(event_id_1));

// Second peek returns same element:
let Peek::NotReady(stack_key, envelope) = buffer.peek().await.unwrap() else {
let Peek::NotReady(stack_key, _, envelope) = buffer.peek().await.unwrap() else {
panic!();
};
assert_eq!(envelope.event_id(), Some(event_id_1));

buffer.mark_seen(&stack_key);
buffer.mark_seen(&stack_key, Duration::ZERO);

// After mark_seen, event 2 is on top:
let Peek::NotReady(_, envelope) = buffer.peek().await.unwrap() else {
let Peek::NotReady(_, _, envelope) = buffer.peek().await.unwrap() else {
panic!();
};
assert_eq!(envelope.event_id(), Some(event_id_2));

let Peek::NotReady(stack_key, envelope) = buffer.peek().await.unwrap() else {
let Peek::NotReady(stack_key, _, envelope) = buffer.peek().await.unwrap() else {
panic!();
};
assert_eq!(envelope.event_id(), Some(event_id_2));

buffer.mark_seen(&stack_key);
buffer.mark_seen(&stack_key, Duration::ZERO);

// After another mark_seen, cycle back to event 1:
let Peek::NotReady(_, envelope) = buffer.peek().await.unwrap() else {
let Peek::NotReady(_, _, envelope) = buffer.peek().await.unwrap() else {
panic!();
};
assert_eq!(envelope.event_id(), Some(event_id_1));
Expand Down
64 changes: 50 additions & 14 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use relay_system::SendError;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service};
use relay_system::{Controller, Shutdown};
use tokio::sync::watch;
use tokio::time::timeout;
use tokio::time::{timeout, Instant};

use crate::envelope::Envelope;
use crate::services::buffer::envelope_buffer::Peek;
Expand Down Expand Up @@ -237,25 +237,35 @@ impl EnvelopeBufferService {
self.services.project_cache.send(DequeuedEnvelope(envelope));
self.sleep = Duration::ZERO; // try next pop immediately
}
Peek::NotReady(stack_key, envelope) => {
relay_log::trace!("EnvelopeBufferService: project(s) of envelope not ready, requesting project update");
Peek::NotReady(stack_key, next_peek, envelope) => {
relay_log::trace!("EnvelopeBufferService: project(s) of envelope not ready");
relay_statsd::metric!(
counter(RelayCounters::BufferTryPop) += 1,
peek_result = "not_ready"
);
let project_key = envelope.meta().public_key();
self.services.project_cache.send(UpdateProject(project_key));
match envelope.sampling_key() {
None => {}
Some(sampling_key) if sampling_key == project_key => {} // already sent.
Some(sampling_key) => {
self.services
.project_cache
.send(UpdateProject(sampling_key));

// We want to fetch the configs again, only if some time passed between the last
// peek of this not ready project key pair and the current peek. This is done to
// avoid flooding the project cache with `UpdateProject` messages.
if Instant::now() >= next_peek {
relay_log::trace!("EnvelopeBufferService: requesting project(s) update");
let project_key = envelope.meta().public_key();
self.services.project_cache.send(UpdateProject(project_key));
match envelope.sampling_key() {
None => {}
Some(sampling_key) if sampling_key == project_key => {} // already sent.
Some(sampling_key) => {
self.services
.project_cache
.send(UpdateProject(sampling_key));
}
}

// Deprioritize the stack to prevent head-of-line blocking and update the next fetch
// time.
buffer.mark_seen(&stack_key, DEFAULT_SLEEP);
}
// deprioritize the stack to prevent head-of-line blocking
buffer.mark_seen(&stack_key);

self.sleep = DEFAULT_SLEEP;
}
}
Expand Down Expand Up @@ -623,4 +633,30 @@ mod tests {
assert_eq!(outcome.category, DataCategory::TransactionIndexed);
assert_eq!(outcome.quantity, 1);
}

#[tokio::test]
async fn test_update_project() {
tokio::time::pause();
let (service, global_tx, project_cache_rx, _) = buffer_service();

let addr = service.start();

global_tx.send_replace(global_config::Status::Ready(Arc::new(
GlobalConfig::default(),
)));

let envelope = new_envelope(false, "foo");

addr.send(EnvelopeBuffer::Push(envelope.clone()));

tokio::time::sleep(Duration::from_secs(1)).await;

// We expect the project update request to be sent.
assert_eq!(project_cache_rx.len(), 1);

tokio::time::sleep(Duration::from_secs(1)).await;

// We expect the project update request to be sent again because 1 second passed.
assert_eq!(project_cache_rx.len(), 2);
}
}

0 comments on commit 84fb625

Please sign in to comment.