diff --git a/Cargo.toml b/Cargo.toml
index eaa4101..0720600 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,7 +2,7 @@
 name = "kube-lease-manager"
 authors = ["Oleksii Karpenko "]
 categories = ["api-bindings", "asynchronous"]
-description = "Ergonomic and durable leader election using Kubernetes Lease API."
+description = "Ergonomic and reliable leader election using Kubernetes Lease API."
 edition = "2021"
 rust-version = "1.75"
 homepage = "https://github.com/alex-karpenko/kube-lease-manager"
@@ -10,7 +10,7 @@ keywords = ["kubernetes", "async", "lease", "leader", "election"]
 license = "MIT"
 readme = "README.md"
 repository = "https://github.com/alex-karpenko/kube-lease-manager"
-version = "0.1.4"
+version = "0.1.5"
 exclude = [
     ".github/**",
     ".vscode/**",
diff --git a/README.md b/README.md
index a2c547c..fc768ef 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
 # kube-lease-manager
 
-Ergonomic and durable leader election using Kubernetes Lease API.
+Ergonomic and reliable leader election using Kubernetes Lease API.
 
 CI status
 
@@ -63,19 +63,18 @@ The second makes it possible to acquire and release the lock when you need it.
 The simplest example using the first locking approach:
 ```rust
 use kube::Client;
-use kube_lease_manager::LeaseManagerBuilder;
+use kube_lease_manager::{LeaseManagerBuilder, Result};
 use std::time::Duration;
 
 #[tokio::main]
-async fn main() {
+async fn main() -> Result<()> {
     // Use default Kube client
-    let client = Client::try_default().await.unwrap();
+    let client = Client::try_default().await?;
     // Create the simplest LeaseManager with reasonable defaults using convenient builder.
     // It uses Lease resource called `test-watch-lease`.
-    let manager = LeaseManagerBuilder::new(client, "test-auto-lease")
+    let manager = LeaseManagerBuilder::new(client, "test-watch-lease")
         .build()
-        .await
-        .unwrap();
+        .await?;
 
     let (mut channel, task) = manager.watch().await;
     // Watch on the channel for lock state changes
@@ -96,7 +95,9 @@
     // Explicitly close the control channel
     drop(channel);
     // Wait for the finish of the manager and get it back
-    let _manager = tokio::join!(task).0.unwrap().unwrap();
+    let _manager = tokio::join!(task).0.unwrap()?;
+
+    Ok(())
 }
 ```
diff --git a/src/lib.rs b/src/lib.rs
index d2ed575..fff19fa 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,5 +1,5 @@
 #![deny(unsafe_code)]
-//! Ergonomic and durable leader election using Kubernetes Lease API.
+//! Ergonomic and reliable leader election using Kubernetes Lease API.
 //!
 //! `kube-lease-manager` is a high-level helper to facilitate leader election using
 //! [Lease Kubernetes resource](https://kubernetes.io/docs/reference/kubernetes-api/cluster-resources/lease-v1/).
@@ -77,7 +77,7 @@
 //! [with_field_manager()](LeaseParams::with_field_manager) method or using [`LeaseManagerBuilder`].
 //!
 //! The next config option is a [`LeaseCreateMode`]
-//! which defines the behavior how LeaseManager manages Lease Kubernetes resource during startup.
+//! which defines how [`LeaseManager`] manages the Lease Kubernetes resource during startup.
 //! The default behavior is [`AutoCreate`](LeaseCreateMode::AutoCreate):
 //! create resource if it doesn't exist or use existing one if it's already present.
 //!
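For context on the second (manual) locking approach that the README and crate docs above contrast with `watch()`, here is a minimal sketch, not part of this diff. It assumes `LeaseManager` exposes the `changed()` and `release()` methods described in the crate documentation (`changed()` resolving on a lock-state change and returning the current state); the lease name `test-manual-lease` is made up for the example.

```rust
use kube::Client;
use kube_lease_manager::{LeaseManagerBuilder, Result};

#[tokio::main]
async fn main() -> Result<()> {
    let client = Client::try_default().await?;
    // Durations are in seconds, as with the builder calls used elsewhere in this diff.
    let manager = LeaseManagerBuilder::new(client, "test-manual-lease")
        .with_duration(5)
        .with_grace(2)
        .build()
        .await?;

    // Assumed API: `changed()` resolves when the lock state flips and
    // returns the current state; loop until this instance holds the lock.
    while !manager.changed().await? {}

    // ... leader-only work goes here ...

    // Give the lock back explicitly instead of dropping a watch channel.
    manager.release().await?;
    Ok(())
}
```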
diff --git a/tests/auto.rs b/tests/auto.rs
index d109060..0a7b6a7 100644
--- a/tests/auto.rs
+++ b/tests/auto.rs
@@ -1,19 +1,16 @@
 use kube::Client;
-use kube_lease_manager::LeaseManagerBuilder;
+use kube_lease_manager::{LeaseManagerBuilder, Result};
 use std::time::Duration;
 
 #[tokio::test]
 #[ignore = "uses k8s current-context"]
-async fn auto() {
+async fn auto() -> Result<()> {
     tracing_subscriber::fmt::init();
 
-    // Use default Kube client
-    let client = Client::try_default().await.unwrap();
+    // Use default Kube client.
+    let client = Client::try_default().await?;
     // Create the simplest LeaseManager with reasonable defaults using convenient builder.
-    // It uses Lease resource called `test-watch-lease`.
+    // It uses Lease resource called `test-auto-lease`.
-    let manager = LeaseManagerBuilder::new(client, "test-auto-lease")
-        .build()
-        .await
-        .unwrap();
+    let manager = LeaseManagerBuilder::new(client, "test-auto-lease").build().await?;
 
     let (mut channel, task) = manager.watch().await;
     // Watch on the channel for lock state changes
@@ -34,5 +31,7 @@
     // Explicitly close the control channel
     drop(channel);
     // Wait for the finish of the manager and get it back
-    let _manager = tokio::join!(task).0.unwrap().unwrap();
+    let _manager = tokio::join!(task).0.unwrap()?;
+
+    Ok(())
 }
diff --git a/tests/watch_many_threads.rs b/tests/watch_many_threads.rs
new file mode 100644
index 0000000..3d2db2c
--- /dev/null
+++ b/tests/watch_many_threads.rs
@@ -0,0 +1,161 @@
+/// This test creates several OS threads and runs a `LeaseManager` on each of them.
+/// Each thread creates its own `Tokio` runtime to run its "workload" really concurrently,
+/// and tries to:
+/// - lock the lease using the `watch()` approach;
+/// - run the "workload" once it gets the lock;
+/// - release the lock by dropping the channel;
+/// - exit.
+///
+/// As a result, each manager holds the lock for some time and publishes its index and
+/// lock/unlock events to the channel for further analysis.
+///
+/// The requirement is that the channel contains the correct sequence of events.
+///
+use std::{
+    sync::mpsc::{self, Sender},
+    thread,
+    time::Duration,
+};
+
+use k8s_openapi::api::core::v1::Namespace;
+use kube::{api::PostParams, Api, Client, Resource};
+use kube_lease_manager::{DurationSeconds, LeaseManagerBuilder, Result};
+use tracing::{debug, error};
+
+const TEST_NAMESPACE: &str = "kube-lease-integration-test";
+const TEST_LEASE_NAME: &str = "watch-many-threads";
+
+const TEST_THREADS: usize = 5;
+
+const TASK_DURATION: DurationSeconds = 3;
+const LEASE_DURATION: DurationSeconds = 2;
+const LEASE_GRACE: DurationSeconds = 1;
+
+#[derive(Debug, PartialEq, Eq)]
+enum Event {
+    Locked(usize),
+    Started(usize),
+    Completed(usize),
+    Error(usize),
+}
+
+#[test]
+#[ignore = "uses k8s current-context"]
+fn watch_many_threads() -> Result<()> {
+    tracing_subscriber::fmt::init();
+
+    let (tx, rx) = mpsc::channel::<Event>();
+    let tasks: Vec<_> = (0..TEST_THREADS)
+        .map(move |index| {
+            let tx = tx.clone();
+            thread::spawn(move || run_watch_thread(index, tx))
+        })
+        .collect();
+
+    tasks.into_iter().for_each(|t| t.join().unwrap().unwrap());
+
+    for _ in 0..TEST_THREADS {
+        let event = rx.recv().unwrap();
+        assert!(
+            matches!(event, Event::Locked(_)),
+            "Incorrect event type: expected `Locked`, but got {event:?}"
+        );
+
+        if let Event::Locked(index) = event {
+            let started = rx.recv().unwrap();
+            assert_eq!(
+                started, Event::Started(index),
+                "Incorrect event type or index: expected `Started({index})`, but got {started:?}"
+            );
+            let completed = rx.recv().unwrap();
+            assert_eq!(
+                completed, Event::Completed(index),
+                "Incorrect event type or index: expected `Completed({index})`, but got {completed:?}"
+            );
+
+            print!("{index} ");
+        } else {
+            unreachable!("Incorrect event type at previous step");
+        }
+    }
+    println!();
+
+    Ok(())
+}
+
+fn run_watch_thread(index: usize, tx: Sender<Event>) -> Result<()> {
+    debug!(%index, "Starting");
+
+    let rt = tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .build()
+        .unwrap();
+
+    rt.block_on(async move {
+        let client = Client::try_default().await.unwrap();
+        create_namespace(client.clone(), TEST_NAMESPACE).await.unwrap();
+
+        let manager = LeaseManagerBuilder::new(client, TEST_LEASE_NAME)
+            .with_namespace(TEST_NAMESPACE)
+            .with_duration(LEASE_DURATION)
+            .with_grace(LEASE_GRACE)
+            .build()
+            .await
+            .unwrap();
+        let (mut channel, manager_task) = manager.watch().await;
+
+        let mut locked = *channel.borrow_and_update();
+        loop {
+            if !locked {
+                // Waiting for the lock
+                while !locked {
+                    if let Err(err) = channel.changed().await {
+                        error!(%index, error = %err, "can't receive state");
+                        let _ = tx.send(Event::Error(index));
+                        break;
+                    }
+                    locked = *channel.borrow_and_update();
+                    debug!(%index, "Got lock!");
+                    let _ = tx.send(Event::Locked(index));
+                }
+            } else {
+                // Wait for job completion or a state change
+                debug!(%index, "Try to run our job");
+                let _ = tx.send(Event::Started(index));
+                tokio::select! {
+                    _ = job(index, TASK_DURATION) => {
+                        debug!(%index, "Success!");
+                        let _ = tx.send(Event::Completed(index));
+                    },
+                    _ = channel.changed() => {
+                        error!(%index, "Unexpectedly lost the lock");
+                        let _ = tx.send(Event::Error(index));
+                    }
+                }
+                break;
+            }
+        }
+
+        drop(channel);
+        tokio::join!(manager_task).0.unwrap().unwrap();
+    });
+
+    Ok(())
+}
+
+async fn create_namespace(client: Client, namespace: &str) -> Result<()> {
+    // Create the test namespace; the error is ignored because it may already exist.
+    let pp = PostParams::default();
+    let mut data = Namespace::default();
+    data.meta_mut().name = Some(String::from(namespace));
+
+    let api = Api::<Namespace>::all(client);
+    let _ = api.create(&pp, &data).await;
+    Ok(())
+}
+
+async fn job(index: usize, duration: DurationSeconds) {
+    debug!(?index, ?duration, "Job start");
+    tokio::time::sleep(Duration::from_secs(duration)).await;
+    debug!(?index, ?duration, "Job finish");
+}
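Note: both integration tests are marked `#[ignore = "uses k8s current-context"]`, so they run against a live cluster only on demand, e.g. via `cargo test -- --ignored`, or `cargo test --test watch_many_threads -- --ignored` for the new test alone.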