Skip to content

Commit

Permalink
Implement watch method
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-karpenko committed Jul 5, 2024
1 parent a514717 commit 9ac0a49
Showing 1 changed file with 45 additions and 7 deletions.
52 changes: 45 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{
sync::atomic::{AtomicBool, Ordering},
time::{Duration, SystemTime},
};
use tokio::sync::RwLock;
use tokio::{sync::RwLock, task::JoinHandle};
use tracing::{debug, error};

type DurationMillis = u64;
Expand All @@ -35,6 +35,10 @@ const MIN_CONFLICT_BACKOFF_TIME: DurationFloat = 0.1;
const MAX_CONFLICT_BACKOFF_TIME: DurationFloat = 5.0;
const CONFLICT_BACKOFF_MULT: DurationFloat = 2.0;

const MIN_WATCHER_BACKOFF_TIME: DurationFloat = 1.0;
const MAX_WATCHER_BACKOFF_TIME: DurationFloat = 30.0;
const WATCHER_BACKOFF_MULT: DurationFloat = 2.0;

/// Represents `kube-lease` specific errors.
#[derive(thiserror::Error, Debug)]
pub enum LeaseManagerError {
Expand Down Expand Up @@ -152,7 +156,39 @@ impl LeaseManager {
})
}

pub async fn watch(&self) /* -> (channel, handler) */ {}
pub async fn watch(self) -> (tokio::sync::watch::Receiver<bool>, JoinHandle<()>) {
let (sender, receiver) = tokio::sync::watch::channel(self.is_leader.load(Ordering::Relaxed));
let watcher = async move {
let mut backoff =
BackoffSleep::new(MIN_WATCHER_BACKOFF_TIME, MAX_WATCHER_BACKOFF_TIME, WATCHER_BACKOFF_MULT);
loop {
if sender.is_closed() {
return;
}
match self.changed().await {
Ok(state) => {
let result = sender.send(state);
if result.is_err() {
return;
}
backoff.reset();
}
Err(e) => {
error!(error = %e, "LeaseManager watcher error");
if sender.is_closed() {
return;
} else {
backoff.sleep().await;
}
}
}
}
};

let handler = tokio::spawn(watcher);

(receiver, handler)
}

/// Try to lock lease and renew it periodically to prevent expiration.
///
Expand All @@ -171,7 +207,7 @@ impl LeaseManager {
// Is leader changed iteration?
let is_holder = self.state.read().await.is_holder(&self.params.identity);
if self.is_leader.load(Ordering::Acquire) != is_holder {
debug!(identity = %self.params.identity, is_leader = is_holder, "lease lock state has been changed");
debug!(identity = %self.params.identity, is_leader = %is_holder, "lease lock state has been changed");
self.is_leader.store(is_holder, Ordering::Release);

return Ok(is_holder);
Expand Down Expand Up @@ -232,10 +268,8 @@ impl LeaseManager {
} else if self.is_locked().await && !self.is_expired().await {
// It's locked by someone else and lock is actual.
// Sleep up to the expiration time of the lock.
debug!(
identity = %self.params.identity, holder = self.state.read().await.holder.as_ref().unwrap(),
"lease is actually locked by other identity"
);
let holder = self.holder().await.unwrap();
debug!(identity = %self.params.identity, %holder,"lease is actually locked by other identity");
tokio::time::sleep(self.grace_sleep_duration(self.expiry().await, 0)).await;
Ok(())
} else {
Expand Down Expand Up @@ -263,6 +297,10 @@ impl LeaseManager {
self.state.read().await.expiry
}

async fn holder(&self) -> Option<String> {
self.state.read().await.holder.clone()
}

fn grace_sleep_duration(&self, expiry: SystemTime, grace: DurationSeconds) -> Duration {
let grace = Duration::from_secs(grace);
expiry
Expand Down

0 comments on commit 9ac0a49

Please sign in to comment.