diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..e2a8b01 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,2 @@ +[env] +K8S_OPENAPI_ENABLED_VERSION = "1.26" diff --git a/.github/workflows/audit.yaml b/.github/workflows/audit.yaml new file mode 100644 index 0000000..d825b89 --- /dev/null +++ b/.github/workflows/audit.yaml @@ -0,0 +1,18 @@ +name: Security audit +on: + workflow_dispatch: + schedule: + - cron: "17 5 * * 5" + push: + branches: + - main + paths: + - "**/Cargo.toml" + pull_request: + +jobs: + security-audit: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: EmbarkStudios/cargo-deny-action@v1 diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 0000000..6b41040 --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,58 @@ +name: Continuous integration +on: + workflow_dispatch: + pull_request: + schedule: + - cron: "30 16 * * 5" +env: + CARGO_TERM_COLOR: always + CI: true + +jobs: + ci: + runs-on: ubuntu-latest + strategy: + matrix: + rust: + - stable + - beta + - 1.75.0 + k8s: + - v1.26 + - latest + + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + name: Setup toolchain + with: + toolchain: ${{ matrix.rust }} + components: rustfmt,clippy + + - uses: Swatinem/rust-cache@v2 + + - name: Fmt + run: cargo fmt --all -- --check + + - name: Clippy + run: cargo clippy --all-targets -- -D warnings + + - name: Build + run: cargo build + + - name: Light tests + # env: + # RUST_LOG: kube_lease=debug + run: cargo test --lib --all + + - uses: nolar/setup-k3d-k3s@v1 + with: + version: ${{matrix.k8s}} + k3d-name: kube + github-token: ${{ secrets.GITHUB_TOKEN }} + k3d-args: "--no-lb --no-rollback --k3s-arg --disable=traefik,servicelb,metrics-server@server:*" + + - name: Heavy tests + # env: + # RUST_LOG: kube_lease_manager=debug + run: cargo test --lib --all -- --ignored diff --git a/.github/workflows/coverage.yaml b/.github/workflows/coverage.yaml new file mode 100644 index 0000000..085fe33 --- /dev/null +++ b/.github/workflows/coverage.yaml @@ -0,0 +1,58 @@ +name: Code coverage report +on: + workflow_dispatch: + pull_request: + push: + branches: + - main + - rel-v* + +env: + CARGO_TERM_COLOR: always + +jobs: + codecov: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Setup for testing + run: | + mkdir grcov + curl -sL https://github.com/mozilla/grcov/releases/latest/download/grcov-x86_64-unknown-linux-gnu.tar.bz2 | (cd grcov ; tar jxf -) + rustup toolchain add nightly --component llvm-tools-preview + rustup override set nightly + + - uses: Swatinem/rust-cache@v2 + - uses: nolar/setup-k3d-k3s@v1 + with: + version: latest + k3d-name: kube + github-token: ${{ secrets.GITHUB_TOKEN }} + k3d-args: "--no-lb --no-rollback --k3s-arg --disable=traefik,servicelb,metrics-server@server:*" + + - name: Test + env: + CI: true + RUSTFLAGS: -Cinstrument-coverage + LLVM_PROFILE_FILE: kube-lease-manager-%p-%m.profraw + RUST_LOG: kube_lease_manager=debug + run: cargo test --lib --all -- --include-ignored + + - name: Generate coverage + run: | + grcov/grcov $(find . 
-name "kube-lease-manager-*.profraw" -print) \ + --branch \ + --ignore-not-existing \ + --binary-path ./target/debug/ \ + -s src \ + -t lcov \ + --ignore "/*" \ + --excl-line '^\s*\.await\??;?$' --excl-br-line '^\s*\.await\??;?$' \ + -o lcov.info + + - name: Upload coverage reports to Codecov + uses: codecov/codecov-action@v4 + with: + token: ${{ secrets.CODECOV_TOKEN }} + fail_ci_if_error: true + file: ./lcov.info diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml new file mode 100644 index 0000000..d9dce99 --- /dev/null +++ b/.github/workflows/publish.yaml @@ -0,0 +1,64 @@ +name: Release +permissions: + contents: write +on: + workflow_dispatch: + push: + tags: ["v[0-9]+.[0-9]+.[0-9]+"] +env: + CARGO_TERM_COLOR: always + +jobs: + generate-changelog: + name: Generate changelog + runs-on: ubuntu-latest + outputs: + release_body: ${{ steps.git-cliff.outputs.content }} + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Generate release changelog + uses: orhun/git-cliff-action@v3 + id: git-cliff + with: + config: cliff.toml + args: -v --latest --strip header --github-token ${{ secrets.GITHUB_TOKEN }} + env: + OUTPUT: CHANGELOG.md + GITHUB_REPO: ${{ github.repository }} + + release: + name: Release to Github + needs: generate-changelog + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Release + uses: softprops/action-gh-release@v2 + with: + body: "${{ needs.generate-changelog.outputs.release_body }}" + name: "Release ${{ github.ref_name }}" + + publish: + name: Publish to Crates.io + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup toolchain + uses: dtolnay/rust-toolchain@stable + with: + profile: minimal + toolchain: stable + override: true + + - name: Publish to Crates.io + run: cargo publish --token ${CRATES_TOKEN} + env: + CRATES_TOKEN: ${{ secrets.CRATES_TOKEN }} diff --git a/.gitignore b/.gitignore index 6985cf1..97aa863 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ # will have compiled files and executables debug/ target/ +.DS_Store # Remove Cargo.lock from gitignore if creating an executable, leave it for libraries # More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html @@ -12,3 +13,10 @@ Cargo.lock # MSVC Windows builds of rustc generate these, which store debugging information *.pdb + +# Idea +.idea/ + +# Local development artifacts +TODO.md +.vscode/ diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..f0bfc9a --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,47 @@ +[package] +name = "kube-lease-manager" +authors = ["Oleksii Karpenko "] +categories = ["api-bindings", "asynchronous"] +description = "Ergonomic and durable leader election using Kubernetes Lease API." 
+edition = "2021" +rust-version = "1.75" +homepage = "https://github.com/alex-karpenko/kube-lease-manager" +keywords = ["kubernetes", "async", "lease", "leader", "election"] +license = "MIT" +readme = "README.md" +repository = "https://github.com/alex-karpenko/kube-lease-manager" +version = "0.1.0" +exclude = [ + ".github/**", + ".vscode/**", + "TODO.md", + "Cargo.lock", + "target/**", + ".gitignore", + ".cargo/**", +] + +[dependencies] +k8s-openapi = { version = "0.22.0" } +kube = { version = "0.92.1", default-features = false, features = ["client"] } +rand = { version = "0.8.5" } +thiserror = "1.0.61" +tokio = { version = "1.38.0", default-features = false, features = [ + "sync", + "time", + "macros", +] } +tracing = { version = "0.1.40", default-features = false, features = ["std"] } + +[dev-dependencies] +futures = { version = "0.3.30", default-features = false, features = [ + "async-await", +] } +kube = { version = "0.92.1", default-features = false, features = [ + "rustls-tls", +] } +tokio = { version = "1.38.0", default-features = false, features = [ + "rt-multi-thread", +] } +tracing-subscriber = { version = "0.3.18" } +uuid = { version = "1.9.1", features = ["v4"] } diff --git a/README.md b/README.md index 04cb9e7..a2c547c 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,119 @@ -# kube-lease -Wrapper for Kubernetes Lease API, siplifies leader election. +# kube-lease-manager + +Ergonomic and durable leader election using Kubernetes Lease API. + +

+CI status +Audit status +Crates.io publishing status +docs.rs status +Version at Crates.io +License +

+
+`kube-lease-manager` is a high-level helper to facilitate leader election using the
+[Lease Kubernetes resource](https://kubernetes.io/docs/reference/kubernetes-api/cluster-resources/lease-v1/).
+It ensures that only one of the lease manager instances holds the lock at any moment in time.
+
+Some of the typical use cases:
+* automatic coordination of leader election between several instances (Pods) of Kubernetes controllers;
+* ensuring that only a single instance of a concurrent job is running at any time;
+* exclusive acquisition of a shared resource.
+
+## Features
+
+* `LeaseManager` is the central part of the crate.
+  It is a convenient wrapper around a Kubernetes `Lease` resource that manages all aspects of the leader election process.
+* Provides two different high-level approaches to lock and release a lease:
+  fully automated or partially manual lock control.
+* Uses the [Server-Side-Apply](https://kubernetes.io/docs/reference/using-api/server-side-apply/)
+  approach to update lease state, which facilitates conflict detection and resolution
+  and makes concurrent locking impossible.
+* Tolerates a configurable time skew between nodes of the Kubernetes cluster.
+* Behavioral parameters of the lease manager are easily and flexibly configurable.
+* Uses the well-known and highly appreciated [kube](https://crates.io/crates/kube)
+  and [Tokio](https://crates.io/crates/tokio)
+  crates to access the Kubernetes API and coordinate asynchronous task execution.
+* You don't need to use the low-level Kubernetes API.
+* Uses the Tokio [`tracing`](https://crates.io/crates/tracing) crate to provide event logs.
+
+Please visit the [crate's documentation](https://docs.rs/kube-lease-manager/) for details and more examples.
+
+---
+
+As mentioned above, `kube-lease-manager` provides two possible ways to manage a lease lock:
+1. _Fully automated_: you create a `LeaseManager` instance and run its `watch()` method.
+   It returns a [Tokio watch channel](https://docs.rs/tokio/1.38.0/tokio/sync/watch/index.html) to watch for state changes.
+   Besides that, it runs an unattended background task
+   which permanently tries to lock the lease if it's free and publishes the changed state to the channel.
+   The task finishes when the channel is closed.
+2. _Partially manual_: you create a `LeaseManager`
+   instance and use its `changed()`
+   and `release()` methods to control the lock.
+   `changed()` tries to lock the lease as soon as it becomes free and returns the actual lock state when it changes.
+   Your responsibilities are:
+   - to keep `changed()` running (it's a `Future`) to ensure the lock is refreshed while it's in use;
+   - to call `release()` when you don't need the lock anymore and want to make it free for others.
+
+The first way ensures that the lease is locked (has a holder) at any moment in time.
+The second makes it possible to acquire and release the lock only when you need it.
+
+## Example
+
+The simplest example using the first locking approach:
+```rust
+use kube::Client;
+use kube_lease_manager::LeaseManagerBuilder;
+use std::time::Duration;
+
+#[tokio::main]
+async fn main() {
+    // Use the default Kube client
+    let client = Client::try_default().await.unwrap();
+    // Create the simplest LeaseManager with reasonable defaults using the convenient builder.
+    // It uses a Lease resource called `test-auto-lease`.
+    let manager = LeaseManagerBuilder::new(client, "test-auto-lease")
+        .build()
+        .await
+        .unwrap();
+
+    let (mut channel, task) = manager.watch().await;
+    // Watch on the channel for lock state changes
+    tokio::select!
{ + _ = channel.changed() => { + let lock_state = *channel.borrow_and_update(); + + if lock_state { + // Do something useful as a leader + println!("Got a luck!"); + } + } + _ = tokio::time::sleep(Duration::from_secs(10)) => { + println!("Unable get lock during 10s"); + } + } + + // Explicitly close the control channel + drop(channel); + // Wait for the finish of the manager and get it back + let _manager = tokio::join!(task).0.unwrap().unwrap(); +} +``` + +Please visit [crate's documentation](https://docs.rs/kube-lease-manager/) to get more examples and usage details. + +## TODO + +- [ ] Provide some real and useful examples. + +## Credits + +The author was inspired on this piece of work by two other crates that provide similar functionality: +[kubert](https://crates.io/crates/kubert) and [kube-leader-election](https://crates.io/crates/kube-leader-election). +Both of them are great, thanks to the authors. +But both have something missing for one of my projects. +So it was a reason to create this one. + +## License + +This project is licensed under the [MIT license](LICENSE). diff --git a/cliff.toml b/cliff.toml new file mode 100644 index 0000000..6f8832f --- /dev/null +++ b/cliff.toml @@ -0,0 +1,125 @@ +# git-cliff ~ configuration file +# https://git-cliff.org/docs/configuration + +[remote.github] +owner = "alex-karpenko" +repo = "kube-lease-manager" + +[changelog] +# changelog header +header = """ +# Changelog\n +All notable changes to this project will be documented in this file. + +""" +# template for the changelog body +# https://keats.github.io/tera/docs/#introduction +body = """ +{%- macro remote_url() -%} + https://github.com/{{ remote.github.owner }}/{{ remote.github.repo }} +{%- endmacro -%} + +{% if version -%} + ## [{{ version | trim_start_matches(pat="v") }}] - {{ timestamp | date(format="%Y-%m-%d") }} +{% else -%} + ## [Unreleased] +{% endif -%} + +### Details\ + +{% if commits|filter(attribute="breaking", value=true)|length > 0 %} +#### Breaking changes +{% endif -%} + +{% for commit in commits|filter(attribute="breaking", value=true) -%} +- {{ commit.message | upper_first | trim }}\ + {% if commit.github.username %} by @{{ commit.github.username }}{%- endif -%} + {% if commit.github.pr_number %} in \ + [#{{ commit.github.pr_number }}]({{ self::remote_url() }}/pull/{{ commit.github.pr_number }}) \ + {%- endif %} +{% endfor -%}\ + +{% for group, commits in commits | group_by(attribute="group") %} + #### {{ group | upper_first }} + {%- for commit in commits %}\ + {% if not commit.breaking %} + - {{ commit.message | upper_first | trim }}\ + {% if commit.github.username %} by @{{ commit.github.username }}{%- endif -%} + {% if commit.github.pr_number %} in \ + [#{{ commit.github.pr_number }}]({{ self::remote_url() }}/pull/{{ commit.github.pr_number }}) \ + {%- endif -%} + {%- endif -%} + {% endfor %} +{% endfor %} + +{%- if github.contributors | filter(attribute="is_first_time", value=true) | length != 0 %} + ## New Contributors +{%- endif -%} + +{% for contributor in github.contributors | filter(attribute="is_first_time", value=true) %} + * @{{ contributor.username }} made their first contribution + {%- if contributor.pr_number %} in \ + [#{{ contributor.pr_number }}]({{ self::remote_url() }}/pull/{{ contributor.pr_number }}) \ + {%- endif %} +{% endfor %} + +""" +# template for the changelog footer +footer = """ +{%- macro remote_url() -%} + https://github.com/{{ remote.github.owner }}/{{ remote.github.repo }} +{%- endmacro -%} + +{% for release in releases -%} + {% if 
release.version -%} + {% if release.previous.version -%} + [{{ release.version | trim_start_matches(pat="v") }}]: \ + {{ self::remote_url() }}/compare/{{ release.previous.version }}..{{ release.version }} + {% endif -%} + {% else -%} + [unreleased]: {{ self::remote_url() }}/compare/{{ release.previous.version }}..HEAD + {% endif -%} +{% endfor %} + +""" +# remove the leading and trailing whitespace from the templates +trim = true + +[git] +# parse the commits based on https://www.conventionalcommits.org +conventional_commits = true +# filter out the commits that are not conventional +filter_unconventional = true +# process each line of a commit as an individual commit +split_commits = false +# regex for preprocessing the commit messages +commit_preprocessors = [ + # remove issue numbers from commits + { pattern = '\((\w+\s)?#([0-9]+)\)', replace = "" }, +] +# regex for parsing and grouping commits +commit_parsers = [ + { message = "^.*: [Aa]dd", group = "Added" }, + { message = "^.*: [Ii]mplement", group = "Added" }, + { message = "^.*: support", group = "Added" }, + { message = "^.*: [Rr]emove", group = "Removed" }, + { message = "^.*: [Dd]elete", group = "Removed" }, + { message = "^test", group = "Fixed" }, + { message = "^fix", group = "Fixed" }, + { message = "^.*: [Ff]ix", group = "Fixed" }, + { message = "^.*", group = "Changed" }, +] +# protect breaking changes from being skipped due to matching a skipping commit_parser +protect_breaking_commits = true +# filter out the commits that are not matched by commit parsers +filter_commits = true +# regex for matching git tags +tag_pattern = "v[0-9].*" +# regex for skipping tags +skip_tags = "v0.1.0-beta.1" +# regex for ignoring tags +ignore_tags = "" +# sort the tags topologically +topo_order = false +# sort the commits inside sections by oldest/newest order +sort_commits = "oldest" diff --git a/codecov.yml b/codecov.yml new file mode 100644 index 0000000..bfdc987 --- /dev/null +++ b/codecov.yml @@ -0,0 +1,8 @@ +coverage: + status: + project: + default: + informational: true + patch: + default: + informational: true diff --git a/deny.toml b/deny.toml new file mode 100644 index 0000000..c62a31d --- /dev/null +++ b/deny.toml @@ -0,0 +1,65 @@ +[advisories] +db-path = "~/.cargo/advisory-db" +db-urls = ["https://github.com/rustsec/advisory-db"] +yanked = "deny" + +[licenses] +allow = ["MIT", "Apache-2.0"] +confidence-threshold = 0.93 +exceptions = [ + { allow = [ + "ISC", + ], name = "untrusted" }, + { allow = [ + "ISC", + ], name = "rustls-webpki" }, + { allow = [ + "BSD-3-Clause", + ], name = "subtle" }, + { allow = [ + "Unicode-DFS-2016", + ], name = "unicode-ident" }, + { allow = [ + "OpenSSL", + ], name = "ring" }, +] + +[[licenses.clarify]] +name = "ring" +version = "*" +expression = "OpenSSL" +license-files = [ + # Each entry is a crate relative path, and the (opaque) hash of its contents + { path = "LICENSE", hash = 0xbd0eed23 }, +] + +[licenses.private] +ignore = false +registries = [ + #"https://sekretz.com/registry +] + +[bans] +multiple-versions = "allow" +wildcards = "allow" +highlight = "all" +workspace-default-features = "allow" +external-default-features = "allow" +allow = [] +deny = [] + +skip = [] +skip-tree = [] + +[sources] +unknown-registry = "deny" +unknown-git = "deny" +allow-registry = ["https://github.com/rust-lang/crates.io-index"] +allow-git = [] + +[graph] +all-features = false +no-default-features = false + +[output] +feature-depth = 1 diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 
0000000..7530651 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1 @@ +max_width = 120 diff --git a/src/backoff.rs b/src/backoff.rs new file mode 100644 index 0000000..e51f371 --- /dev/null +++ b/src/backoff.rs @@ -0,0 +1,136 @@ +use rand::{thread_rng, Rng}; +use std::time::Duration; +use tracing::trace; + +pub(crate) type DurationFloat = f64; + +pub(crate) struct BackoffSleep { + min: DurationFloat, + max: DurationFloat, + last: DurationFloat, + mult: DurationFloat, +} + +impl BackoffSleep { + pub(crate) fn new(min: DurationFloat, max: DurationFloat, mult: DurationFloat) -> Self { + if mult <= 1.0 { + panic!("`mult` should be greater than 1.0 to make backoff interval increasing") + } + + if min >= max { + panic!("`max` should be greater than `min` to make backoff interval increasing") + } + + if min <= 0.0 || max <= 0.0 { + panic!("`min` and `max` should be greater than zero") + } + + Self { + min, + max, + mult, + last: min, + } + } + + pub(crate) fn reset(&mut self) { + trace!("reset backoff state"); + self.last = self.min; + } + + pub(crate) async fn sleep(&mut self) { + let duration = self.next(); + trace!(?duration, "backoff sleep"); + tokio::time::sleep(duration).await; + } + + fn next(&mut self) -> Duration { + self.last = self.random(); + trace!(duration = ?self.last, "next random duration requested"); + Duration::from_secs_f64(self.last) + } + + fn random(&self) -> DurationFloat { + let min = self.last; + let max = min * self.mult; + + let (min, max) = if max > self.max { + (self.max / self.mult, self.max) + } else { + (min, max) + }; + + let val = thread_rng().gen_range(min..max); + trace!( + min = format!("{min:.3}"), + val = format!("{val:.3}"), + max = format!("{max:.3}"), + "generate next random duration" + ); + + val + } +} + +#[cfg(test)] +mod tests { + use std::borrow::BorrowMut; + + use super::*; + use crate::tests::init_tracing; + + #[test] + fn ensure_every_next_is_longer() { + init_tracing(); + + let mut backoff = BackoffSleep::new(0.1, 10.0, 2.0); + let mut prev = Duration::ZERO; + + for _ in 0..5 { + let next = backoff.next(); + assert!(next > prev); + prev = next; + } + } + + #[test] + fn ensure_long_less_then_max() { + const MIN: DurationFloat = 0.1; + const MAX: DurationFloat = 10.0; + const MULT: f64 = 2.0; + + init_tracing(); + + let mut backoff = BackoffSleep::new(MIN, MAX, MULT); + let v: Vec = (0..55).borrow_mut().map(|_| backoff.next()).skip(50).collect(); + for d in v { + assert!(d <= Duration::from_secs_f64(MAX)); + assert!(d >= Duration::from_secs_f64(MAX / MULT)); + } + } + + #[test] + #[should_panic = "`mult` should be greater than 1.0 to make backoff interval increasing"] + fn incorrect_backoff_mult_equal_to_one() { + let _ = BackoffSleep::new(1.0, 2.0, 1.0); + } + + #[test] + #[should_panic = "`mult` should be greater than 1.0 to make backoff interval increasing"] + fn incorrect_backoff_mult_less_than_one() { + let _ = BackoffSleep::new(1.0, 2.0, 0.999); + } + + #[test] + #[should_panic = "`max` should be greater than `min` to make backoff interval increasing"] + fn incorrect_backoff_min_greater_than_max() { + let _ = BackoffSleep::new(3.0, 2.0, 2.0); + } + + #[test] + #[should_panic = "`min` and `max` should be greater than zero"] + fn incorrect_backoff_min_or_max_less_then_zero() { + let _ = BackoffSleep::new(0.0, 2.0, 2.0); + let _ = BackoffSleep::new(0.0, 0.0, 2.0); + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..10a5552 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,1714 @@ +#![deny(unsafe_code)] +//! 
Ergonomic and durable leader election using Kubernetes Lease API.
+//!
+//! `kube-lease-manager` is a high-level helper to facilitate leader election using the
+//! [Lease Kubernetes resource](https://kubernetes.io/docs/reference/kubernetes-api/cluster-resources/lease-v1/).
+//! It ensures that only one of the lease manager instances holds the lock at any moment in time.
+//!
+//! Some of the typical use cases:
+//! * automatic coordination of leader election between several instances (Pods) of Kubernetes controllers;
+//! * ensuring that only a single instance of a concurrent job is running at any time;
+//! * exclusive acquisition of a shared resource.
+//!
+//! ## Features
+//!
+//! * [`LeaseManager`] is the central part of the crate.
+//! It is a convenient wrapper around a Kubernetes `Lease` resource that manages all aspects of the leader election process.
+//! * Provides two different high-level approaches to lock and release a lease:
+//! fully automated or partially manual lock control.
+//! * Uses the [Server-Side-Apply](https://kubernetes.io/docs/reference/using-api/server-side-apply/)
+//! approach to update lease state, which facilitates conflict detection and resolution
+//! and makes concurrent locking impossible.
+//! * Tolerates a configurable time skew between nodes of the Kubernetes cluster.
+//! * Behavioral parameters of the lease manager are easily and flexibly configurable.
+//! * Uses the well-known and highly appreciated [kube](https://crates.io/crates/kube)
+//! and [Tokio](https://crates.io/crates/tokio)
+//! crates to access the Kubernetes API and coordinate asynchronous task execution.
+//! * You don't need to use the low-level Kubernetes API.
+//! * Uses the Tokio [`tracing`](https://crates.io/crates/tracing) crate to provide event logs.
+//!
+//! ---
+//!
+//! As mentioned above, `kube-lease-manager` provides two possible ways to manage a lease lock:
+//! 1. _Fully automated_: you create a [`LeaseManager`] instance and run its [`watch()`](LeaseManager::watch()) method.
+//! It returns a [Tokio watch channel](https://docs.rs/tokio/latest/tokio/sync/watch/index.html) to watch for state changes.
+//! Besides that, it runs an unattended background task
+//! which permanently tries to lock the lease if it's free and publishes the changed state to the channel.
+//! The task finishes when the channel is closed.
+//! 2. _Partially manual_: you create a [`LeaseManager`]
+//! instance and use its [`changed()`](LeaseManager::changed())
+//! and [`release()`](LeaseManager::release()) methods to control the lock.
+//! `changed()` tries to lock the lease as soon as it becomes free and returns the actual lock state when it changes.
+//! Your responsibilities are:
+//! - to keep `changed()` running (it's a `Future`) to ensure the lock is refreshed while it's in use;
+//! - to call `release()` when you don't need the lock anymore and want to make it free for others.
+//!
+//! The first way ensures that the lease is locked (has a holder) at any moment in time.
+//! The second makes it possible to acquire and release the lock only when you need it.
+//!
+//! ## LeaseManager config
+//!
+//! The main config of the [`LeaseManager`] is a [`LeaseParams`] structure, which
+//! describes several parameters that affect the manager's behavior.
+//! Those parameters are:
+//! * _identity_: unique string identifier of the lease manager among all other instances.
+//! Usually this is some randomly generated string (UUID, for example).
+//! `LeaseParams` can provide a default value by generating a random 32-symbol alphanumeric string,
or you can explicitly specify identity string while creating `LeaseParams` +//! or directly via [`LeaseManagerBuilder`]. +//! * _duration_: this is a maximum duration (in seconds) of lock validity after last renewal (confirmation) of the lock. +//! In other words, the current lock holder is obliged to renew (re-confirm) its lock during this time after last renewal. +//! If the holder didn't re-confirm lock, any other `LeaseManager` instance is permitted to grab the lock. +//! The default value provided by `LeaseParams` is 30 seconds and may be configured. +//! * _grace_: to avoid flapping losses of lock, the actual leaseholder tries to re-confirm (renew) lock earlier, +//! before it expires (before the end of the `duration` interval). +//! This parameter defines an interval (in seconds) before lock expiration when the lock holder has to renew its lock. +//! As a side effect, +//! this interval can be considered as a maximum allowed time synchronization skew between nodes in the Kubernetes cluster +//! to avoid overlying locking. +//! The default value provided by `LeaseParams` is 5 seconds and may be configured. +//! * _field_manager_: +//! identifier of the Kubernetes [field manager](https://kubernetes.io/docs/reference/using-api/server-side-apply/#managers) +//! to authorize changes of the Lease resources. +//! It should be unique among other managers. +//! Usually you don't need to specify it explicitly because of LeaseParams generates it concatenating crates' +//! name (kube-lease-manager) and `identity` string. +//! But it's possible to specify field manger directly via `LeaseParams` +//! [with_field_manager()](LeaseParams::with_field_manager) method or using [`LeaseManagerBuilder`]. +//! +//! The next config option is a [`LeaseCreateMode`] +//! which defines the behavior how LeaseManager manages Lease Kubernetes resource during startup. +//! The default behavior is [`AutoCreate`](LeaseCreateMode::AutoCreate): +//! create resource if it doesn't exist or use existing one if it's already present. +//! +//! The last significant parameter of the [`LeaseManager`] is a Lease resource name. +//! Obviously, it has no defaults and should be specified explicitly via LeaseManager [`new()`](LeaseManager::new) +//! constructor of using [`LeaseManagerBuilder`]. +//! +//! ## Examples +//! +//! Create [`LeaseManager`] with reasonable defaults using convenient ['LeaseManagerBuilder'] and use ["watch"](LeaseManager::watch) +//! approach to get notified about lock holder changes. +//! +//! ```no_run +//! use kube::Client; +//! use kube_lease_manager::{LeaseManagerBuilder, Result}; +//! use std::time::Duration; +//! +//! #[tokio::main] +//! async fn main() -> Result<()> { +//! // Use default Kube client +//! let client = Client::try_default().await?; +//! +//! // Create the simplest LeaseManager with reasonable defaults using convenient builder. +//! // It uses Lease resource called `test-lease-name`. +//! // With default auto-create mode Lease will be created if it doesn't exist, +//! // or existing one will be used otherwise. +//! // The default lease duration is 30 seconds with grace period 5 seconds. +//! let manager = LeaseManagerBuilder::new(client, "test-lease-name") +//! .build() +//! .await?; +//! +//! // Start manager in watching mode and get back status channel and task handler. +//! let (mut channel, task) = manager.watch().await; +//! +//! // Watch on the channel for lock state changes. +//! tokio::select! { +//! _ = channel.changed() => { +//! 
let lock_state = *channel.borrow_and_update(); +//! +//! if lock_state { +//! // Do something useful as a leader +//! println!("Got a luck!"); +//! } +//! } +//! _ = tokio::time::sleep(Duration::from_secs(10)) => { +//! println!("Unable get lock during 10s"); +//! } +//! } +//! +//! // Explicitly close the control channel +//! drop(channel); +//! +//! // Wait for the finish of the manager and get it back +//! let _manager = tokio::join!(task).0.unwrap()?; +//! +//! Ok(()) +//! } +//! ``` +//! +//! More examples with detailed explanations can be found in the corresponding documentation chapters about [`LeaseManager`], +//! [`LeaseManagerBuilder`], [`LeaseParams`] and [`LeaseCreateMode`], +//! as well as an explanation of the specific errors provided by [`LeaseManagerError`]. +//! + +mod backoff; +mod state; + +use backoff::{BackoffSleep, DurationFloat}; +use kube::Client; +use rand::{distributions::Alphanumeric, thread_rng, Rng}; +use state::{LeaseLockOpts, LeaseState, LeaseStateError}; +use std::{ + fmt::Debug, + sync::atomic::{AtomicBool, Ordering}, + time::{Duration, SystemTime}, +}; +use tokio::{select, sync::RwLock, task::JoinHandle}; +use tracing::{debug, error, trace}; + +type DurationMillis = u64; + +/// Since all durations related to Lease resource are in seconds, this alias is useful. +pub type DurationSeconds = u64; +/// Convenient alias for `Result`. Uses [`LeaseManagerError`] as an Error type. +pub type Result = std::result::Result; + +/// Default value of the `duration` parameter. +pub const DEFAULT_LEASE_DURATION_SECONDS: DurationSeconds = 30; +/// Default value of the `grace` parameter. +pub const DEFAULT_LEASE_GRACE_SECONDS: DurationSeconds = 5; + +/// Length of the random identity string generated by [`default`](LeaseParams::default) [`LeaseParams`] constructor. +pub const DEFAULT_RANDOM_IDENTITY_LEN: usize = 32; +const DEFAULT_FIELD_MANAGER_PREFIX: &str = env!("CARGO_PKG_NAME"); + +const MIN_RELEASE_WAITING_MILLIS: DurationMillis = 100; +const MAX_RELEASE_WAITING_MILLIS: DurationMillis = 1000; + +const MIN_CONFLICT_BACKOFF_TIME: DurationFloat = 0.1; +const MAX_CONFLICT_BACKOFF_TIME: DurationFloat = 5.0; +const CONFLICT_BACKOFF_MULT: DurationFloat = 2.0; + +const MIN_WATCHER_BACKOFF_TIME: DurationFloat = 1.0; +const MAX_WATCHER_BACKOFF_TIME: DurationFloat = 30.0; +const WATCHER_BACKOFF_MULT: DurationFloat = 2.0; + +/// Represents `kube-lease-manager` specific errors. +#[derive(thiserror::Error, Debug)] +pub enum LeaseManagerError { + /// Error originated from the Kubernetes. + #[error("Kube error: {0}")] + KubeError( + #[source] + #[from] + kube::Error, + ), + + /// Internal state inconsistency detected. + /// + /// Usually the root cause is a bug. + #[error("inconsistent LeaseManager state detected: {0}")] + InconsistentState(String), + + /// Try to create a new `Lease` resource, but it already exists. + #[error("Lease resource `{0}` already exists")] + LeaseAlreadyExists(String), + + /// Try to use non-existent `Lease` resource. + #[error("Lease resource `{0}` doesn't exist")] + NonexistentLease(String), + + /// `LeaseManager` unable to send state to the control channel. + #[error("Control channel error: {0}")] + ControlChannelError( + #[source] + #[from] + tokio::sync::watch::error::SendError, + ), +} + +/// Parameters of [`LeaseManager`]. +/// +/// The structure describes several parameters that affect [`LeaseManager`] behavior: +/// * _identity_: unique string identifier of the lease manager among all other instances. 
+/// Usually this is some randomly generated string (UUID, for example). +/// `LeaseParams` can provide default value by generating random 32-symbol alphanumeric string, +/// or you can explicitly specify identity string while creating `LeaseParams` with [`new()`](LeaseParams::new) +/// constructor or via [`LeaseManagerBuilder`]. +/// * _duration_: this is a maximum duration (in seconds) of lock validity after last renewal (confirmation) of the lock. +/// In other words, the current lock holder is obliged to renew (re-confirm) its lock during this time after last renewal. +/// If the holder didn't re-confirm lock, any other `LeaseManager` instance is permitted to grab the lock. +/// The default value provided by `LeaseParams` is 30 seconds and may be configured. +/// * _grace_: to avoid flapping losses of lock, the actual leaseholder tries to re-confirm (renew) lock earlier, +/// before it expires (before the end of the `duration` interval). +/// This parameter defines an interval (in seconds) before lock expiration when the lock holder has to renew its lock. +/// As a side effect, +/// this interval can be considered as a maximum allowed time synchronization skew between nodes in the Kubernetes cluster +/// to avoid overlying locking. +/// The default value provided by `LeaseParams` is 5 seconds and may be configured. +/// * _field_manager_: +/// identifier of the Kubernetes [field manager](https://kubernetes.io/docs/reference/using-api/server-side-apply/#managers) +/// to authorize changes of the Lease resources. +/// It should be unique among other managers. +/// Usually you don't need to specify it explicitly because of LeaseParams generates it concatenating crates' +/// name (kube-lease-manager) and `identity` string. +/// But it's possible to specify field manger directly via `LeaseParams` +/// [with_field_manager()](LeaseParams::with_field_manager) method or using [`LeaseManagerBuilder`]. +/// +/// `LeaseParams` may be created using [`default()`](LeaseParams::default) +/// constructor, [`new()`](LeaseParams::new) constructor, +/// or you can specify any non-default parameter value by creating `LeaseManager` using [`LeaseManagerBuilder`]. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct LeaseParams { + /// Lease holder identity. + identity: String, + /// Duration of lease lock. + duration: DurationSeconds, + /// Period of tme to renew lease lock before it expires. + grace: DurationSeconds, + /// Field manager. + field_manager: String, +} + +/// Lease Kubernetes resource creation mode. +/// +/// To coordinate leader election, +/// [`LeaseManager`] uses [`Lease`](https://kubernetes.io/docs/reference/kubernetes-api/cluster-resources/lease-v1/) +/// Kubernetes resource. +/// You specify `Lease` resource name during creation of the `LeaseManager` instance. +/// So to use that object, it should exist. +/// +/// `LeaseCreateMode` describes is `Lease` resource should be created and how, +/// or it should exist before `LeaseManager` constructing. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default)] +pub enum LeaseCreateMode { + /// If a specified Lease object is already present in the Kubernetes cluster, then use it. + /// Otherwise, try to create a new object and use it. + /// This is a default behavior. + #[default] + AutoCreate, + /// Use only a newly created Lease object. + /// If it's already present in the cluster, + /// LeaseManager constructor fails with [`LeaseAlreadyExists`](LeaseManagerError::LeaseAlreadyExists) error. 
+ CreateNew, + /// Lease object should be existent before call of the `LeaseManager` constructor. + /// If it doesn't exist, construct fails with [`NonexistentLease`](LeaseManagerError::NonexistentLease) error. + UseExistent, + #[cfg(test)] + Ignore, +} + +impl From for LeaseManagerError { + fn from(value: LeaseStateError) -> Self { + match value { + LeaseStateError::LockConflict => unreachable!("this branch is unreachable, looks like a BUG!"), + LeaseStateError::KubeError(err) => LeaseManagerError::KubeError(err), + LeaseStateError::LeaseAlreadyExists(lease) => LeaseManagerError::LeaseAlreadyExists(lease), + LeaseStateError::NonexistentLease(lease) => LeaseManagerError::NonexistentLease(lease), + LeaseStateError::InconsistentState(err) => LeaseManagerError::InconsistentState(err), + } + } +} + +/// Wrapper around a Kubernetes Lease resource to manage all aspects of leader election process. +/// +/// There are two possible ways to manage lease lock: +/// fully automated using managers' [`watch()`](LeaseManager::watch) +/// method and partially manual using [`changed()`](LeaseManager::changed) +/// and [`release()`](LeaseManager::release) methods. +/// +/// ## Fully automated way +/// +/// You create [`LeaseManager`] instance and run its [`watch()`](LeaseManager::watch()) method. +/// It consumes manager instance (to prevent using other methods) +/// and returns [Tokio watch channel](https://docs.rs/tokio/latest/tokio/sync/watch/index.html) +/// to watch on state changes, +/// and Tokio [`task handler`](https://docs.rs/tokio/latest/tokio/task/struct.JoinHandle.html). +/// +/// It runs a detached background [`Tokio task`](https://docs.rs/tokio/latest/tokio/task/fn.spawn.html) +/// which permanently tries to lock lease if it's free and publish changed state to the channel. +/// The task finishes if the channel is closed or in case of channel error. +/// So this approach ensures that `Lease` is locked by some manager at any moment of time. +/// +/// `Lease` lock state is a `bool` value, +/// wrapped into the [`Receiver`](https://docs.rs/tokio/latest/tokio/sync/watch/struct.Receiver.html) structure, +/// it's `true` if the manager got the lease lock, and `false` if it lost the lease. +/// Refer to the [`tokio::sync::watch`](https://docs.rs/tokio/latest/tokio/sync/watch/index.html) +/// documentation for details about using it. +/// But the most convenient way is to use receivers' +/// [`changed()`](https://docs.rs/tokio/latest/tokio/sync/watch/struct.Receiver.html#method.changed) +/// method to get notified when channels' value was changed, +/// and follow it by the [`borrow_and_update()`](https://docs.rs/tokio/latest/tokio/sync/watch/struct.Receiver.html#method.borrow_and_update) +/// method call to read the actual state value. +/// +/// When the channel is closed (if all receivers went out of their scopes and deleted, or by explicit `drop()` call), +/// watching task finishes, +/// and you can (preferably you should) call [`tokio::join`](https://docs.rs/tokio/latest/tokio/macro.join.html) +/// to get its result. +/// It returns the previously consumed `LeaseManager` instance, +/// so you can use it next time, but the new status channel and the new watching task will be created. +/// +/// `watch()` tolerates (hides) all errors except watch channel related ones. +/// In case of error, it tries to recover state repeating calls to Kubernetes API with an increasing backoff interval. 
+/// +/// ### Example +/// +/// This artificial example tries to run some workload as soon as it gets lease lock, +/// and cancels workload if lock was lost. +/// +/// Something similar is used in Kubernetes controllers that have several Pods running (to ensure high availability), +/// but only one of them is eligible to make changes to the actual state. +/// +/// ```no_run +/// use kube::Client; +/// use kube_lease_manager::{LeaseManagerBuilder, Result}; +/// use std::time::Duration; +/// use tokio::task::JoinHandle; +/// +/// #[tokio::main] +/// async fn main() -> Result<()> { +/// // Use default Kube client +/// let client = Client::try_default().await?; +/// +/// // Create the simplest LeaseManager with reasonable defaults using convenient builder. +/// // With default auto-create mode Lease will be created if it doesn't exist, +/// // or existing one will be used otherwise. +/// // The default lease duration is 30 seconds with grace period 5 seconds. +/// let manager = LeaseManagerBuilder::new(client, "test-lease-name") +/// .with_duration(5) +/// .with_grace(2) +/// .build() +/// .await?; +/// +/// // Start manager in watching mode and get back status channel and task handler. +/// let (mut channel, _task) = manager.watch().await; +/// +/// // Watch on the channel for lock state changes. +/// +/// let mut my_work_task: Option> = None; +/// +/// // Try to keep out task running as long as possible. +/// // Restart it when lock is ours. +/// loop { +/// // Wait for the change of the lock +/// let _ = channel.changed(); +/// // And grab the state +/// let lock_state = *channel.borrow_and_update(); +/// +/// if lock_state { +/// // Do something useful as a leader +/// println!("Got luck! Run our exclusive work..."); +/// my_work_task = Some(tokio::spawn(Box::pin(do_work()))); +/// } else { +/// println!("Lost the lease lock! Lets wait for the next one..."); +/// // Abort running task (or use something more sophisticated to gracefully stop it) +/// if let Some(task) = &my_work_task { +/// task.abort(); +/// } +/// my_work_task = None; +/// } +/// } +/// +/// // Explicitly close the control channel. But actually this is unreachable part die to endless loop above. +/// // drop(channel); +/// // let _manager = tokio::join!(task).0.unwrap()?; +/// } +/// +/// async fn do_work() { +/// futures::pending!(); +/// } +/// ``` +/// +/// ## Partially manual approach +/// +/// The same, you create [`LeaseManager`] instance but use its [`changed()`](LeaseManager::changed()) +/// and [`release()`](LeaseManager::release()) methods to control lock. +/// `changed()` tries to lock lease as soon as it becomes free and returns actual lock state when it's changed. +/// It finishes and returns changed state each time the managers' lock state changes. +/// It stays running until the state of this particular manager is the same as before. +/// +/// At the same time, if the manager holds lock then `changed()` method keeps it held by permanently refreshing lease lock. +/// So to avoid loss of the lock, you're responsible to keep `changed()` running (it's a `Future`) +/// to ensure refreshing of the lock, +/// and to ensure this particular manager is a holder of the lease until you need it locked. +/// +/// When you don't need the lease lock anymore, you have to call `release()` +/// method and make lock free to be acquired by any other manager. +/// +/// In contrast to the previous approach, `changed()` method returns any errors related to interacting with Kubernetes API. 
+/// So it's up to you how to respond to errors. +/// +/// ### Example +/// +/// This example shows how to start actual work only after acquiring the lease lock and release it after finish. +/// +/// ```no_run +/// use kube::Client; +/// use kube_lease_manager::{LeaseManagerBuilder, Result}; +/// use std::time::Duration; +/// +/// #[tokio::main] +/// async fn main() -> Result<()> { +/// // Use default Kube client +/// let client = Client::try_default().await?; +/// // Create the simplest LeaseManager with reasonable defaults using convenient builder. +/// // It uses Lease resource called `test-watch-lease`. +/// let manager = LeaseManagerBuilder::new(client, "test-manual-lease").build().await?; +/// +/// // Try to get a lock on resource +/// let state = manager.changed().await?; +/// assert!(state); +/// +/// // Lets run two branches: +/// // - first one watches on state to ensure we don't work with lost lease and refreshes lock +/// // - second one does actual work +/// tokio::select! { +/// // Ensure `changed()` is running to refresh lease lock +/// lock_state = manager.changed() => { +/// if let Ok(state) = lock_state { +/// println!("Looks like lock state was changed to {state} before we finished."); +/// assert!(!state); +/// } else { +/// println!("Something wrong happened: {lock_state:?}.") +/// } +/// } +/// // Do everything you need with locked resource +/// _ = async { +/// println!("We got a lease lock! Lets do out heady work..."); +/// // Do something useful here +/// tokio::time::sleep(Duration::from_secs(1)).await +/// } => { +/// println!("We've done our heavy work."); +/// // Release lock after finish +/// manager.release().await?; +/// // And ensure state was changed +/// assert!(!manager.changed().await?); +/// } +/// } +/// +/// Ok(()) +/// } +/// ``` +#[derive(Debug)] +pub struct LeaseManager { + /// Parameters of the desired lock. + params: LeaseParams, + /// Current state. + state: RwLock, + /// Is current identity set as leader now. + is_leader: AtomicBool, +} + +impl LeaseParams { + /// Constructs instances of `LeaseParams`. + /// + /// Parameters should satisfy all conditions below, otherwise method panics: + /// * duration > 0; + /// * grace > 0; + /// * grace < duration. + pub fn new(identity: impl Into, duration: DurationSeconds, grace: DurationSeconds) -> Self { + let duration: DurationSeconds = duration; + let grace: DurationSeconds = grace; + + if duration == 0 || grace == 0 { + panic!("duration and grace period should be greater than zero"); + } else if grace >= duration { + panic!("grace period should be less than lease lock duration"); + } + + let identity = identity.into(); + let field_manager = format!("{DEFAULT_FIELD_MANAGER_PREFIX}-{}", identity); + Self { + identity, + duration, + grace, + field_manager, + } + } + + /// Set the specified field manager value instead of the default one. + /// + /// Default is `identity` string prefixed by `kube-lease-manager-`, and usually you don't need to change it, by can. + /// + /// It has to be unique among other field managers withing the scope of the `Lease` resource. + /// So if you change it, your responsibility is to ensure its uniqueness. + pub fn with_field_manager(self, field_manager: impl Into) -> Self { + Self { + field_manager: field_manager.into(), + ..self + } + } + + /// Return current filed_manager value to use in [`patch()`](LeaseState::patch). 
+ fn field_manager(&self) -> String { + self.field_manager.clone() + } +} + +impl Default for LeaseParams { + /// Creates parameters instance with reasonable defaults: + /// - random alphanumeric identity string of [`DEFAULT_RANDOM_IDENTITY_LEN`] length; + /// - the lease duration is [`DEFAULT_LEASE_DURATION_SECONDS`]; + /// - the lease grace period is [`DEFAULT_LEASE_GRACE_SECONDS`]. + fn default() -> Self { + Self::new( + random_string(DEFAULT_RANDOM_IDENTITY_LEN), + DEFAULT_LEASE_DURATION_SECONDS, + DEFAULT_LEASE_GRACE_SECONDS, + ) + } +} + +/// Convenient builder of [`LeaseManager`]. +/// +/// It facilitates creating of the manager instance with reasonable defaults. +/// +/// Please refer to [`LeaseParams`] and [`LeaseCreateMode`] for detailed explanation of each parameter. +/// +/// ## Examples +/// +/// To build default [`LeaseManager`]: +/// +/// ```no_run +/// use kube::Client; +/// use kube_lease_manager::{LeaseManagerBuilder, LeaseManager, Result}; +/// +/// #[tokio::main] +/// async fn main() -> Result<()> { +/// let client = Client::try_default().await?; +/// let manager = LeaseManagerBuilder::new(client, "my-unique-test-lease") +/// .build() +/// .await?; +/// // ... +/// +/// Ok(()) +/// } +/// ``` +/// +/// Redefine everything possible: +/// +/// ```no_run +/// use kube::Client; +/// use kube_lease_manager::{LeaseManagerBuilder, LeaseManager, Result, LeaseCreateMode}; +/// use uuid::Uuid; +/// +/// #[tokio::main] +/// async fn main() -> Result<()> { +/// let identity = Uuid::new_v4(); +/// let client = Client::try_default().await?; +/// +/// let manager = LeaseManagerBuilder::new(client, "existing-test-lease") +/// .with_namespace("lease-manager-test-ns") +/// .with_create_mode(LeaseCreateMode::UseExistent) +/// .with_field_manager(format!("custom-field-manager-{identity}")) +/// .with_identity(identity) +/// .with_duration(128) +/// .with_grace(16) +/// .build() +/// .await?; +/// // ... +/// +/// Ok(()) +/// } +/// ``` +pub struct LeaseManagerBuilder { + client: Client, + lease_name: String, + namespace: String, + params: LeaseParams, + create_mode: LeaseCreateMode, +} + +impl LeaseManagerBuilder { + /// Constructs minimal builder instance with all other parameters set to default values. + /// + /// See each builders' method for details about defaults and restrictions. + pub fn new(client: Client, lease_name: impl Into) -> Self { + Self { + client, + lease_name: lease_name.into(), + namespace: String::from("default"), + params: LeaseParams::default(), + create_mode: LeaseCreateMode::default(), + } + } + + /// Builds [`LeaseManager`] from the builder instance using [`LeaseManager::new()`] constructor. + /// + /// This method is async since [`LeaseManager::new()`] is also async, + /// because constructor uses Kubernetes API to ensure the existence of the Lease resource. + /// + /// May return [`LeaseManagerError`] in case of issues with interacting with Kubernetes cluster using provided `client`. + pub async fn build(self) -> Result { + LeaseManager::new( + self.client, + &self.lease_name, + &self.namespace, + self.params, + self.create_mode, + ) + .await + } + + /// Use the specified `Lease` name instead of the one provided to [`LeaseManagerBuilder::new()`]. + pub fn with_lease_name(self, lease_name: impl Into) -> Self { + Self { + lease_name: lease_name.into(), + ..self + } + } + + /// Updates namespace which is used for `Lease` resource. + /// + /// Default value is `"default"`. 
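+    ///
+    /// A minimal sketch of overriding the namespace (the lease and namespace names below are just placeholders):
+    ///
+    /// ```no_run
+    /// # use kube::Client;
+    /// # use kube_lease_manager::{LeaseManagerBuilder, Result};
+    /// # #[tokio::main] async fn main() -> Result<()> {
+    /// # let client = Client::try_default().await?;
+    /// let _manager = LeaseManagerBuilder::new(client, "my-lease")
+    ///     .with_namespace("my-namespace")
+    ///     .build()
+    ///     .await?;
+    /// # Ok(())
+    /// # }
+    /// ```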
+ pub fn with_namespace(self, namespace: impl Into) -> Self { + Self { + namespace: namespace.into(), + ..self + } + } + + /// Updates [`LeaseCreateMode`] of the manager. + /// + /// The default value is [`AutoCreate`](LeaseCreateMode::AutoCreate). + pub fn with_create_mode(self, create_mode: LeaseCreateMode) -> Self { + Self { create_mode, ..self } + } + + /// Updates whole [`LeaseParams`] instance of the manager. + /// + /// There four additional methods to set each parameters' value individually. + pub fn with_parameters(self, params: LeaseParams) -> Self { + Self { params, ..self } + } + + /// Updates `Lease` identity value. + /// + /// The default value is a random alphanumeric string of [`DEFAULT_RANDOM_IDENTITY_LEN`] length. + pub fn with_identity(self, identity: impl Into) -> Self { + Self { + params: LeaseParams { + identity: identity.into(), + ..self.params + }, + ..self + } + } + + /// Updates `Lease` duration parameter. + /// + /// Default is [30 seconds](DEFAULT_LEASE_DURATION_SECONDS). + /// + /// May panic if duration is less than 0, or if duration is less or equal to the current grace value. + pub fn with_duration(self, duration: DurationSeconds) -> Self { + Self { + params: LeaseParams { + duration, + ..self.params + }, + ..self + } + } + + /// Updates lease grace period parameter. + /// + /// Default is [5 seconds](DEFAULT_LEASE_GRACE_SECONDS). + /// + /// May panic if grace is less than 0, or if grace is greater or equal to the current duration value. + pub fn with_grace(self, grace: DurationSeconds) -> Self { + Self { + params: LeaseParams { grace, ..self.params }, + ..self + } + } + + /// Updates `Lease` field manager value. + /// + /// Default is `identity` string prefixed by `kube-lease-manager-`. + /// + /// Make sure that if you change, it has to be unique among other field managers withing the scope of the `Lease` resource. + pub fn with_field_manager(self, field_manager: impl Into) -> Self { + Self { + params: self.params.with_field_manager(field_manager.into()), + ..self + } + } +} + +impl LeaseManager { + /// Basic constructor of the `LeaseManager` instance. + /// See [`LeaseManagerBuilder`] for another way to make it. + /// + /// Besides constructing of the `LeaseManager,` it ensures `Lease` resource with respect to specified `crate_mode`: + /// creates `Lease` resource or verifies its existence with specified name in the provided namespace. + /// + /// It may return [`LeaseManagerError`] in case of issues during interaction with Kubernetes cluster + /// using provided `client`' + pub async fn new( + client: Client, + lease_name: impl Into, + namespace: impl Into, + params: LeaseParams, + create_mode: LeaseCreateMode, + ) -> Result { + let manager = Self { + params, + state: RwLock::new(LeaseState::new(client, lease_name.into(), &namespace.into(), create_mode).await?), + is_leader: AtomicBool::new(false), + }; + + trace!(manager = ?manager, "constructed new LeaseManager"); + Ok(manager) + } + + /// Spawns a [`Tokio`](https://docs.rs/tokio/latest/tokio/task/fn.spawn.html) task and watch on leader changes permanently. + /// + /// If self-state changes (became a leader or lost the lock), it sends actual lock state to the channel. + /// Exits if the channel is closed (all receivers are gone) or in case of any sending error. + /// + /// See [`more details and examples`](LeaseManager#fully-automated-way) about this method and approach. 
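+    ///
+    /// A minimal sketch of the call flow (the lease name below is just a placeholder):
+    ///
+    /// ```no_run
+    /// # use kube::Client;
+    /// # use kube_lease_manager::{LeaseManagerBuilder, Result};
+    /// # #[tokio::main] async fn main() -> Result<()> {
+    /// # let client = Client::try_default().await?;
+    /// let manager = LeaseManagerBuilder::new(client, "watch-sketch-lease").build().await?;
+    /// let (mut channel, task) = manager.watch().await;
+    ///
+    /// // Wait for the first state change and read the current lock state.
+    /// channel.changed().await.unwrap();
+    /// let is_leader = *channel.borrow_and_update();
+    /// println!("leader now: {is_leader}");
+    ///
+    /// // Closing the channel stops the background task and returns the manager.
+    /// drop(channel);
+    /// let _manager = task.await.unwrap()?;
+    /// # Ok(())
+    /// # }
+    /// ```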
+ pub async fn watch(self) -> (tokio::sync::watch::Receiver, JoinHandle>) { + let (sender, receiver) = tokio::sync::watch::channel(self.is_leader.load(Ordering::Relaxed)); + let watcher = async move { + debug!(params = ?self.params, "starting watch loop"); + + let mut backoff = + BackoffSleep::new(MIN_WATCHER_BACKOFF_TIME, MAX_WATCHER_BACKOFF_TIME, WATCHER_BACKOFF_MULT); + + loop { + select! { + biased; + _ = sender.closed() => { + // Consumer has closed all receivers - release the lock and exit + debug!(identity = %self.params.identity, "control channel has been closed"); + let result = self.release().await; + return match result { + Ok(_) => Ok(self), + Err(err) => Err(err), + }; + } + result = self.changed() => { + debug!(identity = %self.params.identity, "lease state has been changed"); + match result { + Ok(state) => { + let result = sender.send(state); + match result { + Ok(_) => backoff.reset(), + Err(err) => { + let _ = self.release().await; + return Err(LeaseManagerError::ControlChannelError(err)); + } + }; + } + Err(e) => { + error!(error = %e, "LeaseManager watcher error"); + if sender.is_closed() { + let result = self.release().await; + return match result { + Ok(_) => Ok(self), + Err(err) => Err(err), + }; + } else { + backoff.sleep().await; + } + } + } + + } + } + } + }; + + let handler = tokio::spawn(watcher); + + (receiver, handler) + } + + /// Try to lock lease as soon as it's free and renew it periodically to prevent expiration. + /// + /// Returns own status of the leader lock as soon as it was changed (acquired or released). + /// + /// See [`detailed explanation and examples`](LeaseManager#partially-manual-approach). + /// + /// May return [`LeaseManagerError`] in case of issues during interaction with Kubernetes API. + pub async fn changed(&self) -> Result { + let mut backoff = BackoffSleep::new( + MIN_CONFLICT_BACKOFF_TIME, + MAX_CONFLICT_BACKOFF_TIME, + CONFLICT_BACKOFF_MULT, + ); + + loop { + // re-sync state if needed + self.state.write().await.sync(LeaseLockOpts::Soft).await?; + + // Is leader changed iteration? + let is_holder = self.state.read().await.is_holder(&self.params.identity); + if self.is_leader.load(Ordering::Acquire) != is_holder { + debug!(identity = %self.params.identity, is_leader = %is_holder, "lease lock state has been changed"); + self.is_leader.store(is_holder, Ordering::Release); + + return Ok(is_holder); + } + + // Make a single iteration, if no changes so far. + match self.watcher_step().await { + Ok(_) => { + // reset backoff and continue + backoff.reset(); + } + Err(LeaseStateError::LockConflict) => { + // Wait for a backoff interval and continue. + backoff.sleep().await; + } + Err(err) => return Err(LeaseManagerError::from(err)), + } + } + } + + /// Release self-lock if it's set, or do nothing if the lease is locked by another identity. + /// + /// It's useful to call this method to free locked lease gracefully before exit/shutdown + /// if you use [`LeaseManager::changed()`](LeaseManager::changed) + /// directly instead of using [`LeaseManager::watch()`](LeaseManager::watch). + /// + /// It's safe to call it even if this particular manager isn't a leader. + /// + /// May return [`LeaseManagerError`] in case of issues during interaction with Kubernetes API. 
+ pub async fn release(&self) -> Result<()> { + self.state + .write() + .await + .release(&self.params, LeaseLockOpts::Soft) + .await + .map_err(LeaseManagerError::from) + } + + async fn watcher_step(&self) -> Result<(), LeaseStateError> { + if self.is_holder().await { + // if we're the holder of the lock - sleep up to the next refresh time, + // and renew lock (lock it softly) + tokio::time::sleep(self.grace_sleep_duration(self.expiry().await, self.params.grace)).await; + + debug!(identity = %self.params.identity, "renew own lease lock"); + self.state.write().await.lock(&self.params, LeaseLockOpts::Soft).await + } else if !self.is_locked().await { + // Lease isn't locket yet + debug!(identity = %self.params.identity, "try to lock orphaned lease"); + self.state.write().await.lock(&self.params, LeaseLockOpts::Soft).await + } else if self.is_locked().await && self.is_expired().await { + // It's locked by someone else but lock is already expired. + // Release it by force and try to lock on the next loop cycle + debug!(identity = %self.params.identity, "release expired lease lock"); + let res = self + .state + .write() + .await + .release(&self.params, LeaseLockOpts::Force) + .await; + + // Sleep some random time (up to 1000ms) to minimize collision probability + tokio::time::sleep(random_duration(MIN_RELEASE_WAITING_MILLIS, MAX_RELEASE_WAITING_MILLIS)).await; + res + } else if self.is_locked().await && !self.is_expired().await { + // It's locked by someone else and the lock is actual. + // Sleep up to the expiration time of the lock. + let holder = self.holder().await.unwrap(); + debug!(identity = %self.params.identity, %holder,"lease is actually locked by other identity"); + tokio::time::sleep(self.grace_sleep_duration(self.expiry().await, 0)).await; + Ok(()) + } else { + // Something wrong happened + error!(?self, "unreachable branch in LeaseManager watcher, looks like a BUG!"); + Err(LeaseStateError::InconsistentState( + "unreachable branch, looks like a BUG!".into(), + )) + } + } + + async fn is_expired(&self) -> bool { + self.state.read().await.is_expired() + } + + async fn is_locked(&self) -> bool { + self.state.read().await.is_locked() + } + + async fn is_holder(&self) -> bool { + self.state.read().await.is_holder(&self.params.identity) + } + + async fn expiry(&self) -> SystemTime { + self.state.read().await.expiry + } + + async fn holder(&self) -> Option { + self.state.read().await.holder.clone() + } + + fn grace_sleep_duration(&self, expiry: SystemTime, grace: DurationSeconds) -> Duration { + let grace = Duration::from_secs(grace); + expiry + .duration_since(SystemTime::now()) + .unwrap_or(Duration::ZERO) + .saturating_sub(grace) + } +} + +fn random_duration(min_millis: DurationMillis, max_millis: DurationMillis) -> Duration { + Duration::from_millis(thread_rng().gen_range(min_millis..max_millis)) +} + +fn random_string(len: usize) -> String { + let rand: String = thread_rng() + .sample_iter(&Alphanumeric) + .take(len) + .map(char::from) + .collect(); + rand +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::future::select_all; + use k8s_openapi::api::{coordination::v1::Lease, core::v1::Namespace}; + use kube::{ + api::{DeleteParams, PostParams}, + Api, Resource as _, + }; + use std::{collections::HashSet, sync::OnceLock}; + use tokio::{join, select, sync::OnceCell}; + + pub(crate) const TEST_NAMESPACE: &str = "kube-lease-test"; + + static INITIALIZED: OnceCell = OnceCell::const_new(); + static TRACING: OnceLock<()> = OnceLock::new(); + + pub(crate) async fn init() -> 
Client { + let client = Client::try_default().await.unwrap(); + INITIALIZED + .get_or_init(|| async { + create_namespace(client.clone()).await; + init_tracing(); + true + }) + .await; + + client + } + + pub(crate) fn init_tracing() { + TRACING.get_or_init(|| { + tracing_subscriber::fmt::init(); + }); + } + + /// Unattended namespace creation + async fn create_namespace(client: Client) { + let api = Api::::all(client); + let pp = PostParams::default(); + + let mut data = Namespace::default(); + data.meta_mut().name = Some(String::from(TEST_NAMESPACE)); + + api.create(&pp, &data).await.unwrap_or_default(); + } + + async fn setup_simple_managers_vec(lease_name: &str, count: usize) -> Vec { + const LEASE_DURATION_SECONDS: DurationSeconds = 2; + const LEASE_GRACE_SECONDS: DurationSeconds = 1; + + let client = init().await; + let mut managers = vec![]; + + for i in 0..count { + let param = LeaseParams::new(format!("leader-{i}"), LEASE_DURATION_SECONDS, LEASE_GRACE_SECONDS); + let manager = LeaseManager::new( + client.clone(), + lease_name, + TEST_NAMESPACE, + param, + LeaseCreateMode::Ignore, + ) + .await + .unwrap(); + managers.push(manager); + } + + // Create lease + let _ = managers[0] + .state + .read() + .await + .create(LeaseCreateMode::CreateNew) + .await + .unwrap(); + managers + } + + pub(crate) async fn sleep_secs(seconds: DurationSeconds) { + tokio::time::sleep(Duration::from_secs(seconds)).await + } + + #[test] + fn lease_params_default_constructor() { + let params = LeaseParams::default(); + assert_eq!(params.identity.len(), DEFAULT_RANDOM_IDENTITY_LEN); + assert_eq!(params.duration, DEFAULT_LEASE_DURATION_SECONDS); + assert_eq!(params.grace, DEFAULT_LEASE_GRACE_SECONDS); + } + + #[test] + #[should_panic = "duration and grace period should be greater than zero"] + fn incorrect_lease_params_duration_0() { + let _params = LeaseParams::new(random_string(10), 0, 0); + } + + #[test] + #[should_panic = "duration and grace period should be greater than zero"] + fn incorrect_lease_params_grace_0() { + let _params = LeaseParams::new(random_string(10), 2, 0); + } + + #[test] + #[should_panic = "grace period should be less than lease lock duration"] + fn incorrect_lease_params_duration_equal_grace() { + let _params = LeaseParams::new(random_string(10), 2, 2); + } + + #[test] + #[should_panic = "grace period should be less than lease lock duration"] + fn incorrect_lease_params_duration_less_than_grace() { + let _params = LeaseParams::new(random_string(10), 2, 3); + } + + #[test] + fn random_100_000_8ch_strings() { + const SET_LEN: usize = 100_000; + + let mut set = HashSet::new(); + for _ in 0..SET_LEN { + set.insert(random_string(8)); + } + + assert_eq!(set.len(), SET_LEN); + } + + #[test] + fn random_100_intervals() { + const SET_LEN: usize = 100; + + let mut set = HashSet::new(); + for _ in 0..SET_LEN { + set.insert(random_duration(MIN_RELEASE_WAITING_MILLIS, MAX_RELEASE_WAITING_MILLIS)); + } + + assert!( + set.len() >= SET_LEN * 8 / 10, + "at least 80% of randoms should be unique, but got {}%", + set.len() + ); + } + + #[tokio::test] + #[ignore = "uses k8s current-context"] + async fn grace_sleep_duration() { + let client = Client::try_default().await.unwrap(); + let manager = LeaseManager::new( + client, + "lease_name", + "namespace", + LeaseParams::default(), + LeaseCreateMode::Ignore, + ) + .await + .unwrap(); + + // Already expired - always ZERO + let expiry = SystemTime::now().checked_sub(Duration::from_nanos(1)).unwrap(); + manager.state.write().await.expiry = expiry; + assert_eq!( + 
manager.grace_sleep_duration(expiry, 0), + Duration::ZERO, + "should be ZERO since it's already expired" + ); + assert_eq!( + manager.grace_sleep_duration(expiry, 1), + Duration::ZERO, + "should be ZERO since it's already expired" + ); + + // Expires in 10s + let expiry = SystemTime::now().checked_add(Duration::from_secs(10)).unwrap(); + manager.state.write().await.expiry = expiry; + let duration = manager.grace_sleep_duration(expiry, 0); + assert!( + duration >= Duration::from_millis(9_900) && duration <= Duration::from_millis(10_000), + "should be around 10s" + ); + let duration = manager.grace_sleep_duration(expiry, 1); + assert!( + duration >= Duration::from_millis(8_900) && duration <= Duration::from_millis(9_000), + "should be around 9s" + ); + assert_eq!( + manager.grace_sleep_duration(expiry, 10), + Duration::ZERO, + "should be around ZERO" + ); + } + + #[tokio::test] + #[ignore = "uses k8s current-context"] + async fn single_manager_watcher_step() { + const LEASE_NAME: &str = "single-manager-watcher-step-test"; + + let managers = setup_simple_managers_vec(LEASE_NAME, 1).await; + assert!(!managers[0].is_leader.load(Ordering::Relaxed)); + assert!(!managers[0].state.read().await.is_holder(&managers[0].params.identity)); + assert!(!managers[0].state.read().await.is_locked()); + + managers[0].watcher_step().await.unwrap(); + assert!(!managers[0].is_leader.load(Ordering::Relaxed)); + assert!(managers[0].state.read().await.is_holder(&managers[0].params.identity)); + assert!(managers[0].state.read().await.is_locked()); + assert!(!managers[0].state.read().await.is_expired()); + + // Expire + sleep_secs(managers[0].params.duration).await; + assert!(!managers[0].is_leader.load(Ordering::Relaxed)); + assert!(managers[0].state.read().await.is_holder(&managers[0].params.identity)); + assert!(managers[0].state.read().await.is_locked()); + assert!(managers[0].state.read().await.is_expired()); + + // Clean up + managers[0].state.read().await.delete().await.unwrap(); + } + + #[tokio::test] + #[ignore = "uses k8s current-context"] + async fn single_manager_changed_loop() { + const LEASE_NAME: &str = "single-manager-changed-loop-test"; + + let managers = setup_simple_managers_vec(LEASE_NAME, 1).await; + assert!(!managers[0].is_leader.load(Ordering::Relaxed)); + + let is_leader = managers[0].changed().await.unwrap(); + assert!(is_leader); + assert!(managers[0].is_leader.load(Ordering::Relaxed)); + + let long_duration = Duration::from_secs(managers[0].params.duration * 2); + select! 
{ + _ = managers[0].changed() => { + unreachable!("unreachable branch since `changed` loop should last forever") + }, + _ = tokio::time::sleep(long_duration) => { + assert!(managers[0].is_leader.load(Ordering::Relaxed)) + } + } + assert!(managers[0].is_leader.load(Ordering::Relaxed)); + + // Clean up + managers[0].state.read().await.delete().await.unwrap(); + } + + #[tokio::test] + #[ignore = "uses k8s current-context"] + async fn two_managers_1st_expires_then_2nd_locks() { + const LEASE_NAME: &str = "two-managers-1st-expires-then-2nd-locks-test"; + + let mut managers = setup_simple_managers_vec(LEASE_NAME, 2).await; + let manager0 = managers.pop().unwrap(); + let manager1 = managers.pop().unwrap(); + + assert!(!manager0.is_leader.load(Ordering::Relaxed)); + assert!(!manager1.is_leader.load(Ordering::Relaxed)); + + // Lock by 1st + let is_leader = manager0.changed().await.unwrap(); + assert!(is_leader); + assert!(manager0.is_leader.load(Ordering::Relaxed)); + assert!(!manager1.is_leader.load(Ordering::Relaxed)); + + // Try to hold lock by 1st + let long_duration = Duration::from_secs(manager0.params.duration * 2); + select! { + biased; + _ = manager0.changed() => { + unreachable!("unreachable branch since `changed` loop should last forever") + }, + _ = manager1.changed() => { + unreachable!("unreachable branch since `changed` loop should last forever") + }, + _ = tokio::time::sleep(long_duration) => { + assert!(manager0.is_leader.load(Ordering::Relaxed)); + assert!(!manager1.is_leader.load(Ordering::Relaxed)); + } + } + assert!(manager0.is_leader.load(Ordering::Relaxed)); + assert!(!manager1.is_leader.load(Ordering::Relaxed)); + + // Expire and try to re-lock by 2nd + sleep_secs(manager0.params.duration).await; + let is_leader = manager1.changed().await.unwrap(); + assert!(is_leader); + assert!(manager1.is_leader.load(Ordering::Relaxed)); + + let is_leader = manager0.changed().await.unwrap(); + assert!(!is_leader); + assert!(!manager0.is_leader.load(Ordering::Relaxed)); + + // Clean up + manager0.state.read().await.delete().await.unwrap(); + } + + #[tokio::test] + #[ignore = "uses k8s current-context"] + async fn many_managers_1st_expires_then_someone_locks() { + const LEASE_NAME: &str = "many-managers-1st-expires-then-someone-locks-test"; + const MANAGERS: usize = 100; + + let mut managers = setup_simple_managers_vec(LEASE_NAME, MANAGERS).await; + + for manager in &managers { + assert!(!manager.is_leader.load(Ordering::Relaxed)); + } + + // Lock by 1st + let is_leader = managers[0].changed().await.unwrap(); + assert!(is_leader); + for (i, manager) in managers.iter().enumerate() { + assert_eq!( + i == 0, + manager.is_leader.load(Ordering::Relaxed), + "locked by incorrect manager" + ); + } + + // Try to hold lock by 1st + let long_duration = Duration::from_secs(managers[0].params.duration * 2); + { + let managers_fut: Vec<_> = managers + .iter_mut() + .map(|m| { + let c = async { + let r = m.changed().await; + tokio::time::sleep(random_duration(0, 500)).await; + r + }; + Box::pin(c) + }) + .collect(); + select! 
{ + _ = tokio::time::sleep(long_duration) => {}, + _ = select_all(managers_fut) => { + unreachable!("unreachable branch since `changed` loop should last forever") + } + } + for (i, manager) in managers.iter().enumerate() { + assert_eq!(i == 0, manager.is_leader.load(Ordering::Relaxed)); + } + } + // Expire it + tokio::time::sleep( + Duration::from_secs(managers[0].params.duration) + .checked_add(Duration::from_millis(100)) + .unwrap(), + ) + .await; + // Try to re-lock by someone else (exclude 1st from loop) + { + let managers_fut: Vec<_> = managers + .iter_mut() + .skip(1) + .map(|m| { + let c = async { + let r = m.changed().await; + tokio::time::sleep(random_duration(0, 500)).await; + r + }; + Box::pin(c) + }) + .collect(); + let (result, index, _) = select_all(managers_fut).await; + assert!(result.unwrap()); + // Assert that only one holds lock + for (i, manager) in managers.iter().skip(1).enumerate() { + assert_eq!( + i == index, + manager.is_leader.load(Ordering::Relaxed), + "locked by incorrect manager" + ); + } + // Assert that 1st lost lock + assert!(!managers[0].changed().await.unwrap()); + } + + // Clean up + managers[0].state.read().await.delete().await.unwrap(); + } + + #[tokio::test] + #[ignore = "uses k8s current-context"] + async fn create_lease() { + const LEASE_NAME: &str = "create-lease-test"; + + let client = init().await; + let dp = DeleteParams::default(); + let api = Api::::namespaced(client.clone(), TEST_NAMESPACE); + + let _ = api.delete(LEASE_NAME, &dp).await; + + assert!( + matches!( + LeaseState::new(client.clone(), LEASE_NAME, TEST_NAMESPACE, LeaseCreateMode::UseExistent).await, + Err(LeaseStateError::NonexistentLease(_)) + ), + "lease exists but shouldn't" + ); + + assert!( + LeaseState::new(client.clone(), LEASE_NAME, TEST_NAMESPACE, LeaseCreateMode::AutoCreate) + .await + .is_ok(), + "lease wasn't created" + ); + + assert!( + LeaseState::new(client.clone(), LEASE_NAME, TEST_NAMESPACE, LeaseCreateMode::UseExistent) + .await + .is_ok(), + "lease wasn't created" + ); + + assert!( + LeaseState::new(client.clone(), LEASE_NAME, TEST_NAMESPACE, LeaseCreateMode::AutoCreate) + .await + .is_ok(), + "lease wasn't created" + ); + + assert!( + matches!( + LeaseState::new(client.clone(), LEASE_NAME, TEST_NAMESPACE, LeaseCreateMode::CreateNew).await, + Err(LeaseStateError::LeaseAlreadyExists(_)) + ), + "lease should exist" + ); + + api.delete(LEASE_NAME, &dp).await.unwrap(); + + assert!( + LeaseState::new(client.clone(), LEASE_NAME, TEST_NAMESPACE, LeaseCreateMode::CreateNew) + .await + .is_ok(), + "lease shouldn't exist" + ); + + api.delete(LEASE_NAME, &dp).await.unwrap(); + } + + #[tokio::test] + #[ignore = "uses k8s current-context"] + async fn two_managers_1st_releases_then_2nd_locks() { + const LEASE_NAME: &str = "two-managers-1st-releases-then-2nd-locks-test"; + + let mut managers = setup_simple_managers_vec(LEASE_NAME, 2).await; + let manager0 = managers.pop().unwrap(); + let manager1 = managers.pop().unwrap(); + + assert!(!manager0.is_leader.load(Ordering::Relaxed)); + assert!(!manager1.is_leader.load(Ordering::Relaxed)); + + // Lock by 1st + let is_leader = manager0.changed().await.unwrap(); + assert!(is_leader); + assert!(manager0.is_leader.load(Ordering::Relaxed)); + assert!(!manager1.is_leader.load(Ordering::Relaxed)); + + // Try to hold lock by 1st + let long_duration = Duration::from_secs(manager0.params.duration * 2); + select! 
{
+            biased;
+            _ = manager0.changed() => {
+                unreachable!("unreachable branch since `changed` loop should last forever")
+            },
+            _ = manager1.changed() => {
+                unreachable!("unreachable branch since `changed` loop should last forever")
+            },
+            _ = tokio::time::sleep(long_duration) => {
+                assert!(manager0.is_leader.load(Ordering::Relaxed));
+                assert!(!manager1.is_leader.load(Ordering::Relaxed));
+            }
+        }
+        assert!(manager0.is_leader.load(Ordering::Relaxed));
+        assert!(!manager1.is_leader.load(Ordering::Relaxed));
+
+        // Release the lock by the 1st and try to re-lock by the 2nd
+        manager0.release().await.unwrap();
+        let is_leader = manager1.changed().await.unwrap();
+        assert!(is_leader);
+        assert!(manager1.is_leader.load(Ordering::Relaxed));
+
+        let is_leader = manager0.changed().await.unwrap();
+        assert!(!is_leader);
+        assert!(!manager0.is_leader.load(Ordering::Relaxed));
+
+        // Clean up
+        manager0.state.read().await.delete().await.unwrap();
+    }
+
+    #[tokio::test]
+    #[ignore = "uses k8s current-context"]
+    async fn single_watch_managers_handles_own_channel() {
+        const LEASE_NAME: &str = "single-watch-managers-handle-channel-test";
+
+        let mut managers = setup_simple_managers_vec(LEASE_NAME, 1).await;
+        let manager0 = managers.pop().unwrap();
+
+        assert!(!manager0.is_leader.load(Ordering::Relaxed));
+
+        // Run the watcher
+        let (mut channel, handler) = manager0.watch().await;
+        // It hasn't had time to lock yet
+        assert!(!*channel.borrow_and_update());
+
+        // Wait for the lease to be locked
+        channel.changed().await.unwrap();
+        assert!(*channel.borrow_and_update());
+
+        // Try to hold the lock for 3 seconds
+        select! {
+            _ = channel.changed() => {
+                unreachable!("unreachable branch since lock state has not been changed")
+            },
+            _ = sleep_secs(3) => {
+                assert!(*channel.borrow());
+            }
+        }
+        assert!(*channel.borrow());
+
+        // Close the control channel and expect the lock to be released and the watcher to finish
+        drop(channel);
+        let manager0 = join!(handler).0.unwrap().unwrap();
+        manager0.state.write().await.sync(LeaseLockOpts::Force).await.unwrap();
+        assert!(!manager0.is_holder().await);
+
+        // Clean up
+        manager0.state.read().await.delete().await.unwrap();
+    }
+
+    #[tokio::test]
+    #[ignore = "uses k8s current-context"]
+    async fn two_managers_1st_uses_changed_2nd_watch() {
+        const LEASE_NAME: &str = "two-managers-1st-uses-changed-2nd-watch-test";
+
+        let mut managers = setup_simple_managers_vec(LEASE_NAME, 2).await;
+        let manager0 = managers.remove(0);
+        let manager1 = managers.remove(0);
+
+        assert!(!manager0.is_leader.load(Ordering::Relaxed));
+        assert!(!manager1.is_leader.load(Ordering::Relaxed));
+
+        // Lock by the 1st
+        let is_leader = manager0.changed().await.unwrap();
+        assert!(is_leader);
+        assert!(manager0.is_leader.load(Ordering::Relaxed));
+        assert!(!manager1.is_leader.load(Ordering::Relaxed));
+
+        let (mut channel, handler) = manager1.watch().await;
+        assert!(!*channel.borrow_and_update());
+
+        // Hold the lock by the 1st for 3 seconds
+        select! {
+            _ = sleep_secs(3) => {
+                assert!(manager0.is_leader.load(Ordering::Relaxed));
+                assert!(!*channel.borrow_and_update());
+            }
+            _changed = manager0.changed() => {
+                unreachable!("unreachable branch since lock state has not been changed");
+            }
+            _watch = channel.changed() => {
+                unreachable!("unreachable branch since lock state has not been changed");
+            }
+        }
+        assert!(manager0.is_leader.load(Ordering::Relaxed));
+        assert!(!*channel.borrow_and_update());
+
+        // Don't touch the 1st so its lock expires; the 2nd has to lock it
+        select!
{
+            _ = sleep_secs(4) => {
+                unreachable!("unreachable branch since lock state has not been changed");
+            }
+            _watch = channel.changed() => {
+                assert!(*channel.borrow_and_update());
+                manager0.changed().await.unwrap();
+                assert!(!manager0.is_leader.load(Ordering::Relaxed));
+            }
+        }
+
+        // Drop the channel to make the 2nd release the lock, and wait for the 1st to take it over
+        drop(channel);
+        assert!(manager0.changed().await.unwrap());
+        // And the 2nd has to finish
+        let res = join!(handler).0.unwrap().unwrap();
+        assert!(!res.is_holder().await);
+
+        // Clean up
+        manager0.state.read().await.delete().await.unwrap();
+    }
+
+    #[tokio::test]
+    #[ignore = "uses k8s current-context"]
+    async fn many_managers_watch_one_by_one() {
+        const LEASE_NAME: &str = "many-managers-watch-one-by-one-test";
+        const NUMBER_OF_MANAGERS: usize = 10;
+
+        let managers = setup_simple_managers_vec(LEASE_NAME, NUMBER_OF_MANAGERS).await;
+
+        // ensure there are no holders
+        assert_eq!(
+            managers.iter().filter(|m| m.is_leader.load(Ordering::Relaxed)).count(),
+            0
+        );
+
+        // run watchers
+        let mut handlers = vec![];
+        let mut channels = vec![];
+        for m in managers {
+            let w = m.watch().await;
+            channels.push(w.0);
+            handlers.push(w.1);
+        }
+        // wait for at least one locked lease,
+        // assert that at least one watcher got a change notification and that only one holds the lock
+        sleep_secs(3).await;
+        let changed_vec: Vec<_> = channels.iter_mut().map(|ch| Box::pin(ch.changed())).collect();
+        let (_result, index, _) = select_all(changed_vec).await;
+        assert!(*channels[index].borrow_and_update());
+        assert_eq!(
+            channels
+                .iter_mut()
+                .map(|ch| *ch.borrow_and_update())
+                .filter(|r| *r)
+                .count(),
+            1
+        );
+
+        // drop watchers one by one and assert that only a single lock is held
+        let mut prev_index = index;
+        let mut managers = vec![];
+        while !channels.is_empty() {
+            let prev_channel = channels.remove(prev_index);
+            let prev_handler = handlers.remove(prev_index);
+            drop(prev_channel);
+            let manager = join!(prev_handler).0.unwrap().unwrap();
+            managers.push(manager);
+
+            if !channels.is_empty() {
+                // wait for a new lock
+                let changed_vec: Vec<_> = channels.iter_mut().map(|ch| Box::pin(ch.changed())).collect();
+                let (_result, index, _) = select_all(changed_vec).await;
+                assert!(*channels[index].borrow_and_update());
+                assert_eq!(
+                    channels
+                        .iter_mut()
+                        .map(|ch| *ch.borrow_and_update())
+                        .filter(|r| *r)
+                        .count(),
+                    1
+                );
+
+                prev_index = index;
+            }
+        }
+
+        // assert the expected number of lease transitions
+        assert_eq!(
+            managers[0]
+                .state
+                .read()
+                .await
+                .get()
+                .await
+                .unwrap()
+                .spec
+                .unwrap()
+                .lease_transitions
+                .unwrap(),
+            NUMBER_OF_MANAGERS as i32
+        );
+
+        // Clean up
+        managers[0].state.read().await.delete().await.unwrap();
+    }
+
+    #[test]
+    fn lease_params_constructor_with_field_manager() {
+        const CUSTOM_FIELD_MANAGER: &str = "custom-field-manager";
+
+        let fm = String::from(CUSTOM_FIELD_MANAGER);
+
+        let params = LeaseParams::default();
+        assert_ne!(params.field_manager(), CUSTOM_FIELD_MANAGER);
+
+        let params = params.with_field_manager(fm);
+        assert_eq!(params.field_manager(), CUSTOM_FIELD_MANAGER);
+    }
+
+    #[tokio::test]
+    #[ignore = "uses k8s current-context"]
+    async fn lease_manager_builder() {
+        const LEASE_NAME0: &str = "lease-manager-builder-test-0";
+        const LEASE_NAME: &str = "lease-manager-builder-test";
+        const CUSTOM_FIELD_MANAGER0: &str = "custom-field-manager-0";
+        const CUSTOM_FIELD_MANAGER: &str = "custom-field-manager";
+        const CUSTOM_IDENTITY0: &str = "custom-lease-manager-identity-0";
+        const CUSTOM_IDENTITY: &str =
"custom-lease-manager-identity"; + + let client = init().await; + let builder = LeaseManagerBuilder::new(client, LEASE_NAME0); + + assert_eq!(builder.lease_name, LEASE_NAME0); + let builder = builder.with_lease_name(LEASE_NAME); + assert_eq!(builder.lease_name, LEASE_NAME); + + assert_eq!(builder.namespace, "default"); + let builder = builder.with_namespace(TEST_NAMESPACE); + assert_eq!(builder.namespace, TEST_NAMESPACE); + + assert_eq!(builder.create_mode, LeaseCreateMode::default()); + assert_ne!(LeaseCreateMode::default(), LeaseCreateMode::CreateNew); + let builder = builder.with_create_mode(LeaseCreateMode::CreateNew); + assert_eq!(builder.create_mode, LeaseCreateMode::CreateNew); + + assert_ne!(builder.params.identity, CUSTOM_IDENTITY0); + let builder = builder.with_identity(CUSTOM_IDENTITY0); + assert_eq!(builder.params.identity, CUSTOM_IDENTITY0); + + assert_ne!(builder.params.field_manager, CUSTOM_FIELD_MANAGER0); + let builder = builder.with_field_manager(CUSTOM_FIELD_MANAGER0); + assert_eq!(builder.params.field_manager(), CUSTOM_FIELD_MANAGER0); + + assert_ne!(builder.params.duration, 111); + assert_eq!(builder.params.duration, LeaseParams::default().duration); + let builder = builder.with_duration(111); + assert_eq!(builder.params.duration, 111); + + assert_ne!(builder.params.grace, 11); + assert_eq!(builder.params.grace, LeaseParams::default().grace); + let builder = builder.with_grace(11); + assert_eq!(builder.params.grace, 11); + + let params = LeaseParams { + identity: CUSTOM_IDENTITY.into(), + duration: 222, + grace: 22, + field_manager: CUSTOM_FIELD_MANAGER.into(), + }; + let builder = builder.with_parameters(params); + assert_eq!(builder.params.identity, CUSTOM_IDENTITY); + assert_eq!(builder.params.duration, 222); + assert_eq!(builder.params.grace, 22); + assert_eq!(builder.params.field_manager(), CUSTOM_FIELD_MANAGER); + + let manager = builder.build().await.unwrap(); + manager.changed().await.unwrap(); + + // Clean up + manager.state.read().await.delete().await.unwrap(); + } +} diff --git a/src/state.rs b/src/state.rs new file mode 100644 index 0000000..c07314b --- /dev/null +++ b/src/state.rs @@ -0,0 +1,686 @@ +use crate::{DurationSeconds, LeaseCreateMode, LeaseParams, Result}; +use k8s_openapi::{ + api::coordination::v1::Lease, + apimachinery::pkg::apis::meta::v1::MicroTime, + chrono::{DateTime, Utc}, + serde::Serialize, + serde_json, +}; +use kube::{ + api::{ObjectMeta, Patch, PatchParams, PostParams}, + Api, Client, +}; +use std::{ + fmt::Debug, + time::{Duration, SystemTime}, +}; +use tracing::{debug, trace}; + +/// Represents actual Lease state. +#[derive(Debug)] +pub(crate) struct LeaseState { + /// namespaced kube::Api + api: Api, + /// Name of the Lease object. + lease_name: String, + /// Identity of the current Lease holder, if it's locked now. + pub(crate) holder: Option, + /// Time of the potential state expiration. + pub(crate) expiry: SystemTime, + /// Transitions count. + transitions: i32, +} + +/// Represents `kube-lease` specific errors. +#[derive(thiserror::Error, Debug)] +pub enum LeaseStateError { + /// Error originated from the Kubernetes. + #[error("Kube error: {0}")] + KubeError( + #[source] + #[from] + kube::Error, + ), + + /// Conflict detected during locking attempt. + #[error("lock conflict detected")] + LockConflict, + + /// Try to create a new Lease, but it already exists. + #[error("Lease resource `{0}` already exists")] + LeaseAlreadyExists(String), + + /// Try to use non-existent Lease resource. 
+ #[error("Lease resource `{0}` doesn't exist")] + NonexistentLease(String), + + /// Internal lease state inconsistency detected. + /// + /// Usually the root cause is a bug. + #[error("inconsistent LeaseManager state detected: {0}")] + InconsistentState(String), +} + +/// Options to use for operations with lock. +#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub(crate) enum LeaseLockOpts { + #[default] + Soft, + Force, +} + +impl LeaseState { + /// Constructor + pub(crate) async fn new( + client: Client, + lease_name: impl Into, + namespace: &str, + create_mode: LeaseCreateMode, + ) -> Result { + let api = Api::::namespaced(client, namespace); + + let state = Self { + api, + lease_name: lease_name.into(), + holder: None, + expiry: SystemTime::now().checked_sub(Duration::from_nanos(1)).unwrap(), + transitions: 0, + }; + + state.create(create_mode).await?; + Ok(state) + } + + /// Check if the current state is still valid. + pub(crate) fn is_expired(&self) -> bool { + SystemTime::now() > self.expiry + } + + /// Check if the current holder is the same as `other` parameter. + pub(crate) fn is_holder(&self, other: &str) -> bool { + if let Some(holder) = &self.holder { + holder == other + } else { + false + } + } + + /// Check if the current holder is set to something. + pub(crate) fn is_locked(&self) -> bool { + self.holder.is_some() + } + + /// Retrieve the actual state from the cluster. + pub(crate) async fn sync(&mut self, opts: LeaseLockOpts) -> Result<(), LeaseStateError> { + if opts == LeaseLockOpts::Force || self.is_expired() { + debug!(?opts, lease = %self.lease_name, "sync lease state"); + let result = self.get().await; + + // If the Lease doesn't exist - clear state before exiting + if let Err(LeaseStateError::NonexistentLease(_)) = &result { + debug!(lease = %self.lease_name, "erasing state because lease doesn't exists"); + self.holder = None; + self.transitions = 0; + self.expiry = SystemTime::now().checked_sub(Duration::from_nanos(1)).unwrap(); + + return Err(result.err().unwrap()); + } + + // If success or non-404 - try to unwrap spec and do sync + let result = result?; + if let Some(lease) = result.spec { + self.holder = lease.holder_identity; + self.transitions = lease.lease_transitions.unwrap_or(0); + self.expiry = { + let renew = lease.renew_time; + let duration = lease + .lease_duration_seconds + .map(|d| Duration::from_secs(d as DurationSeconds)); + + if renew.is_some() && duration.is_some() { + let renew: SystemTime = renew.unwrap().0.into(); + let duration: Duration = duration.unwrap(); + + renew.checked_add(duration).unwrap() + } else { + SystemTime::now().checked_sub(Duration::from_nanos(1)).unwrap() + } + }; + } else { + // Empty spec in the Lease + debug!(lease = %self.lease_name, "lease `spec` field is empty"); + self.holder = None; + self.transitions = 0; + self.expiry = SystemTime::now().checked_sub(Duration::from_nanos(1)).unwrap(); + } + } + + Ok(()) + } + + pub(crate) async fn lock(&mut self, params: &LeaseParams, opts: LeaseLockOpts) -> Result<(), LeaseStateError> { + self.sync(LeaseLockOpts::Soft).await?; + + let lease_duration_seconds = params.duration; + let now: DateTime = SystemTime::now().into(); + + // if we're the holder - refresh the lease + let patch = if self.is_holder(¶ms.identity) { + // if we're the holder - refresh the lease + trace!("update our own lease"); + let patch = serde_json::json!({ + "apiVersion": "coordination.k8s.io/v1", + "kind": "Lease", + "spec": { + "renewTime": MicroTime(now), + "leaseDurationSeconds": 
lease_duration_seconds, + }, + }); + Patch::Strategic(patch) + } else if !self.is_locked() { + // if the lock is orphaned - try to lock it softly + trace!("try to lock orphaned lease"); + let patch = serde_json::json!({ + "apiVersion": "coordination.k8s.io/v1", + "kind": "Lease", + "spec": { + "acquireTime": MicroTime(now), + "renewTime": MicroTime(now), + "holderIdentity": params.identity, + "leaseDurationSeconds": lease_duration_seconds, + }, + }); + let patch = Patch::Apply(patch); + self.patch(params, &patch).await?; + + trace!("locked successfully, increase transitions counter"); + let patch = serde_json::json!({ + "apiVersion": "coordination.k8s.io/v1", + "kind": "Lease", + "spec": { + "leaseTransitions": self.transitions + 1, + }, + }); + Patch::Strategic(patch) + } else if opts == LeaseLockOpts::Force { + // if it's locked by someone else but force is requested - try to lock it with force + trace!("try to force re-lock locked lease"); + let patch = serde_json::json!({ + "apiVersion": "coordination.k8s.io/v1", + "kind": "Lease", + "spec": { + "acquireTime": MicroTime(now), + "renewTime": MicroTime(now), + "holderIdentity": params.identity, + "leaseDurationSeconds": lease_duration_seconds, + "leaseTransitions": self.transitions + 1, + }, + }); + Patch::Strategic(patch) + } else { + return Ok(()); + }; + + self.patch(params, &patch).await?; + self.sync(LeaseLockOpts::Force).await + } + + pub(crate) async fn release(&mut self, params: &LeaseParams, opts: LeaseLockOpts) -> Result<(), LeaseStateError> { + self.sync(LeaseLockOpts::Soft).await?; + + if self.is_holder(¶ms.identity) || self.is_expired() || opts == LeaseLockOpts::Force { + debug!(?params, ?opts, "release lock"); + + let patch = serde_json::json!({ + "apiVersion": "coordination.k8s.io/v1", + "kind": "Lease", + "spec": { + "acquireTime": Option::<()>::None, + "renewTime": Option::<()>::None, + "holderIdentity": Option::<()>::None, + "leaseDurationSeconds": Option::<()>::None, + } + }); + + let patch = Patch::Strategic(patch); + self.patch(params, &patch).await?; + } + + self.sync(LeaseLockOpts::Force).await + } + + pub(crate) async fn patch

(&self, params: &LeaseParams, patch: &Patch

) -> Result<(), LeaseStateError> + where + P: Serialize + Debug, + { + debug!(?patch, "patch lease"); + + let params = PatchParams { + field_manager: Some(params.field_manager()), + // force: matches!(patch, Patch::Apply(_)), + ..Default::default() + }; + + match self.api.patch(&self.lease_name, ¶ms, patch).await { + Ok(_) => Ok(()), + Err(kube::Error::Api(err)) if err.reason == "Conflict" && err.code == 409 => { + debug!(error = ?err, "conflict detected while patching"); + Err(LeaseStateError::LockConflict) + } + Err(err) => Err(LeaseStateError::KubeError(err)), + } + } + + pub(crate) async fn get(&self) -> Result { + let result = self.api.get(&self.lease_name).await; + + // Map error is it doesn't exists + match result { + Ok(lease) => Ok(lease), + Err(kube::Error::Api(err)) if err.code == 404 => { + Err(LeaseStateError::NonexistentLease(self.lease_name.clone())) + } + Err(err) => Err(LeaseStateError::KubeError(err)), + } + } + + pub(crate) async fn create(&self, mode: LeaseCreateMode) -> Result { + let result = self.get().await; + let pp = PostParams::default(); + let data = Lease { + metadata: ObjectMeta { + name: Some(self.lease_name.clone()), + ..Default::default() + }, + spec: Default::default(), + }; + + debug!(?mode, "create lease"); + match mode { + LeaseCreateMode::AutoCreate => { + // Get it and return ok if it exists, + // Create if it doesn't exist + if let Ok(lease) = result { + Ok(lease) + } else if let Err(LeaseStateError::NonexistentLease(_)) = result { + self.api.create(&pp, &data).await.map_err(LeaseStateError::from) + } else { + result + } + } + LeaseCreateMode::CreateNew => { + // Get it and fail if it exists, + // Create if else + if result.is_ok() { + Err(LeaseStateError::LeaseAlreadyExists(self.lease_name.clone())) + } else { + self.api.create(&pp, &data).await.map_err(LeaseStateError::from) + } + } + LeaseCreateMode::UseExistent => { + // Get it and fail if it doesn't exist + result + } + #[cfg(test)] + LeaseCreateMode::Ignore => Ok(data), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::tests::{init, sleep_secs, TEST_NAMESPACE}; + use k8s_openapi::api::coordination::v1::LeaseSpec; + use kube::api::DeleteParams; + + async fn setup_simple_leaders_vec(lease_name: &str, count: usize) -> (Vec, Vec) { + const LEASE_DURATION_SECONDS: DurationSeconds = 2; + const LEASE_GRACE_SECONDS: DurationSeconds = 1; + + let client = init().await; + let mut params = vec![]; + let mut states = vec![]; + + for i in 0..count { + let param = LeaseParams::new(format!("leader-{i}"), LEASE_DURATION_SECONDS, LEASE_GRACE_SECONDS); + let state = LeaseState::new(client.clone(), lease_name, TEST_NAMESPACE, LeaseCreateMode::Ignore) + .await + .unwrap(); + + params.push(param); + states.push(state); + } + + // Create lease + let _ = states[0].create(LeaseCreateMode::CreateNew).await.unwrap(); + + (params, states) + } + + impl LeaseState { + pub(crate) async fn delete(&self) -> Result<()> { + let dp = DeleteParams::default(); + let _ = self.api.delete(&self.lease_name, &dp).await?; + + Ok(()) + } + } + + #[tokio::test] + #[ignore = "uses k8s current-context"] + async fn rough_create_delete() { + const LEASE_NAME: &str = "rough-create-delete-test"; + + let client = init().await; + let state = LeaseState::new(client, LEASE_NAME, TEST_NAMESPACE, LeaseCreateMode::Ignore) + .await + .unwrap(); + + let lease = state.create(LeaseCreateMode::CreateNew).await.unwrap(); + assert!(lease.spec.is_some()); + assert_eq!(lease.spec.unwrap(), LeaseSpec::default()); + + 
state.delete().await.unwrap(); + } + + #[tokio::test] + #[ignore = "uses k8s current-context"] + async fn simple_soft_lock_soft_release() { + const LEASE_NAME: &str = "simple-soft-lock-soft-release-test"; + let (params, mut states) = setup_simple_leaders_vec(LEASE_NAME, 1).await; + + // Lock + states[0].lock(¶ms[0], LeaseLockOpts::Soft).await.unwrap(); + assert!(states[0].is_locked()); + assert!(states[0].is_holder(¶ms[0].identity)); + assert!(!states[0].is_expired()); + + // Expire + sleep_secs(params[0].duration).await; + assert!(states[0].is_locked()); + assert!(states[0].is_holder(¶ms[0].identity)); + assert!(states[0].is_expired()); + + // Release + states[0].release(¶ms[0], LeaseLockOpts::Soft).await.unwrap(); + assert!(!states[0].is_locked()); + assert!(!states[0].is_holder(¶ms[0].identity)); + assert!(states[0].is_expired()); + + states[0].delete().await.unwrap(); + } + + #[tokio::test] + #[ignore = "uses k8s current-context"] + async fn soft_lock_1st_soft_release_2nd() { + const LEASE_NAME: &str = "soft-lock-1st-soft-release-2nd-test"; + let (params, mut states) = setup_simple_leaders_vec(LEASE_NAME, 2).await; + + // Lock by 1st + states[0].lock(¶ms[0], LeaseLockOpts::Soft).await.unwrap(); + states[1].sync(LeaseLockOpts::Force).await.unwrap(); + + assert!(states[0].is_locked()); + assert!(states[0].is_holder(¶ms[0].identity)); + assert!(!states[0].is_expired()); + + assert!(states[1].is_locked()); + assert!(!states[1].is_holder(¶ms[1].identity)); + assert!(states[1].is_holder(¶ms[0].identity)); + assert!(!states[1].is_expired()); + + // Try to release by 2nd, unsuccessfully + states[1].release(¶ms[1], LeaseLockOpts::Soft).await.unwrap(); + states[0].sync(LeaseLockOpts::Force).await.unwrap(); + + assert!(states[0].is_locked()); + assert!(states[0].is_holder(¶ms[0].identity)); + assert!(!states[0].is_expired()); + + assert!(states[1].is_locked()); + assert!(!states[1].is_holder(¶ms[1].identity)); + assert!(states[1].is_holder(¶ms[0].identity)); + assert!(!states[1].is_expired()); + + // Expire + sleep_secs(params[0].duration).await; + assert!(states[0].is_expired()); + assert!(states[1].is_expired()); + + // Try to release by 2nd, successfully + states[1].release(¶ms[1], LeaseLockOpts::Soft).await.unwrap(); + states[0].sync(LeaseLockOpts::Force).await.unwrap(); + + assert!(!states[0].is_locked()); + assert!(!states[0].is_holder(¶ms[0].identity)); + assert!(states[0].is_expired()); + + assert!(!states[1].is_locked()); + assert!(!states[1].is_holder(¶ms[1].identity)); + assert!(!states[1].is_holder(¶ms[0].identity)); + assert!(states[1].is_expired()); + + states[0].delete().await.unwrap(); + } + + #[tokio::test] + #[ignore = "uses k8s current-context"] + async fn soft_lock_1st_force_release_2nd() { + const LEASE_NAME: &str = "soft-lock-1st-force-release-2nd-test"; + let (params, mut states) = setup_simple_leaders_vec(LEASE_NAME, 2).await; + + // Lock by 1st + states[0].lock(¶ms[0], LeaseLockOpts::Soft).await.unwrap(); + states[1].sync(LeaseLockOpts::Force).await.unwrap(); + + assert!(states[0].is_locked()); + assert!(states[0].is_holder(¶ms[0].identity)); + assert!(!states[0].is_expired()); + + assert!(states[1].is_locked()); + assert!(!states[1].is_holder(¶ms[1].identity)); + assert!(states[1].is_holder(¶ms[0].identity)); + assert!(!states[1].is_expired()); + + // Try to release by 2nd, successfully + states[1].release(¶ms[1], LeaseLockOpts::Force).await.unwrap(); + states[0].sync(LeaseLockOpts::Force).await.unwrap(); + + assert!(!states[0].is_locked()); + 
assert!(!states[0].is_holder(¶ms[0].identity)); + assert!(states[0].is_expired()); + + assert!(!states[1].is_locked()); + assert!(!states[1].is_holder(¶ms[1].identity)); + assert!(!states[1].is_holder(¶ms[0].identity)); + assert!(states[1].is_expired()); + + states[0].delete().await.unwrap(); + } + + #[tokio::test] + #[ignore = "uses k8s current-context"] + async fn soft_lock_1st_soft_lock_2nd() { + const LEASE_NAME: &str = "soft-lock-1st-soft-lock-2nd-test"; + let (params, mut states) = setup_simple_leaders_vec(LEASE_NAME, 2).await; + + // Lock by 1st + states[0].lock(¶ms[0], LeaseLockOpts::Soft).await.unwrap(); + states[1].sync(LeaseLockOpts::Force).await.unwrap(); + + assert!(states[0].is_locked()); + assert!(states[0].is_holder(¶ms[0].identity)); + assert!(!states[0].is_expired()); + + assert!(states[1].is_locked()); + assert!(!states[1].is_holder(¶ms[1].identity)); + assert!(states[1].is_holder(¶ms[0].identity)); + assert!(!states[1].is_expired()); + + // Try to lock by 2nd, unsuccessfully + states[1].lock(¶ms[1], LeaseLockOpts::Soft).await.unwrap(); + states[0].sync(LeaseLockOpts::Force).await.unwrap(); + + assert!(states[0].is_locked()); + assert!(states[0].is_holder(¶ms[0].identity)); + assert!(!states[0].is_expired()); + + assert!(states[1].is_locked()); + assert!(!states[1].is_holder(¶ms[1].identity)); + assert!(states[1].is_holder(¶ms[0].identity)); + assert!(!states[1].is_expired()); + + // Expire + sleep_secs(params[0].duration).await; + assert!(states[0].is_expired()); + assert!(states[1].is_expired()); + + // Try to lock by 2nd, unsuccessfully + states[1].lock(¶ms[1], LeaseLockOpts::Soft).await.unwrap(); + states[0].sync(LeaseLockOpts::Force).await.unwrap(); + + assert!(states[0].is_locked()); + assert!(states[0].is_holder(¶ms[0].identity)); + assert!(states[0].is_expired()); + + assert!(states[1].is_locked()); + assert!(!states[1].is_holder(¶ms[1].identity)); + assert!(states[1].is_holder(¶ms[0].identity)); + assert!(states[1].is_expired()); + + states[0].delete().await.unwrap(); + } + + #[tokio::test] + #[ignore = "uses k8s current-context"] + async fn unattended_soft_lock_1st_soft_lock_2nd() { + const LEASE_NAME: &str = "unattended-soft-lock-1st-soft-lock-2nd-test"; + let (params, mut states) = setup_simple_leaders_vec(LEASE_NAME, 2).await; + + // Lock by 1st and 2nd + states[0].lock(¶ms[0], LeaseLockOpts::Soft).await.unwrap(); + states[1].lock(¶ms[1], LeaseLockOpts::Soft).await.unwrap(); + + assert!(states[0].is_locked()); + assert!(states[0].is_holder(¶ms[0].identity)); + assert!(!states[0].is_expired()); + + assert!(states[1].is_locked()); + assert!(!states[1].is_holder(¶ms[1].identity)); + assert!(states[1].is_holder(¶ms[0].identity)); + assert!(!states[1].is_expired()); + + // Expire + sleep_secs(params[0].duration).await; + assert!(states[0].is_expired()); + assert!(states[1].is_expired()); + + // Try to lock by 2nd and 1st + states[0].lock(¶ms[0], LeaseLockOpts::Soft).await.unwrap(); + states[1].lock(¶ms[1], LeaseLockOpts::Soft).await.unwrap(); + + assert!(states[0].is_locked()); + assert!(states[0].is_holder(¶ms[0].identity)); + assert!(!states[0].is_expired()); + + assert!(states[1].is_locked()); + assert!(!states[1].is_holder(¶ms[1].identity)); + assert!(states[1].is_holder(¶ms[0].identity)); + assert!(!states[1].is_expired()); + + states[0].delete().await.unwrap(); + } + + #[tokio::test] + #[ignore = "uses k8s current-context"] + async fn unattended_soft_lock_1st_force_lock_2nd() { + const LEASE_NAME: &str = "unattended-soft-lock-1st-force-lock-2nd-test"; + let 
(params, mut states) = setup_simple_leaders_vec(LEASE_NAME, 2).await; + + // Lock by 1st and 2nd + states[0].lock(¶ms[0], LeaseLockOpts::Soft).await.unwrap(); + states[1].lock(¶ms[1], LeaseLockOpts::Soft).await.unwrap(); + + assert!(states[0].is_locked()); + assert!(states[0].is_holder(¶ms[0].identity)); + assert!(!states[0].is_expired()); + + assert!(states[1].is_locked()); + assert!(!states[1].is_holder(¶ms[1].identity)); + assert!(states[1].is_holder(¶ms[0].identity)); + assert!(!states[1].is_expired()); + + // Don't expire + sleep_secs(params[0].grace).await; + assert!(!states[0].is_expired()); + assert!(!states[1].is_expired()); + + // Try to lock by 2nd and 1st + states[0].lock(¶ms[0], LeaseLockOpts::Soft).await.unwrap(); + states[1].lock(¶ms[1], LeaseLockOpts::Force).await.unwrap(); + states[0].sync(LeaseLockOpts::Force).await.unwrap(); + + assert!(states[0].is_locked()); + assert!(!states[0].is_holder(¶ms[0].identity)); + assert!(!states[0].is_expired()); + + assert!(states[1].is_locked()); + assert!(states[1].is_holder(¶ms[1].identity)); + assert!(!states[1].is_holder(¶ms[0].identity)); + assert!(!states[1].is_expired()); + + states[0].delete().await.unwrap(); + } + + #[tokio::test] + #[ignore = "uses k8s current-context"] + async fn deleted_lease_state() { + const LEASE_NAME: &str = "deleted-lease-state-test"; + + let (params, mut states) = setup_simple_leaders_vec(LEASE_NAME, 1).await; + + states[0].lock(¶ms[0], LeaseLockOpts::Soft).await.unwrap(); + states[0].delete().await.unwrap(); + + let result = states[0].sync(LeaseLockOpts::Force).await; + assert!(result.is_err()); + assert!(matches!(result.err().unwrap(), LeaseStateError::NonexistentLease(_))); + assert!(states[0].holder.is_none()); + assert_eq!(states[0].transitions, 0); + assert!(states[0].is_expired()); + } + + #[tokio::test] + #[ignore = "uses k8s current-context"] + async fn update_lease_with_conflict() { + const LEASE_NAME: &str = "update-lease-with-conflict-test"; + + let (params, mut states) = setup_simple_leaders_vec(LEASE_NAME, 2).await; + + // Lock lease ordinary + states[0].lock(¶ms[0], LeaseLockOpts::Soft).await.unwrap(); + states[1].sync(LeaseLockOpts::Force).await.unwrap(); + + // if the lock is orphaned - try to lock it softly + let now: DateTime = SystemTime::now().into(); + let patch = serde_json::json!({ + "apiVersion": "coordination.k8s.io/v1", + "kind": "Lease", + "spec": { + "acquireTime": MicroTime(now), + "renewTime": MicroTime(now), + "holderIdentity": params[1].identity, + "leaseDurationSeconds": params[1].duration, + "leaseTransitions": states[1].transitions + 1, + }, + }); + + let patch = Patch::Apply(patch); + let result = states[1].patch(¶ms[1], &patch).await; + assert!(result.is_err()); + assert!(matches!(result, Err(LeaseStateError::LockConflict))); + + states[0].delete().await.unwrap(); + } +} diff --git a/tests/auto.rs b/tests/auto.rs new file mode 100644 index 0000000..d109060 --- /dev/null +++ b/tests/auto.rs @@ -0,0 +1,38 @@ +use kube::Client; +use kube_lease_manager::LeaseManagerBuilder; +use std::time::Duration; + +#[tokio::test] +#[ignore = "uses k8s current-context"] +async fn auto() { + tracing_subscriber::fmt::init(); + // Use default Kube client + let client = Client::try_default().await.unwrap(); + // Create the simplest LeaseManager with reasonable defaults using convenient builder. + // It uses Lease resource called `test-watch-lease`. 
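+    // With no extra builder calls this relies on the crate defaults (per the builder tests):
+    // the `default` namespace, an auto-generated random identity, and the default
+    // lease duration/grace intervals.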
+    let manager = LeaseManagerBuilder::new(client, "test-auto-lease")
+        .build()
+        .await
+        .unwrap();
+
+    let (mut channel, task) = manager.watch().await;
+    // Watch the channel for lock state changes
+    tokio::select! {
+        _ = channel.changed() => {
+            let lock_state = *channel.borrow_and_update();
+
+            if lock_state {
+                // Do something useful as a leader
+                println!("Got a lock!");
+            }
+        }
+        _ = tokio::time::sleep(Duration::from_secs(10)) => {
+            println!("Unable to get the lock within 10s");
+        }
+    }
+
+    // Explicitly close the control channel
+    drop(channel);
+    // Wait for the manager to finish and get it back
+    let _manager = tokio::join!(task).0.unwrap().unwrap();
+}
diff --git a/tests/manual.rs b/tests/manual.rs
new file mode 100644
index 0000000..f35e460
--- /dev/null
+++ b/tests/manual.rs
@@ -0,0 +1,46 @@
+use kube::Client;
+use kube_lease_manager::{LeaseManagerBuilder, Result};
+use std::time::Duration;
+
+#[tokio::test]
+#[ignore = "uses k8s current-context"]
+async fn manual() -> Result<()> {
+    // Use the default Kube client
+    let client = Client::try_default().await?;
+    // Create the simplest LeaseManager with reasonable defaults using the convenient builder.
+    // It uses a Lease resource called `test-manual-lease`.
+    let manager = LeaseManagerBuilder::new(client, "test-manual-lease").build().await?;
+
+    // Try to get a lock on the resource
+    let state = manager.changed().await?;
+    assert!(state);
+
+    // Let's run two branches:
+    // - the first one watches the lock state to refresh the lease and ensure we don't keep working with a lost lock
+    // - the second one does the actual work
+    tokio::select! {
+        // Ensure `changed()` is running to refresh the lease lock
+        lock_state = manager.changed() => {
+            if let Ok(state) = lock_state {
+                println!("Looks like lock state was changed to {state} before we finished.");
+                assert!(!state);
+            } else {
+                println!("Something went wrong: {lock_state:?}.")
+            }
+        }
+        // Do everything you need with the locked resource
+        _ = async {
+            println!("We got a lease lock! Let's do our heavy work...");
+            // Do something useful here
+            tokio::time::sleep(Duration::from_secs(1)).await
+        } => {
+            println!("We've done our heavy work.");
+            // Release the lock after finishing
+            manager.release().await?;
+            // And ensure the state was changed
+            assert!(!manager.changed().await?);
+        }
+    }
+
+    Ok(())
+}