Skip to content

Commit

Permalink
feat(threads): dynamic thread priorities (#370)
Browse files Browse the repository at this point in the history
  • Loading branch information
elenaf9 authored Sep 30, 2024
2 parents 93e6d08 + 5e240e6 commit d5bfd9d
Show file tree
Hide file tree
Showing 9 changed files with 227 additions and 12 deletions.
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ members = [
"tests/gpio-interrupt-nrf",
"tests/gpio-interrupt-stm32",
"tests/i2c-controller",
"tests/threading-dynamic-prios",
"tests/threading-lock",
]

Expand Down
39 changes: 33 additions & 6 deletions src/riot-rs-runqueue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ mod tests {
runqueue.add(ThreadId::new(4), RunqueueId::new(1));

assert_eq!(runqueue.get_next(), Some(ThreadId::new(2)));
runqueue.del(ThreadId::new(2), RunqueueId::new(1));
runqueue.pop_head(ThreadId::new(2), RunqueueId::new(1));
assert_eq!(runqueue.get_next(), Some(ThreadId::new(4)));
runqueue.del(ThreadId::new(4), RunqueueId::new(1));
runqueue.pop_head(ThreadId::new(4), RunqueueId::new(1));
assert_eq!(runqueue.get_next(), Some(ThreadId::new(0)));
runqueue.del(ThreadId::new(0), RunqueueId::new(0));
runqueue.pop_head(ThreadId::new(0), RunqueueId::new(0));
assert_eq!(runqueue.get_next(), Some(ThreadId::new(1)));
runqueue.del(ThreadId::new(1), RunqueueId::new(0));
runqueue.pop_head(ThreadId::new(1), RunqueueId::new(0));
assert_eq!(runqueue.get_next(), Some(ThreadId::new(3)));
runqueue.del(ThreadId::new(3), RunqueueId::new(0));
runqueue.pop_head(ThreadId::new(3), RunqueueId::new(0));
assert_eq!(runqueue.get_next(), None);
}
#[test]
Expand All @@ -85,7 +85,7 @@ mod tests {
runqueue.add(ThreadId::new(1), RunqueueId::new(0));

assert_eq!(runqueue.get_next(), Some(ThreadId::new(0)));
runqueue.del(ThreadId::new(0), RunqueueId::new(0));
runqueue.pop_head(ThreadId::new(0), RunqueueId::new(0));
assert_eq!(runqueue.get_next(), Some(ThreadId::new(1)));

runqueue.add(ThreadId::new(0), RunqueueId::new(0));
Expand All @@ -95,4 +95,31 @@ mod tests {
runqueue.advance(RunqueueId::new(0));
assert_eq!(runqueue.get_next(), Some(ThreadId::new(0)));
}

#[test]
fn test_rq_del() {
let mut runqueue: RunQueue<8, 32> = RunQueue::new();

runqueue.add(ThreadId::new(0), RunqueueId::new(1));
runqueue.add(ThreadId::new(1), RunqueueId::new(1));
runqueue.add(ThreadId::new(2), RunqueueId::new(0));

assert_eq!(runqueue.get_next(), Some(ThreadId::new(0)));

// Delete thread that isn't head.
runqueue.del(ThreadId::new(1));
assert_eq!(runqueue.get_next(), Some(ThreadId::new(0)));

// Delete head.
runqueue.del(ThreadId::new(0));
assert_eq!(runqueue.get_next(), Some(ThreadId::new(2)));

// Delete invalid thread.
runqueue.del(ThreadId::new(3));
assert_eq!(runqueue.get_next(), Some(ThreadId::new(2)));

// Delete last thread in runqueue.
runqueue.del(ThreadId::new(2));
assert_eq!(runqueue.get_next(), None);
}
}
54 changes: 52 additions & 2 deletions src/riot-rs-runqueue/src/runqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,18 @@ impl<const N_QUEUES: usize, const N_THREADS: usize> RunQueue<{ N_QUEUES }, { N_T
self.queues.push(n.0, rq.0);
}

/// Returns the head of the runqueue without removing it.
pub fn peek_head(&self, rq: RunqueueId) -> Option<ThreadId> {
self.queues.peek_head(rq.0).map(ThreadId::new)
}

/// Removes thread with pid `n` from runqueue number `rq`.
///
/// # Panics
///
/// Panics if `n` is not the queue's head.
/// This is fine, RIOT-rs only ever calls `del()` for the current thread.
pub fn del(&mut self, n: ThreadId, rq: RunqueueId) {
/// This is fine, RIOT-rs only ever calls `pop_head()` for the current thread.
pub fn pop_head(&mut self, n: ThreadId, rq: RunqueueId) {
debug_assert!(usize::from(n) < N_THREADS);
debug_assert!(usize::from(rq) < N_QUEUES);
let popped = self.queues.pop_head(rq.0);
Expand All @@ -96,6 +101,13 @@ impl<const N_QUEUES: usize, const N_THREADS: usize> RunQueue<{ N_QUEUES }, { N_T
}
}

/// Removes thread with pid `n`.
pub fn del(&mut self, n: ThreadId) {
if let Some(empty_runqueue) = self.queues.del(n.0) {
self.bitcache &= !(1 << empty_runqueue);
}
}

fn ffs(val: usize) -> u32 {
USIZE_BITS as u32 - val.leading_zeros()
}
Expand Down Expand Up @@ -179,6 +191,31 @@ mod clist {
}
}

/// Removes a thread from the list.
///
/// If the thread was the only thread in its runqueue, `Some` is returned
/// with the ID of the now empty runqueue.
pub fn del(&mut self, n: u8) -> Option<u8> {
let mut empty_runqueue = None;

// Find previous thread in circular runqueue.
let prev = position(&self.next_idxs, n)?;

// Handle if thread is tail of a runqueue.
if let Some(rq) = position(&self.tail, n) {
if prev == n as usize {
// Runqueue is empty now.
self.tail[rq] = Self::sentinel();
empty_runqueue = Some(rq as u8);
} else {
self.tail[rq] = prev as u8;
}
}
self.next_idxs[prev] = self.next_idxs[n as usize];
self.next_idxs[n as usize] = Self::sentinel();
empty_runqueue
}

pub fn pop_head(&mut self, rq: u8) -> Option<u8> {
let head = self.peek_head(rq)?;

Expand Down Expand Up @@ -214,6 +251,19 @@ mod clist {
}
}

/// Helper function that is needed because hax doesn't support `Iterator::position` yet.
fn position<const N: usize>(slice: &[u8; N], search_item: u8) -> Option<usize> {
let mut i = 0;
while i < N && slice[i] != search_item {
i += 1;
}
if i < N {
Some(i)
} else {
None
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
66 changes: 62 additions & 4 deletions src/riot-rs-threads/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,16 @@ impl Threads {
}
}

// fn get_unchecked(&self, thread_id: ThreadId) -> &Thread {
// &self.threads[thread_id as usize]
// }
/// Returns access to any thread data.
///
/// # Panics
///
/// Panics if `thread_id` is >= [`THREADS_NUMOF`].
/// If the thread for this `thread_id` is in an invalid state, the
/// data in the returned [`Thread`] is undefined, i.e. empty or outdated.
fn get_unchecked(&self, thread_id: ThreadId) -> &Thread {
&self.threads[usize::from(thread_id)]
}

/// Returns mutable access to any thread data.
///
Expand Down Expand Up @@ -158,7 +165,7 @@ impl Threads {
if old_state != ThreadState::Running && state == ThreadState::Running {
self.runqueue.add(thread.pid, thread.prio);
} else if old_state == ThreadState::Running && state != ThreadState::Running {
self.runqueue.del(thread.pid, thread.prio);
self.runqueue.pop_head(thread.pid, thread.prio);
}

old_state
Expand All @@ -172,6 +179,39 @@ impl Threads {
None
}
}

fn get_priority(&self, thread_id: ThreadId) -> Option<RunqueueId> {
self.is_valid_pid(thread_id)
.then(|| self.get_unchecked(thread_id).prio)
}

/// Changes the priority of a thread.
///
/// Returns the information if the scheduler should be invoked because the runqueue order
/// might have changed.
/// `false` if the thread isn't in the runqueue (in which case the priority is still changed)
/// or if the new priority equals the current one.
fn set_priority(&mut self, thread_id: ThreadId, prio: RunqueueId) -> bool {
if !self.is_valid_pid(thread_id) {
return false;
}
let thread = self.get_unchecked_mut(thread_id);
let old_prio = thread.prio;
if old_prio == prio {
return false;
}
thread.prio = prio;
if thread.state != ThreadState::Running {
return false;
}
if self.runqueue.peek_head(old_prio) == Some(thread_id) {
self.runqueue.pop_head(thread_id, old_prio);
} else {
self.runqueue.del(thread_id);
}
self.runqueue.add(thread_id, prio);
true
}
}

/// Starts threading.
Expand Down Expand Up @@ -339,6 +379,24 @@ pub fn wakeup(thread_id: ThreadId) -> bool {
})
}

/// Returns the priority of a thread.
///
/// Returns `None` if this is not a valid thread.
pub fn get_priority(thread_id: ThreadId) -> Option<RunqueueId> {
THREADS.with_mut(|threads| threads.get_priority(thread_id))
}

/// Changes the priority of a thread.
///
/// This might trigger a context switch.
pub fn set_priority(thread_id: ThreadId, prio: RunqueueId) {
THREADS.with_mut(|mut threads| {
if threads.set_priority(thread_id, prio) {
schedule();
}
})
}

/// Returns the size of the internal structure that holds the
/// a thread's data.
pub fn thread_struct_size() -> usize {
Expand Down
1 change: 1 addition & 0 deletions tests/laze.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ subdirs:
- gpio-interrupt-nrf
- gpio-interrupt-stm32
- i2c-controller
- threading-dynamic-prios
- threading-lock
12 changes: 12 additions & 0 deletions tests/threading-dynamic-prios/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "threading-dynamic-prios"
version = "0.1.0"
authors = ["Elena Frank <elena.frank@proton.me>"]
edition.workspace = true
license.workspace = true
publish = false

[dependencies]
riot-rs = { path = "../../src/riot-rs", features = ["threading"] }
riot-rs-boards = { path = "../../src/riot-rs-boards" }
portable-atomic = { workspace = true }
5 changes: 5 additions & 0 deletions tests/threading-dynamic-prios/laze.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
apps:
- name: threading-dynamic-prios
selects:
- ?release
- sw/threading
52 changes: 52 additions & 0 deletions tests/threading-dynamic-prios/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#![no_main]
#![no_std]
#![feature(type_alias_impl_trait)]
#![feature(used_with_arg)]

use portable_atomic::{AtomicUsize, Ordering};

use riot_rs::thread::{RunqueueId, ThreadId};

static RUN_ORDER: AtomicUsize = AtomicUsize::new(0);

static TEMP_THREAD1_PRIO: RunqueueId = RunqueueId::new(5);

#[riot_rs::thread(autostart, priority = 2)]
fn thread0() {
let pid = riot_rs::thread::current_pid().unwrap();
assert_eq!(riot_rs::thread::get_priority(pid), Some(RunqueueId::new(2)));

assert_eq!(RUN_ORDER.fetch_add(1, Ordering::AcqRel), 0);

let thread1_pid = ThreadId::new(1);
assert_eq!(
riot_rs::thread::get_priority(thread1_pid),
Some(RunqueueId::new(1))
);
riot_rs::thread::set_priority(thread1_pid, TEMP_THREAD1_PRIO);

// thread1 runs now.

assert_eq!(RUN_ORDER.fetch_add(1, Ordering::AcqRel), 2);
riot_rs::debug::log::info!("Test passed!");
loop {}
}

#[riot_rs::thread(autostart, priority = 1)]
fn thread1() {
// Thread can only run after thread0 increased its prio.
assert_eq!(RUN_ORDER.fetch_add(1, Ordering::AcqRel), 1);
// Prio is the temp increased prio.
let pid = riot_rs::thread::current_pid().unwrap();
assert_eq!(riot_rs::thread::get_priority(pid), Some(TEMP_THREAD1_PRIO));
// Other thread prios didn't change.
assert_eq!(
riot_rs::thread::get_priority(ThreadId::new(0)),
Some(RunqueueId::new(2))
);

// Reset priority.
riot_rs::thread::set_priority(pid, RunqueueId::new(1));

unreachable!("Core should be blocked by higher prio thread.")
}

0 comments on commit d5bfd9d

Please sign in to comment.