Skip to content

Commit

Permalink
test: fix
Browse files Browse the repository at this point in the history
  • Loading branch information
jjbayer committed Sep 6, 2024
1 parent 42d9cb4 commit d8ef58e
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 3 deletions.
24 changes: 21 additions & 3 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,11 @@ pub struct EnvelopeBufferService {
project_cache_ready: Option<Request<()>>,
}

const DEFAULT_SLEEP: Duration = Duration::from_millis(100);
/// The maximum amount of time between evaluations of dequeue conditions.
///
/// Some condition checks are sync (`has_capacity`), so cannot be awaited. The sleep in cancelled
/// whenever a new message or a global config update comes in.
const DEFAULT_SLEEP: Duration = Duration::from_secs(1);

impl EnvelopeBufferService {
/// Creates a memory or disk based [`EnvelopeBufferService`], depending on the given config.
Expand Down Expand Up @@ -150,10 +154,16 @@ impl EnvelopeBufferService {
// We should not unspool from external storage if memory capacity has been reached.
// But if buffer storage is in memory, unspooling can reduce memory usage.
if buffer.is_external() && self.memory_checker.check_memory().is_exceeded() {
relay_log::trace!("Memory exceeded, cannot dequeue");
return false;
}

self.global_config_rx.borrow().is_ready()
if !self.global_config_rx.borrow().is_ready() {
relay_log::trace!("Global config not ready");
return false;
}

true
}

/// Tries to pop an envelope for a ready project.
Expand All @@ -162,6 +172,7 @@ impl EnvelopeBufferService {
buffer: &mut PolymorphicEnvelopeBuffer,
) -> Result<(), EnvelopeBufferError> {
if !self.should_pop(buffer) {
self.sleep = DEFAULT_SLEEP;
return Ok(());
}

Expand Down Expand Up @@ -251,6 +262,7 @@ impl Service for EnvelopeBufferService {
fn spawn_handler(mut self, mut rx: Receiver<Self::Interface>) {
let config = self.config.clone();
let memory_checker = self.memory_checker.clone();
let mut global_config_rx = self.global_config_rx.clone();
tokio::spawn(async move {
let buffer = PolymorphicEnvelopeBuffer::from_config(&config, memory_checker).await;

Expand All @@ -267,8 +279,10 @@ impl Service for EnvelopeBufferService {
buffer.initialize().await;

relay_log::info!("EnvelopeBufferService start");
let mut iteration = 0;
loop {
relay_log::trace!("EnvelopeBufferService loop");
iteration += 1;
relay_log::trace!("EnvelopeBufferService loop iteration {iteration}");

tokio::select! {
// NOTE: we do not select a bias here.
Expand All @@ -286,6 +300,10 @@ impl Service for EnvelopeBufferService {
Some(message) = rx.recv() => {
self.handle_message(&mut buffer, message).await;
}
_ = global_config_rx.changed() => {
relay_log::trace!("EnvelopeBufferService received global config");
self.sleep = Duration::ZERO; // Try to pop
}

else => break,
}
Expand Down
3 changes: 3 additions & 0 deletions tests/integration/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ def test_query_retry(failure_type, mini_sentry, relay):

@mini_sentry.app.endpoint("get_project_config")
def get_project_config():
if flask_request.json.get("global") is True:
return original_endpoint()

nonlocal retry_count
retry_count += 1
print("RETRY", retry_count)
Expand Down

0 comments on commit d8ef58e

Please sign in to comment.