Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: impl owned lock functions for Mutex and RwLock #26

Merged
merged 6 commits into from
Dec 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/semantic.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ types:
- chore
- revert

targetUrl: https://github.com/tisonkun/mea/blob/main/.github/semantic.yml
targetUrl: https://github.com/cratesland/mea/blob/main/.github/semantic.yml
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ resolver = "2"

[workspace.package]
edition = "2021"
homepage = "https://github.com/tisonkun/mea"
homepage = "https://github.com/cratesland/mea"
license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/tisonkun/mea"
repository = "https://github.com/cratesland/mea"
rust-version = "1.80.0"

[workspace.lints.rust]
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
[msrv-badge]: https://img.shields.io/badge/MSRV-1.80-green?logo=rust
[license-badge]: https://img.shields.io/crates/l/mea
[license-url]: LICENSE
[actions-badge]: https://github.com/tisonkun/mea/actions/workflows/ci.yml/badge.svg
[actions-url]: https://github.com/tisonkun/mea/actions/workflows/ci.yml
[actions-badge]: https://github.com/cratesland/mea/actions/workflows/ci.yml/badge.svg
[actions-url]: https://github.com/cratesland/mea/actions/workflows/ci.yml

## Overview

Expand Down
177 changes: 78 additions & 99 deletions mea/src/condvar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,12 @@ use std::future::Future;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;

use slab::Slab;

use crate::internal::Mutex;
use crate::internal::WakerSet;
use crate::mutex;
use crate::mutex::MutexGuard;
use crate::mutex::OwnedMutexGuard;

#[cfg(test)]
mod tests;
Expand Down Expand Up @@ -92,10 +91,7 @@ impl Condvar {
/// ```
pub fn new() -> Condvar {
Condvar {
wakers: Mutex::new(WakerSet {
entries: Slab::new(),
notifiable: 0,
}),
wakers: Mutex::new(WakerSet::new()),
}
}

Expand All @@ -111,7 +107,7 @@ impl Condvar {
wakers.notify_all();
}

/// Blocks the current task until this condition variable receives a notification.
/// Yields the current task until this condition variable receives a notification.
///
/// Unlike the std equivalent, this does not check that a single mutex is used at runtime.
/// However, as a best practice avoid using with multiple mutexes.
Expand All @@ -128,7 +124,24 @@ impl Condvar {
mutex.lock().await
}

/// Blocks the current task until this condition variable receives a notification and the
/// Yields the current task until this condition variable receives a notification.
///
/// Unlike the std equivalent, this does not check that a single mutex is used at runtime.
/// However, as a best practice avoid using with multiple mutexes.
pub async fn wait_owned<T>(&self, guard: OwnedMutexGuard<T>) -> OwnedMutexGuard<T> {
let mutex = mutex::owned_guard_lock(&guard);

let fut = AwaitNotify {
cond: self,
guard: Some(guard),
key: None,
};
fut.await;

mutex.lock_owned().await
}

/// Yields the current task until this condition variable receives a notification and the
/// provided condition becomes false. Spurious wake-ups are ignored and this function will only
/// return once the condition has been met.
///
Expand All @@ -154,9 +167,10 @@ impl Condvar {
/// // Wait for the thread to start up.
/// let (lock, cvar) = &*pair;
/// // As long as the value inside the `Mutex<bool>` is `false`, we wait.
/// let _guard = cvar
/// let guard = cvar
/// .wait_while(lock.lock().await, |started| !*started)
/// .await;
/// assert!(*guard);
/// # }
/// ```
pub async fn wait_while<'a, T, F>(
Expand All @@ -172,29 +186,78 @@ impl Condvar {
}
guard
}

/// Yields the current task until this condition variable receives a notification and the
/// provided condition becomes false. Spurious wake-ups are ignored and this function will only
/// return once the condition has been met.
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use std::sync::Arc;
///
/// use mea::condvar::Condvar;
/// use mea::mutex::Mutex;
///
/// let pair = (Arc::new(Mutex::new(false)), Arc::new(Condvar::new()));
/// let pair_clone = pair.clone();
///
/// tokio::spawn(async move {
/// let (lock, cvar) = pair_clone;
/// let mut started = lock.lock_owned().await;
/// *started = true;
/// // We notify the condvar that the value has changed.
/// cvar.notify_one();
/// });
///
/// // Wait for the thread to start up.
/// let (lock, cvar) = pair;
/// // As long as the value inside the `Mutex<bool>` is `false`, we wait.
/// let guard = cvar
/// .wait_while_owned(lock.lock_owned().await, |started| !*started)
/// .await;
/// assert!(*guard);
/// # }
/// ```
pub async fn wait_while_owned<T, F>(
&self,
mut guard: OwnedMutexGuard<T>,
mut condition: F,
) -> OwnedMutexGuard<T>
where
F: FnMut(&mut T) -> bool,
{
while condition(&mut *guard) {
guard = self.wait_owned(guard).await;
}
guard
}
}

/// A future that waits for another task to notify the condition variable.
struct AwaitNotify<'a, 'b, T> {
struct AwaitNotify<'a, G> {
/// The condition variable that we are waiting on.
cond: &'a Condvar,
/// The lock used with `cond`.
/// This will be released the first time the future is polled,
/// after registering the context to be notified.
guard: Option<MutexGuard<'b, T>>,
guard: Option<G>,
/// A key into the conditions variable's [`WakerSet`].
/// This is set to the index of the `Waker` for the context each time
/// the future is polled and not completed.
key: Option<usize>,
}

impl<T> Future for AwaitNotify<'_, '_, T> {
impl<G> Future for AwaitNotify<'_, G>
where
G: Unpin,
{
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut wakers = self.cond.wakers.lock();
match self.guard.take() {
Some(_) => {
Some(..) => {
self.key = Some(wakers.insert(cx));
// the guard is dropped when we return, which frees the lock
Poll::Pending
Expand All @@ -216,95 +279,11 @@ impl<T> Future for AwaitNotify<'_, '_, T> {
}
}

impl<T> Drop for AwaitNotify<'_, '_, T> {
impl<G> Drop for AwaitNotify<'_, G> {
fn drop(&mut self) {
let mut wakers = self.cond.wakers.lock();
if let Some(key) = self.key {
wakers.cancel(key);
}
}
}

struct WakerSet {
entries: Slab<Option<Waker>>,
notifiable: usize,
}

impl WakerSet {
/// Inserts a waker for a blocked operation and returns a key associated with it.
fn insert(&mut self, cx: &Context<'_>) -> usize {
let key = self.entries.insert(Some(cx.waker().clone()));
self.notifiable += 1;
key
}

/// If the waker for this key is still waiting for a notification, then update
/// the waker for the entry, and return false. If the waker has been notified,
/// treat the entry as completed and return true.
fn remove_if_notified(&mut self, key: usize, cx: &Context<'_>) -> bool {
match &mut self.entries[key] {
None => {
self.entries.remove(key);
true
}
Some(w) => {
// We were never woken, so update instead
if !w.will_wake(cx.waker()) {
*w = cx.waker().clone();
}
false
}
}
}

/// Notifies all blocked operations.
///
/// Returns `true` if at least one operation was notified.
fn notify_all(&mut self) -> bool {
if self.notifiable > 0 {
let mut notified = false;
for (_, opt_waker) in self.entries.iter_mut() {
if let Some(w) = opt_waker.take() {
w.wake();
self.notifiable -= 1;
notified = true;
}
}
assert_eq!(self.notifiable, 0);
notified
} else {
false
}
}

/// Notifies one additional blocked operation.
///
/// Returns `true` if an operation was notified.
fn notify_one(&mut self) -> bool {
if self.notifiable > 0 {
for (_, opt_waker) in self.entries.iter_mut() {
if let Some(w) = opt_waker.take() {
w.wake();
self.notifiable -= 1;
return true;
}
}
}
false
}

/// Removes the waker of a cancelled operation.
///
/// Returns `true` if another blocked operation from the set was notified.
fn cancel(&mut self, key: usize) -> bool {
match self.entries.remove(key) {
Some(_) => {
self.notifiable -= 1;
false
}
// The operation was cancelled and notified so notify another operation instead.
// If there is no waker in this entry, that means it was already woken.
None => self.notify_one(),
}
}
}
3 changes: 3 additions & 0 deletions mea/src/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ pub(crate) use waitlist::*;

mod waitset;
pub(crate) use waitset::*;

mod wakerset;
pub(crate) use wakerset::*;
Loading