Skip to content

Commit

Permalink
simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
jjbayer committed Sep 6, 2024
1 parent d8ef58e commit 1a1fe27
Showing 1 changed file with 19 additions and 21 deletions.
40 changes: 19 additions & 21 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,45 +137,43 @@ impl EnvelopeBufferService {
}

/// Wait for the configured amount of time and make sure the project cache is ready to receive.
async fn sleep(&mut self) -> Result<(), SendError> {
async fn ready_to_pop(
&mut self,
buffer: &mut PolymorphicEnvelopeBuffer,
) -> Result<(), SendError> {
self.system_ready(buffer).await;
tokio::time::sleep(self.sleep).await;
if let Some(project_cache_ready) = self.project_cache_ready.take() {
project_cache_ready.await?;
}
Ok(())
}

/// Returns `true` when preconditions for unspooling are met.
/// Waits until preconditions for unspooling are met.
///
/// - We should not pop from disk into memory when relay's overall memory capacity
/// has been reached.
/// - We need a valid global config to unspool.
pub fn should_pop(&self, buffer: &PolymorphicEnvelopeBuffer) -> bool {
// We should not unspool from external storage if memory capacity has been reached.
// But if buffer storage is in memory, unspooling can reduce memory usage.
if buffer.is_external() && self.memory_checker.check_memory().is_exceeded() {
relay_log::trace!("Memory exceeded, cannot dequeue");
return false;
}

if !self.global_config_rx.borrow().is_ready() {
relay_log::trace!("Global config not ready");
return false;
async fn system_ready(&self, buffer: &PolymorphicEnvelopeBuffer) {
loop {
// We should not unspool from external storage if memory capacity has been reached.
// But if buffer storage is in memory, unspooling can reduce memory usage.
let memory_ready =
!buffer.is_external() || self.memory_checker.check_memory().has_capacity();
let global_config_ready = self.global_config_rx.borrow().is_ready();

if memory_ready && global_config_ready {
return;
}
tokio::time::sleep(DEFAULT_SLEEP).await;
}

true
}

/// Tries to pop an envelope for a ready project.
async fn try_pop(
&mut self,
buffer: &mut PolymorphicEnvelopeBuffer,
) -> Result<(), EnvelopeBufferError> {
if !self.should_pop(buffer) {
self.sleep = DEFAULT_SLEEP;
return Ok(());
}

relay_log::trace!("EnvelopeBufferService peek");
match buffer.peek().await? {
Peek::Empty => {
Expand Down Expand Up @@ -289,7 +287,7 @@ impl Service for EnvelopeBufferService {
// On the one hand, we might want to prioritize dequeing over enqueing
// so we do not exceed the buffer capacity by starving the dequeue.
// on the other hand, prioritizing old messages violates the LIFO design.
Ok(()) = self.sleep() => {
Ok(()) = self.ready_to_pop(&mut buffer) => {
if let Err(e) = self.try_pop(&mut buffer).await {
relay_log::error!(
error = &e as &dyn std::error::Error,
Expand Down

0 comments on commit 1a1fe27

Please sign in to comment.