Skip to content

Commit

Permalink
Move integrated timer queue into time-queue-driver
Browse files Browse the repository at this point in the history
  • Loading branch information
bugadani committed Dec 8, 2024
1 parent 4abc5a9 commit 9733969
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 94 deletions.
96 changes: 11 additions & 85 deletions embassy-executor/src/raw/timer_queue.rs
Original file line number Diff line number Diff line change
@@ -1,99 +1,25 @@
//! Timer queue operations.
use core::cmp::min;
use super::util::SyncUnsafeCell;
use core::cell::Cell;

use super::TaskRef;

/// An item in the timer queue.
pub struct TimerQueueItem {
next: SyncUnsafeCell<Option<TaskRef>>,
expires_at: SyncUnsafeCell<u64>,
}
/// The next item in the queue.
pub next: Cell<Option<TaskRef>>,

impl TimerQueueItem {
pub(crate) const fn new() -> Self {
Self {
next: SyncUnsafeCell::new(None),
expires_at: SyncUnsafeCell::new(0),
}
}
/// The time at which this item expires.
pub expires_at: Cell<u64>,
}

/// A timer queue, with items integrated into tasks.
pub struct TimerQueue {
head: SyncUnsafeCell<Option<TaskRef>>,
}
unsafe impl Sync for TimerQueueItem {}

impl TimerQueue {
/// Creates a new timer queue.
pub const fn new() -> Self {
impl TimerQueueItem {
pub(crate) const fn new() -> Self {
Self {
head: SyncUnsafeCell::new(None),
}
}

/// Schedules a task to run at a specific time.
///
/// If this function returns `true`, the called should find the next expiration time and set
/// a new alarm for that time.
pub fn schedule_wake(&mut self, at: u64, p: TaskRef) -> bool {
unsafe {
let item = p.timer_queue_item();
if item.next.get().is_none() {
// If not in the queue, add it and update.
let prev = self.head.replace(Some(p));
item.next.set(prev);
} else if at <= item.expires_at.get() {
// If expiration is sooner than previously set, update.
} else {
// Task does not need to be updated.
return false;
}

item.expires_at.set(at);
true
}
}

/// Dequeues expired timers and returns the next alarm time.
///
/// The provided callback will be called for each expired task. Tasks that never expire
/// will be removed, but the callback will not be called.
pub fn next_expiration(&mut self, now: u64) -> u64 {
let mut next_expiration = u64::MAX;

self.retain(|p| {
let item = p.timer_queue_item();
let expires = unsafe { item.expires_at.get() };

if expires <= now {
// Timer expired, process task.
super::wake_task(p);
false
} else {
// Timer didn't yet expire, or never expires.
next_expiration = min(next_expiration, expires);
expires != u64::MAX
}
});

next_expiration
}

fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) {
unsafe {
let mut prev = &self.head;
while let Some(p) = prev.get() {
let item = p.timer_queue_item();
if f(p) {
// Skip to next
prev = &item.next;
} else {
// Remove it
prev.set(item.next.get());
item.next.set(None);
}
}
next: Cell::new(None),
expires_at: Cell::new(0),
}
}
}
5 changes: 0 additions & 5 deletions embassy-executor/src/raw/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,4 @@ impl<T> SyncUnsafeCell<T> {
{
*self.value.get()
}

#[cfg(feature = "integrated-timers")]
pub unsafe fn replace(&self, value: T) -> T {
core::mem::replace(&mut *self.value.get(), value)
}
}
11 changes: 7 additions & 4 deletions embassy-time-queue-driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
//! );
//! ```
//!
//! You can also use the `queue_generic` or the `embassy_executor::raw::timer_queue` modules to
//! implement your own timer queue. These modules contain queue implementations which you can wrap
//! and tailor to your needs.
//! You can also use the `queue_generic` or the `queue_integrated` modules to implement your own
//! timer queue. These modules contain queue implementations which you can wrap and tailor to
//! your needs.
//!
//! If you are providing an embassy-executor implementation besides a timer queue, you can choose to
//! expose the `integrated-timers` feature in your implementation. This feature stores timer items
Expand All @@ -49,7 +49,10 @@
//! embassy_time_queue_driver::timer_queue_impl!(static QUEUE: MyTimerQueue = MyTimerQueue{});
//! ```
#[cfg(not(feature = "integrated-timers"))]
pub mod queue_generic;
#[cfg(feature = "integrated-timers")]
pub mod queue_integrated;

use core::cell::RefCell;
use core::task::Waker;
Expand Down Expand Up @@ -89,7 +92,7 @@ macro_rules! timer_queue_impl {
}

#[cfg(feature = "integrated-timers")]
type InnerQueue = embassy_executor::raw::timer_queue::TimerQueue;
type InnerQueue = queue_integrated::TimerQueue;

#[cfg(not(feature = "integrated-timers"))]
type InnerQueue = queue_generic::Queue;
Expand Down
78 changes: 78 additions & 0 deletions embassy-time-queue-driver/src/queue_integrated.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
//! Timer queue operations.
use core::cell::Cell;
use core::cmp::min;

use embassy_executor::raw::TaskRef;

/// A timer queue, with items integrated into tasks.
pub struct TimerQueue {
head: Cell<Option<TaskRef>>,
}

impl TimerQueue {
/// Creates a new timer queue.
pub const fn new() -> Self {
Self { head: Cell::new(None) }
}

/// Schedules a task to run at a specific time.
///
/// If this function returns `true`, the called should find the next expiration time and set
/// a new alarm for that time.
pub fn schedule_wake(&mut self, at: u64, p: TaskRef) -> bool {
let item = p.timer_queue_item();
if item.next.get().is_none() {
// If not in the queue, add it and update.
let prev = self.head.replace(Some(p));
item.next.set(prev);
} else if at <= item.expires_at.get() {
// If expiration is sooner than previously set, update.
} else {
// Task does not need to be updated.
return false;
}

item.expires_at.set(at);
true
}

/// Dequeues expired timers and returns the next alarm time.
///
/// The provided callback will be called for each expired task. Tasks that never expire
/// will be removed, but the callback will not be called.
pub fn next_expiration(&mut self, now: u64) -> u64 {
let mut next_expiration = u64::MAX;

self.retain(|p| {
let item = p.timer_queue_item();
let expires = item.expires_at.get();

if expires <= now {
// Timer expired, process task.
embassy_executor::raw::wake_task(p);
false
} else {
// Timer didn't yet expire, or never expires.
next_expiration = min(next_expiration, expires);
expires != u64::MAX
}
});

next_expiration
}

fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) {
let mut prev = &self.head;
while let Some(p) = prev.get() {
let item = p.timer_queue_item();
if f(p) {
// Skip to next
prev = &item.next;
} else {
// Remove it
prev.set(item.next.get());
item.next.set(None);
}
}
}
}

0 comments on commit 9733969

Please sign in to comment.