feat(spooler): Implement next fetch behavior in the spooler v2 #4044

Merged · 3 commits · Sep 19, 2024
68 changes: 37 additions & 31 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
@@ -6,12 +6,12 @@ use std::mem;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Duration;

use hashbrown::HashSet;
use relay_base_schema::project::ProjectKey;
use relay_config::Config;
use tokio::time::timeout;
use tokio::time::{timeout, Instant};

use crate::envelope::Envelope;
use crate::services::buffer::common::ProjectKeyPair;
@@ -126,10 +126,10 @@ impl PolymorphicEnvelopeBuffer {
/// Non-ready stacks are deprioritized when they are marked as seen, such that
/// the next call to `.peek()` will look at a different stack. This prevents
/// head-of-line blocking.
pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair) {
pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) {
match self {
Self::Sqlite(buffer) => buffer.mark_seen(project_key_pair),
Self::InMemory(buffer) => buffer.mark_seen(project_key_pair),
Self::Sqlite(buffer) => buffer.mark_seen(project_key_pair, next_fetch),
Self::InMemory(buffer) => buffer.mark_seen(project_key_pair, next_fetch),
}
}

@@ -251,7 +251,7 @@ where
/// If the envelope stack does not exist, a new stack is pushed to the priority queue.
/// The priority of the stack is updated with the envelope's received_at time.
pub async fn push(&mut self, envelope: Box<Envelope>) -> Result<(), EnvelopeBufferError> {
let received_at = envelope.meta().start_time();
let received_at = envelope.meta().start_time().into();
let project_key_pair = ProjectKeyPair::from_envelope(&envelope);
if let Some((
QueueItem {
Expand Down Expand Up @@ -290,7 +290,11 @@ where
key: stack_key,
value: stack,
},
Priority { readiness, .. },
Priority {
readiness,
next_project_fetch: next_peek,
..
},
)) = self.priority_queue.peek_mut()
else {
return Ok(Peek::Empty);
@@ -301,7 +305,7 @@
Ok(match (stack.peek().await?, ready) {
(None, _) => Peek::Empty,
(Some(envelope), true) => Peek::Ready(envelope),
(Some(envelope), false) => Peek::NotReady(*stack_key, envelope),
(Some(envelope), false) => Peek::NotReady(*stack_key, *next_peek, envelope),
})
}

@@ -319,7 +323,7 @@ where
let next_received_at = stack
.peek()
.await?
.map(|next_envelope| next_envelope.meta().start_time());
.map(|next_envelope| next_envelope.meta().start_time().into());

match next_received_at {
None => {
@@ -348,8 +352,8 @@ where
/// Returns `true` if at least one priority was changed.
pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool {
let mut changed = false;
if let Some(project_key_pair) = self.stacks_by_project.get(project) {
for project_key_pair in project_key_pair {
if let Some(project_key_pairs) = self.stacks_by_project.get(project) {
for project_key_pair in project_key_pairs {
self.priority_queue
.change_priority_by(project_key_pair, |stack| {
let mut found = false;
@@ -384,10 +388,12 @@ where
/// Non-ready stacks are deprioritized when they are marked as seen, such that
/// the next call to `.peek()` will look at a different stack. This prevents
/// head-of-line blocking.
pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair) {
pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair, next_fetch: Duration) {
self.priority_queue
.change_priority_by(project_key_pair, |stack| {
stack.last_peek = Instant::now();
// We use the next project fetch to debounce project fetching and avoid head of
// line blocking of non-ready stacks.
stack.next_project_fetch = Instant::now() + next_fetch;
});
}

@@ -413,7 +419,7 @@ where
) -> Result<(), EnvelopeBufferError> {
let received_at = envelope
.as_ref()
.map_or(Instant::now(), |e| e.meta().start_time());
.map_or(Instant::now(), |e| e.meta().start_time().into());

let mut stack = self
.stack_provider
@@ -513,7 +519,7 @@ where
pub enum Peek<'a> {
Empty,
Ready(&'a Envelope),
NotReady(ProjectKeyPair, &'a Envelope),
NotReady(ProjectKeyPair, Instant, &'a Envelope),
}

#[derive(Debug)]
@@ -546,15 +552,15 @@ impl<K: PartialEq, V> Eq for QueueItem<K, V> {}
struct Priority {
readiness: Readiness,
received_at: Instant,
last_peek: Instant,
next_project_fetch: Instant,
}

impl Priority {
fn new(received_at: Instant) -> Self {
Self {
readiness: Readiness::new(),
received_at,
last_peek: Instant::now(),
next_project_fetch: Instant::now(),
}
}
}
@@ -571,8 +577,8 @@ impl Ord for Priority {
// For non-ready stacks, we invert the priority, such that projects that are not
// ready and did not receive envelopes recently can be evicted.
(false, false) => self
.last_peek
.cmp(&other.last_peek)
.next_project_fetch
.cmp(&other.next_project_fetch)
.reverse()
.then(self.received_at.cmp(&other.received_at).reverse()),
}
@@ -638,19 +644,19 @@ mod tests {
fn envelope(&self) -> Option<&Envelope> {
match self {
Peek::Empty => None,
Peek::Ready(envelope) | Peek::NotReady(_, envelope) => Some(envelope),
Peek::Ready(envelope) | Peek::NotReady(_, _, envelope) => Some(envelope),
}
}
}

fn new_envelope(
project_key: ProjectKey,
own_key: ProjectKey,
sampling_key: Option<ProjectKey>,
event_id: Option<EventId>,
) -> Box<Envelope> {
let mut envelope = Envelope::from_request(
None,
RequestMeta::new(Dsn::from_str(&format!("http://{project_key}@localhost/1")).unwrap()),
RequestMeta::new(Dsn::from_str(&format!("http://{own_key}@localhost/1")).unwrap()),
);
if let Some(sampling_key) = sampling_key {
envelope.set_dsc(DynamicSamplingContext {
@@ -984,10 +990,10 @@ mod tests {
sampling_project_ready: true,
},
received_at: Instant::now(),
last_peek: Instant::now(),
next_project_fetch: Instant::now(),
};
let mut p2 = p1.clone();
p2.last_peek += Duration::from_millis(1);
p2.next_project_fetch += Duration::from_millis(1);

// Last peek does not matter because project is ready:
assert_eq!(p1.cmp(&p2), Ordering::Equal);
@@ -1010,34 +1016,34 @@
buffer.push(envelope2).await.unwrap();

// event_id_1 is first element:
let Peek::NotReady(_, envelope) = buffer.peek().await.unwrap() else {
let Peek::NotReady(_, _, envelope) = buffer.peek().await.unwrap() else {
panic!();
};
assert_eq!(envelope.event_id(), Some(event_id_1));

// Second peek returns same element:
let Peek::NotReady(stack_key, envelope) = buffer.peek().await.unwrap() else {
let Peek::NotReady(stack_key, _, envelope) = buffer.peek().await.unwrap() else {
panic!();
};
assert_eq!(envelope.event_id(), Some(event_id_1));

buffer.mark_seen(&stack_key);
buffer.mark_seen(&stack_key, Duration::ZERO);

// After mark_seen, event 2 is on top:
let Peek::NotReady(_, envelope) = buffer.peek().await.unwrap() else {
let Peek::NotReady(_, _, envelope) = buffer.peek().await.unwrap() else {
panic!();
};
assert_eq!(envelope.event_id(), Some(event_id_2));

let Peek::NotReady(stack_key, envelope) = buffer.peek().await.unwrap() else {
let Peek::NotReady(stack_key, _, envelope) = buffer.peek().await.unwrap() else {
panic!();
};
assert_eq!(envelope.event_id(), Some(event_id_2));

buffer.mark_seen(&stack_key);
buffer.mark_seen(&stack_key, Duration::ZERO);

// After another mark_seen, cycle back to event 1:
let Peek::NotReady(_, envelope) = buffer.peek().await.unwrap() else {
let Peek::NotReady(_, _, envelope) = buffer.peek().await.unwrap() else {
panic!();
};
assert_eq!(envelope.event_id(), Some(event_id_1));
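The change above replaces the per-stack `last_peek` timestamp with a `next_project_fetch` deadline: `mark_seen` now takes a `next_fetch: Duration` and sets the deadline to `Instant::now() + next_fetch`, and the non-ready branch of `Ord for Priority` orders stacks by that deadline (reversed) before falling back to `received_at`. The following is a minimal, self-contained sketch of that rotation effect; `Priority` here is a hypothetical stripped-down stand-in, not Relay's actual struct:

```rust
use std::cmp::Ordering;
use std::time::{Duration, Instant};

/// Hypothetical simplification of the buffer's priority entry.
#[derive(Clone)]
struct Priority {
    received_at: Instant,
    next_project_fetch: Instant,
}

impl Priority {
    /// Mirrors `mark_seen`: push the fetch deadline into the future to
    /// deprioritize this stack until the next fetch is due.
    fn mark_seen(&mut self, next_fetch: Duration) {
        self.next_project_fetch = Instant::now() + next_fetch;
    }

    /// The non-ready comparison branch: a later fetch deadline sorts lower,
    /// with ties broken by the (reversed) received time.
    fn cmp_not_ready(&self, other: &Self) -> Ordering {
        self.next_project_fetch
            .cmp(&other.next_project_fetch)
            .reverse()
            .then(self.received_at.cmp(&other.received_at).reverse())
    }
}

fn main() {
    let now = Instant::now();
    let mut a = Priority {
        received_at: now,
        next_project_fetch: now,
    };
    let b = a.clone();

    // Identical fresh stacks compare equal.
    assert_eq!(a.cmp_not_ready(&b), Ordering::Equal);

    // After being seen, `a` sorts behind `b`, so the next peek rotates to `b`
    // instead of head-of-line blocking on `a`.
    a.mark_seen(Duration::from_secs(1));
    assert_eq!(a.cmp_not_ready(&b), Ordering::Less);
}
```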
64 changes: 50 additions & 14 deletions relay-server/src/services/buffer/mod.rs
@@ -12,7 +12,7 @@ use relay_system::SendError;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service};
use relay_system::{Controller, Shutdown};
use tokio::sync::watch;
use tokio::time::timeout;
use tokio::time::{timeout, Instant};

use crate::envelope::Envelope;
use crate::services::buffer::envelope_buffer::Peek;
@@ -237,25 +237,35 @@ impl EnvelopeBufferService {
self.services.project_cache.send(DequeuedEnvelope(envelope));
self.sleep = Duration::ZERO; // try next pop immediately
}
Peek::NotReady(stack_key, envelope) => {
relay_log::trace!("EnvelopeBufferService: project(s) of envelope not ready, requesting project update");
Peek::NotReady(stack_key, next_peek, envelope) => {
relay_log::trace!("EnvelopeBufferService: project(s) of envelope not ready");
relay_statsd::metric!(
counter(RelayCounters::BufferTryPop) += 1,
peek_result = "not_ready"
);
let project_key = envelope.meta().public_key();
self.services.project_cache.send(UpdateProject(project_key));
match envelope.sampling_key() {
None => {}
Some(sampling_key) if sampling_key == project_key => {} // already sent.
Some(sampling_key) => {
self.services
.project_cache
.send(UpdateProject(sampling_key));

// We only want to fetch the configs again if some time has passed since the
// last peek of this not-ready project key pair. This avoids flooding the
// project cache with `UpdateProject` messages.
if Instant::now() >= next_peek {
relay_log::trace!("EnvelopeBufferService: requesting project(s) update");
let project_key = envelope.meta().public_key();
self.services.project_cache.send(UpdateProject(project_key));
match envelope.sampling_key() {
None => {}
Some(sampling_key) if sampling_key == project_key => {} // already sent.
Some(sampling_key) => {
self.services
.project_cache
.send(UpdateProject(sampling_key));
}
}

// Deprioritize the stack to prevent head-of-line blocking and update the next fetch
// time.
buffer.mark_seen(&stack_key, DEFAULT_SLEEP);
}
// deprioritize the stack to prevent head-of-line blocking
buffer.mark_seen(&stack_key);

self.sleep = DEFAULT_SLEEP;
}
}
@@ -623,4 +633,30 @@ mod tests {
assert_eq!(outcome.category, DataCategory::TransactionIndexed);
assert_eq!(outcome.quantity, 1);
}

#[tokio::test]
async fn test_update_project() {
tokio::time::pause();
let (service, global_tx, project_cache_rx, _) = buffer_service();

let addr = service.start();

global_tx.send_replace(global_config::Status::Ready(Arc::new(
GlobalConfig::default(),
)));

let envelope = new_envelope(false, "foo");

addr.send(EnvelopeBuffer::Push(envelope.clone()));

tokio::time::sleep(Duration::from_secs(1)).await;

// We expect the project update request to be sent.
assert_eq!(project_cache_rx.len(), 1);

tokio::time::sleep(Duration::from_secs(1)).await;

// We expect the project update request to be sent again because 1 second passed.
assert_eq!(project_cache_rx.len(), 2);
}
}
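In the service, `Peek::NotReady` now also carries the stack's `next_project_fetch` instant: `UpdateProject` is only sent once that deadline has passed, and the stack is then marked seen with `DEFAULT_SLEEP`, which both rotates it behind other stacks and opens a new debounce window (the new `test_update_project` test pauses time and expects a second update only after another second has elapsed). Below is a rough sketch of that debounce, assuming the `tokio` crate with its `macros`, `rt` and `time` features; the `Buffer` type, the one-second window, and `maybe_request_update` are invented names for illustration:

```rust
use std::time::Duration;
use tokio::time::Instant;

// Illustrative debounce window; the real value is the service's DEFAULT_SLEEP.
const FETCH_DEBOUNCE: Duration = Duration::from_secs(1);

struct Buffer {
    next_project_fetch: Instant,
}

impl Buffer {
    /// Only request a project update once the debounce deadline has passed,
    /// then push the deadline out so immediately repeated peeks stay quiet.
    fn maybe_request_update(&mut self, send_update: impl Fn()) {
        if Instant::now() >= self.next_project_fetch {
            send_update();
            self.next_project_fetch = Instant::now() + FETCH_DEBOUNCE;
        }
    }
}

#[tokio::main]
async fn main() {
    let mut buffer = Buffer {
        next_project_fetch: Instant::now(),
    };

    // First peek: the deadline has passed, so an update is requested.
    buffer.maybe_request_update(|| println!("UpdateProject sent"));

    // Immediate second peek: still inside the debounce window, nothing is sent.
    buffer.maybe_request_update(|| println!("this does not print"));

    // Once the window elapses, the next peek requests an update again.
    tokio::time::sleep(FETCH_DEBOUNCE).await;
    buffer.maybe_request_update(|| println!("UpdateProject sent again"));
}
```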