Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Sep 17, 2024
1 parent f1969a8 commit e7665bc
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 22 deletions.
6 changes: 3 additions & 3 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub struct EnvelopeBufferService {
services: Services,
has_capacity: Arc<AtomicBool>,
sleep: Duration,
project_cache_ready: Arc<Waiter>,
project_cache_ready: Waiter,
}

/// The maximum amount of time between evaluations of dequeue conditions.
Expand All @@ -129,7 +129,7 @@ impl EnvelopeBufferService {
memory_checker: MemoryChecker,
global_config_rx: watch::Receiver<global_config::Status>,
services: Services,
project_cache_ready: Arc<Waiter>,
project_cache_ready: Waiter,
) -> Option<Self> {
config.spool_v2().then(|| Self {
config,
Expand Down Expand Up @@ -449,7 +449,7 @@ mod tests {
watch::Sender<global_config::Status>,
mpsc::UnboundedReceiver<ProjectCache>,
mpsc::UnboundedReceiver<TrackOutcome>,
Arc<Waiter>,
Waiter,
) {
let config = Arc::new(
Config::from_json_value(serde_json::json!({
Expand Down
6 changes: 3 additions & 3 deletions relay-server/src/services/project_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ struct ProjectCacheBroker {
/// Status of the global configuration, used to determine readiness for processing.
global_config: GlobalConfigStatus,
/// Waiter signaling whether the project cache is ready to accept a new envelope.
project_cache_ready: Arc<Waiter>,
project_cache_ready: Waiter,
}

#[derive(Debug)]
Expand Down Expand Up @@ -1341,7 +1341,7 @@ pub struct ProjectCacheService {
services: Services,
global_config_rx: watch::Receiver<global_config::Status>,
redis: Option<RedisPool>,
project_cache_ready: Arc<Waiter>,
project_cache_ready: Waiter,
}

impl ProjectCacheService {
Expand All @@ -1352,7 +1352,7 @@ impl ProjectCacheService {
services: Services,
global_config_rx: watch::Receiver<global_config::Status>,
redis: Option<RedisPool>,
project_cache_ready: Arc<Waiter>,
project_cache_ready: Waiter,
) -> Self {
Self {
config,
Expand Down
39 changes: 23 additions & 16 deletions relay-server/src/utils/waiter.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,45 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use tokio::sync::Notify;

/// Construct used to wait on a boolean variable only if it's set to `true`.
/// Struct holding both the boolean state and the notifier.
#[derive(Debug)]
pub struct Waiter {
wait: RwLock<AtomicBool>,
struct Inner {
wait: AtomicBool,
notify: Notify,
}

/// Construct used to wait on a boolean variable only if it's set to `true`.
#[derive(Debug, Clone)]
pub struct Waiter {
inner: Arc<Inner>,
}

impl Waiter {
pub fn new(wait: bool) -> Arc<Self> {
Arc::new(Self {
wait: RwLock::new(AtomicBool::new(wait)),
notify: Notify::new(),
})
pub fn new(wait: bool) -> Self {
Self {
inner: Arc::new(Inner {
wait: AtomicBool::new(wait),
notify: Notify::new(),
}),
}
}

pub fn set_wait(&self, wait: bool) {
self.wait.write().unwrap().store(wait, Ordering::SeqCst);
self.inner.wait.store(wait, Ordering::SeqCst);

// If we are not waiting anymore, we want to notify all the subscribed futures that they
// are ready to try and proceed.
// If we are not waiting anymore, notify all the subscribed futures
if !wait {
self.notify.notify_waiters();
self.inner.notify.notify_waiters();
}
}

pub async fn try_wait(&self) {
if !self.wait.read().unwrap().load(Ordering::Relaxed) {
if !self.inner.wait.load(Ordering::Relaxed) {
return;
}

// If we have to wait, we will suspend on waiting for a notification.
self.notify.notified().await;
// Suspend on waiting for a notification if waiting is required
self.inner.notify.notified().await;
}
}

0 comments on commit e7665bc

Please sign in to comment.