Skip to content

Commit

Permalink
feat: impl acquire_owned for Semaphore (#22)
Browse files Browse the repository at this point in the history
Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
tisonkun authored Dec 18, 2024
1 parent 799cbf4 commit 443e798
Showing 1 changed file with 107 additions and 0 deletions.
107 changes: 107 additions & 0 deletions mea/src/semaphore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@
//! [`acquire`]: Semaphore::acquire
//! [`release`]: Semaphore::release
use std::sync::Arc;

use crate::internal;

#[cfg(test)]
Expand Down Expand Up @@ -238,6 +240,49 @@ impl Semaphore {
self.s.acquire(permits).await;
SemaphorePermit { sem: self, permits }
}

/// Acquires `n` permits from the semaphore.
///
/// The semaphore must be wrapped in an [`Arc`] to call this method.
///
/// If the permits are not immediately available, this method will wait until they become
/// available. Returns a [`SemaphorePermit`] that will release the permits when dropped.
///
/// # Cancel safety
///
/// This method uses a queue to fairly distribute permits in the order they were requested.
/// Cancelling a call to `acquire_owned` makes you lose your place in the queue.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use std::sync::Arc;
///
/// use mea::semaphore::Semaphore;
///
/// let sem = Arc::new(Semaphore::new(3));
/// let mut join_handles = Vec::new();
///
/// for _ in 0..5 {
/// let permit = sem.clone().acquire_owned(1).await;
/// join_handles.push(tokio::spawn(async move {
/// // perform task...
/// // explicitly own `permit` in the task
/// drop(permit);
/// }));
/// }
///
/// for handle in join_handles {
/// handle.await.unwrap();
/// }
/// # }
/// ```
pub async fn acquire_owned(self: Arc<Self>, permits: u32) -> OwnedSemaphorePermit {
self.s.acquire(permits).await;
OwnedSemaphorePermit { sem: self, permits }
}
}

/// A permit from the semaphore.
Expand Down Expand Up @@ -305,3 +350,65 @@ impl Drop for SemaphorePermit<'_> {
self.sem.release(self.permits);
}
}

/// An owned permit from the semaphore.
///
/// This type is created by the [`acquire_owned`] method.
///
/// [`acquire_owned`]: Semaphore::acquire_owned
#[must_use = "permits are released immediately when dropped"]
#[derive(Debug)]
pub struct OwnedSemaphorePermit {
sem: Arc<Semaphore>,
permits: u32,
}

impl OwnedSemaphorePermit {
/// Forgets the permit **without** releasing it back to the semaphore.
///
/// This can be used to permanently reduce the number of permits available
/// from a semaphore.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
///
/// use mea::semaphore::Semaphore;
///
/// let sem = Arc::new(Semaphore::new(10));
/// {
/// let permit = sem.try_acquire(5).unwrap();
/// assert_eq!(sem.available_permits(), 5);
/// permit.forget();
/// }
///
/// // Since we forgot the permit, available permits won't go back to
/// // its initial value even after the permit is dropped
/// assert_eq!(sem.available_permits(), 5);
/// ```
pub fn forget(mut self) {
self.permits = 0;
}

/// Returns the number of permits this permit holds.
///
/// # Examples
///
/// ```
/// use mea::semaphore::Semaphore;
///
/// let sem = Semaphore::new(5);
/// let permit = sem.try_acquire(3).unwrap();
/// assert_eq!(permit.permits(), 3);
/// ```
pub fn permits(&self) -> u32 {
self.permits
}
}

impl Drop for OwnedSemaphorePermit {
fn drop(&mut self) {
self.sem.release(self.permits);
}
}

0 comments on commit 443e798

Please sign in to comment.