diff --git a/src/lib.rs b/src/lib.rs index 0797789..5b173e8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,7 +12,7 @@ use std::{ sync::atomic::{AtomicBool, Ordering}, time::{Duration, SystemTime}, }; -use tokio::sync::RwLock; +use tokio::{sync::RwLock, task::JoinHandle}; use tracing::{debug, error}; type DurationMillis = u64; @@ -35,6 +35,10 @@ const MIN_CONFLICT_BACKOFF_TIME: DurationFloat = 0.1; const MAX_CONFLICT_BACKOFF_TIME: DurationFloat = 5.0; const CONFLICT_BACKOFF_MULT: DurationFloat = 2.0; +const MIN_WATCHER_BACKOFF_TIME: DurationFloat = 1.0; +const MAX_WATCHER_BACKOFF_TIME: DurationFloat = 30.0; +const WATCHER_BACKOFF_MULT: DurationFloat = 2.0; + /// Represents `kube-lease` specific errors. #[derive(thiserror::Error, Debug)] pub enum LeaseManagerError { @@ -152,7 +156,39 @@ impl LeaseManager { }) } - pub async fn watch(&self) /* -> (channel, handler) */ {} + pub async fn watch(self) -> (tokio::sync::watch::Receiver, JoinHandle<()>) { + let (sender, receiver) = tokio::sync::watch::channel(self.is_leader.load(Ordering::Relaxed)); + let watcher = async move { + let mut backoff = + BackoffSleep::new(MIN_WATCHER_BACKOFF_TIME, MAX_WATCHER_BACKOFF_TIME, WATCHER_BACKOFF_MULT); + loop { + if sender.is_closed() { + return; + } + match self.changed().await { + Ok(state) => { + let result = sender.send(state); + if result.is_err() { + return; + } + backoff.reset(); + } + Err(e) => { + error!(error = %e, "LeaseManager watcher error"); + if sender.is_closed() { + return; + } else { + backoff.sleep().await; + } + } + } + } + }; + + let handler = tokio::spawn(watcher); + + (receiver, handler) + } /// Try to lock lease and renew it periodically to prevent expiration. /// @@ -171,7 +207,7 @@ impl LeaseManager { // Is leader changed iteration? let is_holder = self.state.read().await.is_holder(&self.params.identity); if self.is_leader.load(Ordering::Acquire) != is_holder { - debug!(identity = %self.params.identity, is_leader = is_holder, "lease lock state has been changed"); + debug!(identity = %self.params.identity, is_leader = %is_holder, "lease lock state has been changed"); self.is_leader.store(is_holder, Ordering::Release); return Ok(is_holder); @@ -232,10 +268,8 @@ impl LeaseManager { } else if self.is_locked().await && !self.is_expired().await { // It's locked by someone else and lock is actual. // Sleep up to the expiration time of the lock. - debug!( - identity = %self.params.identity, holder = self.state.read().await.holder.as_ref().unwrap(), - "lease is actually locked by other identity" - ); + let holder = self.holder().await.unwrap(); + debug!(identity = %self.params.identity, %holder,"lease is actually locked by other identity"); tokio::time::sleep(self.grace_sleep_duration(self.expiry().await, 0)).await; Ok(()) } else { @@ -263,6 +297,10 @@ impl LeaseManager { self.state.read().await.expiry } + async fn holder(&self) -> Option { + self.state.read().await.holder.clone() + } + fn grace_sleep_duration(&self, expiry: SystemTime, grace: DurationSeconds) -> Duration { let grace = Duration::from_secs(grace); expiry