Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Improve integration tests #6

Merged
merged 2 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
name = "kube-lease-manager"
authors = ["Oleksii Karpenko <alexkarpenko@yahoo.com>"]
categories = ["api-bindings", "asynchronous"]
description = "Ergonomic and durable leader election using Kubernetes Lease API."
description = "Ergonomic and reliable leader election using Kubernetes Lease API."
edition = "2021"
rust-version = "1.75"
homepage = "https://github.com/alex-karpenko/kube-lease-manager"
keywords = ["kubernetes", "async", "lease", "leader", "election"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/alex-karpenko/kube-lease-manager"
version = "0.1.4"
version = "0.1.5"
exclude = [
".github/**",
".vscode/**",
Expand Down
15 changes: 8 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# kube-lease-manager

Ergonomic and durable leader election using Kubernetes Lease API.
Ergonomic and reliable leader election using Kubernetes Lease API.

<p>
<a href="https://github.com/alex-karpenko/kube-lease-manager/actions/workflows/ci.yaml" rel="nofollow"><img src="https://img.shields.io/github/actions/workflow/status/alex-karpenko/kube-lease-manager/ci.yaml?label=ci" alt="CI status"></a>
Expand Down Expand Up @@ -63,19 +63,18 @@ Second makes possible to acquire and release lock when you need it.
The simplest example using first locking approach:
```rust
use kube::Client;
use kube_lease_manager::LeaseManagerBuilder;
use kube_lease_manager::{LeaseManagerBuilder, Result};
use std::time::Duration;

#[tokio::main]
async fn main() {
// Use default Kube client
let client = Client::try_default().await.unwrap();
let client = Client::try_default().await?;
// Create the simplest LeaseManager with reasonable defaults using convenient builder.
// It uses Lease resource called `test-watch-lease`.
let manager = LeaseManagerBuilder::new(client, "test-auto-lease")
let manager = LeaseManagerBuilder::new(client, "test-watch-lease")
.build()
.await
.unwrap();
.await?;

let (mut channel, task) = manager.watch().await;
// Watch on the channel for lock state changes
Expand All @@ -96,7 +95,9 @@ async fn main() {
// Explicitly close the control channel
drop(channel);
// Wait for the finish of the manager and get it back
let _manager = tokio::join!(task).0.unwrap().unwrap();
let _manager = tokio::join!(task).0.unwrap()?;

Ok(())
}
```

Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![deny(unsafe_code)]
//! Ergonomic and durable leader election using Kubernetes Lease API.
//! Ergonomic and reliable leader election using Kubernetes Lease API.
//!
//! `kube-lease-manager` is a high-level helper to facilitate leader election using
//! [Lease Kubernetes resource](https://kubernetes.io/docs/reference/kubernetes-api/cluster-resources/lease-v1/).
Expand Down Expand Up @@ -77,7 +77,7 @@
//! [with_field_manager()](LeaseParams::with_field_manager) method or using [`LeaseManagerBuilder`].
//!
//! The next config option is a [`LeaseCreateMode`]
//! which defines the behavior how LeaseManager manages Lease Kubernetes resource during startup.
//! which defines the behavior how [`LeaseManager`] manages Lease Kubernetes resource during startup.
//! The default behavior is [`AutoCreate`](LeaseCreateMode::AutoCreate):
//! create resource if it doesn't exist or use existing one if it's already present.
//!
Expand Down
17 changes: 8 additions & 9 deletions tests/auto.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
use kube::Client;
use kube_lease_manager::LeaseManagerBuilder;
use kube_lease_manager::{LeaseManagerBuilder, Result};
use std::time::Duration;

#[tokio::test]
#[ignore = "uses k8s current-context"]
async fn auto() {
async fn auto() -> Result<()> {
tracing_subscriber::fmt::init();
// Use default Kube client
let client = Client::try_default().await.unwrap();
// Use default Kube client.
let client = Client::try_default().await?;
// Create the simplest LeaseManager with reasonable defaults using convenient builder.
// It uses Lease resource called `test-watch-lease`.
let manager = LeaseManagerBuilder::new(client, "test-auto-lease")
.build()
.await
.unwrap();
let manager = LeaseManagerBuilder::new(client, "test-auto-lease").build().await?;

let (mut channel, task) = manager.watch().await;
// Watch on the channel for lock state changes
Expand All @@ -34,5 +31,7 @@ async fn auto() {
// Explicitly close the control channel
drop(channel);
// Wait for the finish of the manager and get it back
let _manager = tokio::join!(task).0.unwrap().unwrap();
let _manager = tokio::join!(task).0.unwrap()?;

Ok(())
}
161 changes: 161 additions & 0 deletions tests/watch_many_threads.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/// This test creates several OS threads and run `LeaseManager` on the each of them.
/// Each thread creates its own `Tokio` runtime to run "workload" really concurrently,
/// and tries to:
/// - lock lease using `watch()` approach;
/// - run "workload" when got the lock;
/// - release lock by dropping channel;
/// - exit.
///
/// As result, each manager holds lock for some time and publish its index and lock/unlock event
/// to the channel for further analyzing.
///
/// Requirement is that channel contains correct sequence of events.
///
use std::{
sync::mpsc::{self, Sender},
thread,
time::Duration,
};

use k8s_openapi::api::core::v1::Namespace;
use kube::{api::PostParams, Api, Client, Resource};
use kube_lease_manager::{DurationSeconds, LeaseManagerBuilder, Result};
use tracing::{debug, error};

const TEST_NAMESPACE: &str = "kube-lease-integration-test";
const TEST_LEASE_NAME: &str = "watch-many-threads";

const TEST_THREADS: usize = 5;

const TASK_DURATION: DurationSeconds = 3;
const LEASE_DURATION: DurationSeconds = 2;
const LEASE_GRACE: DurationSeconds = 1;

#[derive(Debug, PartialEq, Eq)]
enum Event {
Locked(usize),
Started(usize),
Completed(usize),
Error(usize),
}

#[test]
#[ignore = "uses k8s current-context"]
fn watch_many_threads() -> Result<()> {
tracing_subscriber::fmt::init();

let (tx, rx) = mpsc::channel::<Event>();
let tasks: Vec<_> = (0..TEST_THREADS)
.map(move |index| {
let tx = tx.clone();
thread::spawn(move || run_watch_tread(index, tx))
})
.collect();

tasks.into_iter().for_each(|t| t.join().unwrap().unwrap());

for _ in 0..TEST_THREADS {
let event = rx.recv().unwrap();
assert!(
matches!(event, Event::Locked(_)),
"Incorrect event type: expected `Locked`, but got {event:?}"
);

if let Event::Locked(index) = event {
assert_eq!(
rx.recv().unwrap(),
Event::Started(index),
"Incorrect event type or index: expected `Started({index})`, but got {event:?}"
);
assert_eq!(
rx.recv().unwrap(),
Event::Completed(index),
"Incorrect event type or index: expected `Completed({index})`, but got {event:?}"
);

print!("{index} ");
} else {
unreachable!("Incorrect event type at previous step");
}
}
println!();

Ok(())
}

fn run_watch_tread(index: usize, tx: Sender<Event>) -> Result<()> {
debug!(%index, "Starting");

let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();

rt.block_on(async move {
let client = Client::try_default().await.unwrap();
create_namespace(client.clone(), TEST_NAMESPACE).await.unwrap();

let manager = LeaseManagerBuilder::new(client, TEST_LEASE_NAME)
.with_namespace(TEST_NAMESPACE)
.with_duration(LEASE_DURATION)
.with_grace(LEASE_GRACE)
.build()
.await
.unwrap();
let (mut channel, manager_task) = manager.watch().await;

let mut locked = *channel.borrow_and_update();
loop {
if !locked {
// Waiting for lock
while !locked {
if let Err(err) = channel.changed().await {
error!(%index, error = %err, "can't receive state");
let _ = tx.send(Event::Error(index));
break;
}
locked = *channel.borrow_and_update();
debug!(%index, "Got lock!");
let _ = tx.send(Event::Locked(index));
}
} else {
// Wait for job completion or state change
debug!(%index, "Try to run our job");
let _ = tx.send(Event::Started(index));
tokio::select! {
_ = job(index, TASK_DURATION) => {
debug!(%index, "Success!");
let _ = tx.send(Event::Completed(index));
},
_ = channel.changed() => {
error!(%index, "Unexpectedly lost the lock");
let _ = tx.send(Event::Error(index));
}
}
break;
}
}

drop(channel);
tokio::join!(manager_task).0.unwrap().unwrap();
});

Ok(())
}

async fn create_namespace(client: Client, namespace: &str) -> Result<()> {
// Create namespace
let pp = PostParams::default();
let mut data = Namespace::default();
data.meta_mut().name = Some(String::from(namespace));

let api = Api::<Namespace>::all(client);
let _ = api.create(&pp, &data).await;
Ok(())
}

async fn job(index: usize, duration: DurationSeconds) {
debug!(?index, ?duration, "Job start");
tokio::time::sleep(Duration::from_secs(duration)).await;
debug!(?index, ?duration, "Job finish");
}