-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
141e488
commit 7d1f064
Showing
5 changed files
with
180 additions
and
20 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
/// This test creates several OS threads and run `LeaseManager` on the each of them. | ||
/// Each thread creates its own `Tokio` runtime to run "workload" really concurrently, | ||
/// and tries to: | ||
/// - lock lease using `watch()` approach; | ||
/// - run "workload" when got the lock; | ||
/// - release lock by dropping channel; | ||
/// - exit. | ||
/// As result, each manager holds lock for some time and publish its index and lock/unlock event | ||
/// to the channel for further analyzing. | ||
/// | ||
/// Requirement is that channel contains correct sequence of events. | ||
/// | ||
use std::{ | ||
sync::mpsc::{self, Sender}, | ||
thread, | ||
time::Duration, | ||
}; | ||
|
||
use k8s_openapi::api::core::v1::Namespace; | ||
use kube::{api::PostParams, Api, Client, Resource}; | ||
use kube_lease_manager::{DurationSeconds, LeaseManagerBuilder, Result}; | ||
use tracing::{debug, error}; | ||
|
||
const TEST_NAMESPACE: &str = "kube-lease-integration-test"; | ||
const TEST_LEASE_NAME: &str = "watch-many-threads"; | ||
|
||
const TEST_THREADS: usize = 5; | ||
|
||
const TASK_DURATION: DurationSeconds = 3; | ||
const LEASE_DURATION: DurationSeconds = 2; | ||
const LEASE_GRACE: DurationSeconds = 1; | ||
|
||
#[derive(Debug, PartialEq, Eq)] | ||
enum Event { | ||
Locked(usize), | ||
Started(usize), | ||
Completed(usize), | ||
Error(usize), | ||
} | ||
|
||
#[test] | ||
#[ignore = "uses k8s current-context"] | ||
fn watch_many_threads() -> Result<()> { | ||
tracing_subscriber::fmt::init(); | ||
|
||
let (tx, rx) = mpsc::channel::<Event>(); | ||
let tasks: Vec<_> = (0..TEST_THREADS) | ||
.map(move |index| { | ||
let tx = tx.clone(); | ||
thread::spawn(move || run_watch_tread(index, tx)) | ||
}) | ||
.collect(); | ||
|
||
tasks.into_iter().for_each(|t| t.join().unwrap().unwrap()); | ||
|
||
for _ in 0..TEST_THREADS { | ||
let event = rx.recv().unwrap(); | ||
assert!( | ||
matches!(event, Event::Locked(_)), | ||
"Incorrect event type: expected `Locked`, but got {event:?}" | ||
); | ||
|
||
if let Event::Locked(index) = event { | ||
assert_eq!( | ||
rx.recv().unwrap(), | ||
Event::Started(index), | ||
"Incorrect event type or index: expected `Started({index})`, but got {event:?}" | ||
); | ||
assert_eq!( | ||
rx.recv().unwrap(), | ||
Event::Completed(index), | ||
"Incorrect event type or index: expected `Completed({index})`, but got {event:?}" | ||
); | ||
|
||
print!("{index} "); | ||
} else { | ||
unreachable!("Incorrect event type at previous step"); | ||
} | ||
} | ||
println!(""); | ||
|
||
Ok(()) | ||
} | ||
|
||
fn run_watch_tread(index: usize, tx: Sender<Event>) -> Result<()> { | ||
debug!(%index, "Starting"); | ||
|
||
let rt = tokio::runtime::Builder::new_multi_thread() | ||
.enable_all() | ||
.build() | ||
.unwrap(); | ||
|
||
rt.block_on(async move { | ||
let client = Client::try_default().await.unwrap(); | ||
create_namespace(client.clone(), TEST_NAMESPACE).await.unwrap(); | ||
|
||
let manager = LeaseManagerBuilder::new(client, TEST_LEASE_NAME) | ||
.with_namespace(TEST_NAMESPACE) | ||
.with_duration(LEASE_DURATION) | ||
.with_grace(LEASE_GRACE) | ||
.build() | ||
.await | ||
.unwrap(); | ||
let (mut channel, manager_task) = manager.watch().await; | ||
|
||
let mut locked = *channel.borrow_and_update(); | ||
loop { | ||
if !locked { | ||
// Waiting for lock | ||
while !locked { | ||
if let Err(err) = channel.changed().await { | ||
error!(%index, error = %err, "can't receive state"); | ||
let _ = tx.send(Event::Error(index)); | ||
break; | ||
} | ||
locked = *channel.borrow_and_update(); | ||
debug!(%index, "Got lock!"); | ||
let _ = tx.send(Event::Locked(index)); | ||
} | ||
} else { | ||
// Wait for job completion or state change | ||
debug!(%index, "Try to run our job"); | ||
let _ = tx.send(Event::Started(index)); | ||
tokio::select! { | ||
_ = job(index, TASK_DURATION) => { | ||
debug!(%index, "Success!"); | ||
let _ = tx.send(Event::Completed(index)); | ||
}, | ||
_ = channel.changed() => { | ||
error!(%index, "Unexpectedly lost the lock"); | ||
let _ = tx.send(Event::Error(index)); | ||
} | ||
} | ||
break; | ||
} | ||
} | ||
|
||
drop(channel); | ||
tokio::join!(manager_task).0.unwrap().unwrap(); | ||
}); | ||
|
||
Ok(()) | ||
} | ||
|
||
async fn create_namespace(client: Client, namespace: &str) -> Result<()> { | ||
// Create namespace | ||
let pp = PostParams::default(); | ||
let mut data = Namespace::default(); | ||
data.meta_mut().name = Some(String::from(namespace)); | ||
|
||
let api = Api::<Namespace>::all(client); | ||
let _ = api.create(&pp, &data).await; | ||
Ok(()) | ||
} | ||
|
||
async fn job(index: usize, duration: DurationSeconds) { | ||
debug!(?index, ?duration, "Job start"); | ||
tokio::time::sleep(Duration::from_secs(duration)).await; | ||
debug!(?index, ?duration, "Job finish"); | ||
} |