Skip to content

Commit

Permalink
Document task states and state transitions
Browse files Browse the repository at this point in the history
  • Loading branch information
bugadani committed Dec 9, 2024
1 parent 86578ac commit f8732f8
Showing 1 changed file with 64 additions and 0 deletions.
64 changes: 64 additions & 0 deletions embassy-executor/src/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,62 @@ pub use self::waker::task_from_waker;
use super::SpawnToken;

/// Raw task header for use in task pointers.
///
/// A task can be in one of the following states:
///
/// - Not spawned: the task is ready to spawn.
/// - `SPAWNED`: the task is currently spawned and may be running.
/// - `RUN_ENQUEUED`: the task is enqueued to be polled. Note that the task may be `!SPAWNED`.
/// In this case, the `RUN_ENQUEUED` state will be cleared when the task is next polled, without
/// polling the task's future.
/// - `TIMER_ENQUEUED`: the task is currently enqueued in the timer queue. When its expiration is
/// due, the task will be enqueued in the run queue.
///
/// A task's complete life cycle is as follows:
///
/// ```text
/// ┌────────────────────────────────────────────────────────┐
/// 10 ┌──────────────────────────────────────────────────11┐ │
/// ┌───┴─▼──────┐ ┌────────────────────────┐ ┌────────────┴─▼───────────┐
/// ┌─►│Not spawned │◄13┤Not spawned|Run enqueued│ │Not spawned|Timer enqueued│
/// │ │ │ │ │ │ │
/// │ └─────┬──────┘ └──────▲────┬───▲────────┘ └──────────────────────────┘
/// │ 1 │ 14 │
/// │ │ │ │ │
/// │ │ │ │ │ ┌───────────────────────────────────────┐
/// │ │ │ │ └15┤Not spawned|Run enqueued|Timer enqueued│
/// │ │ ┌────────────┘ └─────►│ │
/// │ │ │ └───────────────────────────────────────┘
/// │ │ │
/// │ │ 12
/// │ ┌─────▼────┴─────────┐ ┌───────────────────────────────────┐
/// │ │Spawned|Run enqueued├7────────────────►│Spawned|Run enqueued|Timer enqueued│
/// │ │ │◄────────────────8┤ │
/// │ └─────┬▲───▲─────────┘ └───────────────────▲───────────────┘
/// │ 2│ └─────────────────────────────────────┐ │
/// │ │3 6 9
/// │ ┌─────▼┴─────┐ ┌┴──────────┴──────────┐
/// └─4┤ Spawned ├5────────────────────────────────►│Spawned|Timer enqueued│
/// │ │ │ │
/// └────────────┘ └──────────────────────┘
/// ```
///
/// Transitions:
/// - 1: Task is claimed for spawning - `AvailableTask::claim -> Executor::spawn`
/// - 2: During poll - `RunQueue::dequeue_all -> State::run_dequeue`
/// - 3: Task wakes itself, waker wakes task - `Waker::wake -> wake_task -> State::run_enqueue`
/// - 4: Task exits - `TaskStorage::poll -> Poll::Ready`
/// - 5: Task schedules itself - `TimerQueue::schedule_wake -> TimerQueue::update`
/// - 6: Timer queue is processed - `TimerQueue::dequeue_expired -> wake_task_no_pend -> State::run_enqueue`
/// - 7: `schedule_wake -> State::update` enqueues the task in the timer queue.
/// - 8: Timer queue is processed - `TimerQueue::dequeue_expired -> wake_task_no_pend` -> task is already RUN_ENQUEUED.
/// - 9: A waker wakes a task that is in the timer queue. `Waker::wake -> wake_task -> State::run_enqueue`
/// - 10: A race condition happens: A task exits, then a different thread calls its `schedule_wake`, then the task calls its `TimerQueue::update`
/// - 11: Timer queue is processed - `TimerQueue::dequeue_expired -> wake_task_no_pend -> State::run_enqueue`
/// - 12: A run-queued task exits - `TaskStorage::poll -> Poll::Ready`
/// - 13: During poll - `State::run_dequeue`, task is then ignored.
/// - 14: A race condition happens between the task clearing its `expires_at` value and another thread calling `schedule_wake`.
/// - 15: Timer queue is processed - `TimerQueue::dequeue_expired -> wake_task_no_pend` -> task is already RUN_ENQUEUED.
pub(crate) struct TaskHeader {
pub(crate) state: State,
pub(crate) run_queue_item: RunQueueItem,
Expand Down Expand Up @@ -162,6 +218,7 @@ impl<F: Future + 'static> TaskStorage<F> {
this.raw.state.despawn();

#[cfg(feature = "integrated-timers")]
// FIXME: There is a data race between `TimerQueue::schedule_wake` and this line.
this.raw.expires_at.set(u64::MAX);
}
Poll::Pending => {}
Expand Down Expand Up @@ -389,6 +446,7 @@ impl SyncExecutor {
let task = p.header();

#[cfg(feature = "integrated-timers")]
// FIXME: There is a data race between `TimerQueue::schedule_wake` and this line.
task.expires_at.set(u64::MAX);

if !task.state.run_dequeue() {
Expand Down Expand Up @@ -555,6 +613,9 @@ pub fn wake_task(task: TaskRef) {
let header = task.header();
if header.state.run_enqueue() {
// We have just marked the task as scheduled, so enqueue it.
// FIXME: there is currently a data race between re-spawning a task and waking it using an
// old waker. If the task is being spawned on a different executor, then reading and writing
// the executor field may happen concurrently.
unsafe {
let executor = header.executor.get().unwrap_unchecked();
executor.enqueue(task);
Expand All @@ -569,6 +630,9 @@ pub fn wake_task_no_pend(task: TaskRef) {
let header = task.header();
if header.state.run_enqueue() {
// We have just marked the task as scheduled, so enqueue it.
// FIXME: there is currently a data race between re-spawning a task and waking it using an
// old waker. If the task is being spawned on a different executor, then reading and writing
// the executor field may happen concurrently.
unsafe {
let executor = header.executor.get().unwrap_unchecked();
executor.run_queue.enqueue(task);
Expand Down

0 comments on commit f8732f8

Please sign in to comment.