From e6314658da136822c8a9a49bcfaa0f837816bb3f Mon Sep 17 00:00:00 2001 From: Geoffrey Mureithi <95377562+geofmureithi@users.noreply.github.com> Date: Fri, 13 Dec 2024 13:44:39 +0300 Subject: [PATCH] fix: handle only the latest waker (#481) * fix: handle only the latest waker * lint: clippy --- packages/apalis-core/src/worker/mod.rs | 33 +++++++++++++++++++------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/packages/apalis-core/src/worker/mod.rs b/packages/apalis-core/src/worker/mod.rs index bb07621..0e025dc 100644 --- a/packages/apalis-core/src/worker/mod.rs +++ b/packages/apalis-core/src/worker/mod.rs @@ -302,7 +302,7 @@ impl Worker> { let ctx = Context { running: Arc::default(), task_count: Arc::default(), - wakers: Arc::default(), + waker: Arc::default(), shutdown: self.state.shutdown, event_handler: self.state.event_handler.clone(), is_ready: Arc::default(), @@ -400,7 +400,7 @@ impl Future for Runnable { #[derive(Clone, Default)] pub struct Context { task_count: Arc, - wakers: Arc>>, + waker: Arc>>, running: Arc, shutdown: Option, event_handler: EventHandler, @@ -469,9 +469,9 @@ impl Context { } pub(crate) fn wake(&self) { - if let Ok(mut wakers) = self.wakers.lock() { - for waker in wakers.drain(..) { - waker.wake(); + if let Ok(waker) = self.waker.lock() { + if let Some(waker) = &*waker { + waker.wake_by_ref(); } } } @@ -501,13 +501,26 @@ impl Context { } fn add_waker(&self, cx: &mut TaskCtx<'_>) { - if let Ok(mut wakers) = self.wakers.lock() { - if !wakers.iter().any(|w| w.will_wake(cx.waker())) { - wakers.push(cx.waker().clone()); + if let Ok(mut waker_guard) = self.waker.lock() { + if waker_guard + .as_ref() + .map_or(true, |stored_waker| !stored_waker.will_wake(cx.waker())) + { + *waker_guard = Some(cx.waker().clone()); } } } + /// Checks if the stored waker matches the current one. + fn has_recent_waker(&self, cx: &TaskCtx<'_>) -> bool { + if let Ok(waker_guard) = self.waker.lock() { + if let Some(stored_waker) = &*waker_guard { + return stored_waker.will_wake(cx.waker()); + } + } + false + } + /// Returns if the worker is ready to consume new tasks pub fn is_ready(&self) -> bool { self.is_ready.load(Ordering::Acquire) && !self.is_shutting_down() @@ -522,7 +535,9 @@ impl Future for Context { if self.is_shutting_down() && task_count == 0 { Poll::Ready(()) } else { - self.add_waker(cx); + if !self.has_recent_waker(cx) { + self.add_waker(cx); + } Poll::Pending } }