From a6592686a0c5af932c3b66f2b269130e8f0f45f6 Mon Sep 17 00:00:00 2001 From: Elena Frank Date: Wed, 10 Apr 2024 22:22:24 +0200 Subject: [PATCH 01/11] runqueue: allocate threads to >= 1 cores Support multiple cores in the runqueue. The current allocation for each core is stored in a new array. Thus, any call to `get_next_for_core` is minimal effort. The allocation is updated after each change in the runqueue. --- src/riot-rs-runqueue/src/runqueue.rs | 202 +++++++++++++++++++++++---- 1 file changed, 171 insertions(+), 31 deletions(-) diff --git a/src/riot-rs-runqueue/src/runqueue.rs b/src/riot-rs-runqueue/src/runqueue.rs index 4f97245dc..145e27c98 100644 --- a/src/riot-rs-runqueue/src/runqueue.rs +++ b/src/riot-rs-runqueue/src/runqueue.rs @@ -7,6 +7,7 @@ const USIZE_BITS: usize = mem::size_of::() * 8; /// Runqueue number. pub type RunqueueId = u8; pub type ThreadId = u8; +pub type CoreId = u8; /// Runqueue for `N_QUEUES`, supporting `N_THREADS` total. /// @@ -19,79 +20,175 @@ pub type ThreadId = u8; /// special value) /// /// The current implementation needs an usize for the bit cache, -/// an `[u8; N_QUEUES]` array for the list tail indexes -/// and an `[u8; N_THREADS]` for the list next indexes. -pub struct RunQueue { +/// an `[RunqueueId; N_QUEUES]` array for the list tail indexes +/// and an `[ThreadId; N_THREADS]` for the list next indexes. +pub struct RunQueue { /// Bitcache that represents the currently used queues /// in `0..N_QUEUES`. bitcache: usize, queues: clist::CList, + next: [Option; N_CORES], } -impl RunQueue<{ N_QUEUES }, { N_THREADS }> { +impl + RunQueue +{ // NOTE: we don't impl Default here because hax does not support it yet. When it does, we // should impl it. #[allow(clippy::new_without_default)] - pub const fn new() -> RunQueue<{ N_QUEUES }, { N_THREADS }> { + pub const fn new() -> RunQueue { // unfortunately we cannot assert!() on N_QUEUES and N_THREADS, // as panics in const fn's are not (yet) implemented. RunQueue { bitcache: 0, queues: CList::new(), + next: [None; N_CORES], } } /// Adds thread with pid `n` to runqueue number `rq`. - pub fn add(&mut self, n: ThreadId, rq: RunqueueId) { + /// + /// Returns a [`CoreId`] if the allocation for this core changed. + /// + pub fn add(&mut self, n: ThreadId, rq: RunqueueId) -> Option { debug_assert!((n as usize) < N_THREADS); debug_assert!((rq as usize) < N_QUEUES); self.bitcache |= 1 << rq; self.queues.push(n, rq); + self.reallocate() } /// Removes thread with pid `n` from runqueue number `rq`. /// + /// Returns a [`CoreId`] if the allocation for this core changed. + /// /// # Panics /// /// Panics if `n` is not the queue's head. /// This is fine, RIOT-rs only ever calls `del()` for the current thread. - pub fn del(&mut self, n: ThreadId, rq: RunqueueId) { + pub fn del(&mut self, n: ThreadId, rq: RunqueueId) -> Option { debug_assert!((n as usize) < N_THREADS); debug_assert!((rq as usize) < N_QUEUES); - let popped = self.queues.pop_head(rq); - // - assert_eq!(popped, Some(n)); + + if self.queues.peek_head(rq) == Some(n) { + let popped = self.queues.pop_head(rq); + assert_eq!(popped, Some(n)); + } else { + self.queues.del(n, rq); + } + if self.queues.is_empty(rq) { self.bitcache &= !(1 << rq); } + self.reallocate() } fn ffs(val: usize) -> u32 { USIZE_BITS as u32 - val.leading_zeros() } - /// Returns the pid that should run next. - /// - /// Returns the next runnable thread of - /// the runqueue with the highest index. - // - // TODO: Return `ThreadId` instead of u8? 
- pub fn get_next(&self) -> Option { - let rq_ffs = Self::ffs(self.bitcache); - if rq_ffs > 0 { - let rq = (rq_ffs - 1) as RunqueueId; - self.queues.peek_head(rq) - } else { - None + /// Returns the next thread that should run on this core. + pub fn get_next_for_core(&self, core: CoreId) -> Option { + if core as usize >= N_CORES { + return None; } + self.next[core as usize] } /// Advances runqueue number `rq`. /// /// This is used to "yield" to another thread of *the same* priority. - pub fn advance(&mut self, rq: RunqueueId) { + /// + /// Returns a [`CoreId`] if the allocation for this core changed. + pub fn advance(&mut self, rq: RunqueueId) -> Option { debug_assert!((rq as usize) < N_QUEUES); - self.queues.advance(rq) + self.queues.advance(rq); + self.reallocate() + } + + /// Update `self.next` so that the highest `N_CORES` threads + /// are allocated. + /// + /// This only changes allocations if a thread was previously allocated + /// and is now not part of the new list anymore, or the other way around. + /// It assumes that there was maximum one change in the runqueue since the + /// last reallocation (only one add/ delete or a runqueue advancement)! + /// + /// Returns a [`CoreId`] if the allocation for this core changed. + /// + /// The complexity of this call is O(n). + fn reallocate(&mut self) -> Option { + let next = self.get_next_n(); + let mut bitmap_next = 0; + let mut bitmap_allocateed = 0; + for i in 0..N_CORES { + if let Some(id) = next[i] { + bitmap_next |= 1 << id + } + if let Some(id) = self.next[i] { + bitmap_allocateed |= 1 << id + } + } + if bitmap_next == bitmap_allocateed { + return None; + } + let diff = bitmap_next ^ bitmap_allocateed; + let prev_allocateed = bitmap_allocateed & diff; + let new_allocateed = bitmap_next & diff; + + let changed_core = self + .next + .iter() + .position(|i| match prev_allocateed { + 0 => i.is_none(), + id => *i == Some(Self::ffs(id) as ThreadId - 1), + }) + .unwrap(); + let new_allocation = match new_allocateed { + 0 => None, + id => Some(Self::ffs(id) as CoreId - 1), + }; + self.next[changed_core] = new_allocation; + return Some(changed_core as CoreId); + } + + /// Returns the `n` highest priority threads in the [`Runqueue`]. + /// + /// Complexity is O(n). + pub fn get_next_n(&self) -> [Option; N_CORES] { + let mut next_list = [None; N_CORES]; + let mut bitcache = self.bitcache; + let mut head = 0; + let mut next = 0; + for i in 0..N_CORES { + if next == head { + let rq_ffs = Self::ffs(bitcache); + if rq_ffs == 0 { + break; + } + let rq = (rq_ffs - 1) as RunqueueId; + // Clear bit from bitcache. + bitcache &= !(1 << rq); + head = match self.queues.peek_head(rq) { + Some(t) => t, + None => return next_list, + }; + next = head; + } + next_list[i] = Some(next); + next = self.queues.peek_next(next); + } + next_list + } +} + +impl RunQueue { + /// Returns the pid that should run next. + /// + /// Returns the next runnable thread of + /// the runqueue with the highest index. + pub fn get_next(&self) -> Option { + self.get_next_for_core(0) } } @@ -104,8 +201,8 @@ mod clist { #[derive(Debug, Copy, Clone)] pub struct CList { - tail: [u8; N_QUEUES], - next_idxs: [u8; N_THREADS], + tail: [RunqueueId; N_QUEUES], + next_idxs: [ThreadId; N_THREADS], } impl CList { @@ -145,7 +242,46 @@ mod clist { } } - pub fn pop_head(&mut self, rq: RunqueueId) -> Option { + /// Delete a thread from the runqueue. + pub fn del(&mut self, n: ThreadId, rq: RunqueueId) { + if self.next_idxs[n as usize] == Self::sentinel() { + // Thread is not in rq, do nothing. 
+ return; + } + + if self.next_idxs[n as usize] == n { + // `n` should always be the tail in this case, but better be + // safe and double-check. + if self.tail[rq as usize] == n { + // `n` bites itself, so there's only one entry. + // Clear tail. + self.tail[rq as usize] = Self::sentinel(); + } + } else { + let next = self.next_idxs[n as usize]; + + // Find previous in list and update its next-idx. + let mut prev = next; + // Worst-case performance is O(n). + loop { + if self.next_idxs[prev as usize] == n { + break; + } + prev = self.next_idxs[prev as usize]; + } + self.next_idxs[prev as usize] = next as ThreadId; + + // Update tail if the thread was the tail. + if self.tail[rq as usize] == n { + self.tail[rq as usize] = prev + } + } + + // Clear thread's value. + self.next_idxs[n as usize] = Self::sentinel(); + } + + pub fn pop_head(&mut self, rq: RunqueueId) -> Option { if self.tail[rq as usize] == Self::sentinel() { // rq is empty, do nothing None @@ -168,7 +304,7 @@ mod clist { } } - pub fn peek_head(&self, rq: RunqueueId) -> Option { + pub fn peek_head(&self, rq: RunqueueId) -> Option { if self.tail[rq as usize] == Self::sentinel() { None } else { @@ -181,6 +317,10 @@ mod clist { self.tail[rq as usize] = self.next_idxs[self.tail[rq as usize] as usize]; } } + + pub fn peek_next(&self, curr: ThreadId) -> ThreadId { + self.next_idxs[curr as usize] + } } #[cfg(test)] @@ -226,11 +366,11 @@ mod clist { assert!(clist.is_empty(0)); for i in 0..(N - 1) { println!("pushing {}", i); - clist.push(i as u8, 0); + clist.push(i as ThreadId, 0); } for i in 0..(N - 1) { println!("{}", i); - assert_eq!(clist.pop_head(0), Some(i as u8)); + assert_eq!(clist.pop_head(0), Some(i as ThreadId)); } assert_eq!(clist.pop_head(0), None); assert!(clist.is_empty(0)); From 6518b0add3e01583f510fffbc0834a563587a4d0 Mon Sep 17 00:00:00 2001 From: Elena Frank Date: Wed, 10 Apr 2024 22:27:27 +0200 Subject: [PATCH 02/11] runqueue: test multicore support --- src/riot-rs-runqueue/src/lib.rs | 131 ++++++++++++++++++++++++++++++++ 1 file changed, 131 insertions(+) diff --git a/src/riot-rs-runqueue/src/lib.rs b/src/riot-rs-runqueue/src/lib.rs index 4112f4862..7846335cc 100644 --- a/src/riot-rs-runqueue/src/lib.rs +++ b/src/riot-rs-runqueue/src/lib.rs @@ -95,4 +95,135 @@ mod tests { runqueue.advance(0); assert_eq!(runqueue.get_next(), Some(0)); } + + #[test] + fn multicore_basic() { + let mut runqueue: RunQueue<8, 32, 4> = RunQueue::new(); + + // First thread should get allocated to core 0. + assert_eq!(runqueue.add(0, 0), Some(0)); + // Second thread should get allocated to core 1. + assert_eq!(runqueue.add(1, 0), Some(1)); + + assert_eq!(runqueue.get_next_for_core(0), Some(0)); + assert_eq!(runqueue.get_next_for_core(1), Some(1)); + assert!(runqueue.get_next_for_core(2).is_none()); + + // Advancing a runqueue shouldn't change any allocations + // if all threads in the queue are already running. + assert_eq!(runqueue.advance(0), None); + assert_eq!(runqueue.get_next_for_core(0), Some(0)); + assert_eq!(runqueue.get_next_for_core(1), Some(1)); + assert!(runqueue.get_next_for_core(2).is_none()); + + // Restores original order. + assert_eq!(runqueue.advance(0), None); + + // Add more threads, which should be allocated to free + // cores. 
+ assert_eq!(runqueue.add(2, 0), Some(2)); + assert_eq!(runqueue.add(3, 0), Some(3)); + assert_eq!(runqueue.add(4, 0), None); + assert_eq!(runqueue.get_next_for_core(0), Some(0)); + assert_eq!(runqueue.get_next_for_core(1), Some(1)); + assert_eq!(runqueue.get_next_for_core(2), Some(2)); + assert_eq!(runqueue.get_next_for_core(3), Some(3)); + + // Advancing the runqueue now should change the mapping + // on core 0, since the previous head was running there. + assert_eq!(runqueue.advance(0), Some(0)); + assert_eq!(runqueue.get_next_for_core(0), Some(4)); + // Other allocations shouldn't change. + assert_eq!(runqueue.get_next_for_core(1), Some(1)); + assert_eq!(runqueue.get_next_for_core(2), Some(2)); + assert_eq!(runqueue.get_next_for_core(3), Some(3)); + + // Adding or deleting waiting threads shouldn't change + // any allocations. + assert_eq!(runqueue.del(0, 0), None); + assert_eq!(runqueue.add(5, 0), None); + + // Deleting a running thread should allocate the waiting + // thread to the now free core. + assert_eq!(runqueue.del(2, 0), Some(2)); + assert_eq!(runqueue.get_next_for_core(2), Some(5)); + // Other allocations shouldn't change. + assert_eq!(runqueue.get_next_for_core(0), Some(4)); + assert_eq!(runqueue.get_next_for_core(1), Some(1)); + assert_eq!(runqueue.get_next_for_core(3), Some(3)); + } + + #[test] + fn multicore_multiqueue() { + let mut runqueue: RunQueue<8, 32, 4> = RunQueue::new(); + + assert_eq!(runqueue.add(0, 2), Some(0)); + assert_eq!(runqueue.add(1, 2), Some(1)); + assert_eq!(runqueue.add(2, 1), Some(2)); + assert_eq!(runqueue.add(3, 0), Some(3)); + assert_eq!(runqueue.add(4, 0), None); + + assert_eq!(runqueue.get_next_for_core(0), Some(0)); + assert_eq!(runqueue.get_next_for_core(1), Some(1)); + assert_eq!(runqueue.get_next_for_core(2), Some(2)); + assert_eq!(runqueue.get_next_for_core(3), Some(3)); + + // Advancing highest priority queue shouldn't change anything + // because there are more cores than threads in this priority's queue. + assert_eq!(runqueue.advance(2), None); + assert_eq!(runqueue.get_next_for_core(0), Some(0)); + assert_eq!(runqueue.get_next_for_core(1), Some(1)); + assert_eq!(runqueue.get_next_for_core(2), Some(2)); + assert_eq!(runqueue.get_next_for_core(3), Some(3)); + + // Advancing lowest priority queue should change allocations + // since there are two threads in this priority's queue, + // but only one available core for them. + + // Core 3 was newly allocated. + assert_eq!(runqueue.advance(0), Some(3)); + assert_eq!(runqueue.get_next_for_core(3), Some(4)); + // Other allocations didn't change. + assert_eq!(runqueue.get_next_for_core(0), Some(0)); + assert_eq!(runqueue.get_next_for_core(1), Some(1)); + assert_eq!(runqueue.get_next_for_core(2), Some(2)); + + // Restores original order. + runqueue.advance(0); + + // Delete one high-priority thread. + // The waiting low-priority thread should be allocated + // to the newly freed core. + + // Core 0 was newly allocated. + assert_eq!(runqueue.del(0, 2), Some(0)); + assert_eq!(runqueue.get_next_for_core(0), Some(4)); + // Other allocations didn't change. + assert_eq!(runqueue.get_next_for_core(1), Some(1)); + assert_eq!(runqueue.get_next_for_core(2), Some(2)); + assert_eq!(runqueue.get_next_for_core(3), Some(3)); + + // Add one medium-priority thread. + // The low-priority thread furthest back in its priority queue + // should be preempted. + + // Core 0 was newly allocated. 
+ assert_eq!(runqueue.add(5, 1), Some(0)); + assert_eq!(runqueue.get_next_for_core(0), Some(5)); + // Other allocations didn't change. + assert_eq!(runqueue.get_next_for_core(1), Some(1)); + assert_eq!(runqueue.get_next_for_core(2), Some(2)); + assert_eq!(runqueue.get_next_for_core(3), Some(3)); + } + + #[test] + fn multicore_invalid_core() { + let mut runqueue: RunQueue<8, 32, 1> = RunQueue::new(); + assert_eq!(runqueue.add(0, 2), Some(0)); + assert_eq!(runqueue.add(1, 2), None); + assert_eq!(runqueue.get_next(), Some(0)); + assert_eq!(runqueue.get_next_for_core(0), Some(0)); + // Querying for n > `N_CORES` shouldn't cause a panic. + assert_eq!(runqueue.get_next_for_core(1), None) + } } From c139e681c536ca4be781f54cf34c22e66b9273cc Mon Sep 17 00:00:00 2001 From: Elena Frank Date: Thu, 11 Apr 2024 11:14:07 +0200 Subject: [PATCH 03/11] fixup! runqueue: allocate threads to >= 1 cores Export `CoreId`. --- src/riot-rs-runqueue/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/riot-rs-runqueue/src/lib.rs b/src/riot-rs-runqueue/src/lib.rs index 7846335cc..f734a3599 100644 --- a/src/riot-rs-runqueue/src/lib.rs +++ b/src/riot-rs-runqueue/src/lib.rs @@ -1,7 +1,7 @@ #![cfg_attr(not(test), no_std)] mod runqueue; -pub use runqueue::{RunQueue, RunqueueId, ThreadId}; +pub use runqueue::{CoreId, RunQueue, RunqueueId, ThreadId}; #[cfg(test)] mod tests { From 4f29f10b8edea369a6fad0a48518a7eccd1d2e08 Mon Sep 17 00:00:00 2001 From: Elena Frank Date: Fri, 12 Apr 2024 12:01:32 +0200 Subject: [PATCH 04/11] fixup! runqueue: allocate threads to >= 1 cores Clean code & improve readability. --- src/riot-rs-runqueue/src/runqueue.rs | 69 +++++++++++++++------------- 1 file changed, 38 insertions(+), 31 deletions(-) diff --git a/src/riot-rs-runqueue/src/runqueue.rs b/src/riot-rs-runqueue/src/runqueue.rs index 145e27c98..a5be0b116 100644 --- a/src/riot-rs-runqueue/src/runqueue.rs +++ b/src/riot-rs-runqueue/src/runqueue.rs @@ -9,6 +9,18 @@ pub type RunqueueId = u8; pub type ThreadId = u8; pub type CoreId = u8; +trait FromBitmap: Sized { + fn from_bitmap(bitmap: usize) -> Option; +} +impl FromBitmap for u8 { + fn from_bitmap(bitmap: usize) -> Option { + if bitmap == 0 { + return None; + } + Some(ffs(bitmap) as ThreadId - 1) + } +} + /// Runqueue for `N_QUEUES`, supporting `N_THREADS` total. /// /// Assumptions: @@ -83,10 +95,6 @@ impl self.reallocate() } - fn ffs(val: usize) -> u32 { - USIZE_BITS as u32 - val.leading_zeros() - } - /// Returns the next thread that should run on this core. 
pub fn get_next_for_core(&self, core: CoreId) -> Option { if core as usize >= N_CORES { @@ -120,40 +128,33 @@ impl fn reallocate(&mut self) -> Option { let next = self.get_next_n(); let mut bitmap_next = 0; - let mut bitmap_allocateed = 0; + let mut bitmap_allocated = 0; for i in 0..N_CORES { if let Some(id) = next[i] { bitmap_next |= 1 << id } if let Some(id) = self.next[i] { - bitmap_allocateed |= 1 << id + bitmap_allocated |= 1 << id } } - if bitmap_next == bitmap_allocateed { + if bitmap_next == bitmap_allocated { return None; } - let diff = bitmap_next ^ bitmap_allocateed; - let prev_allocateed = bitmap_allocateed & diff; - let new_allocateed = bitmap_next & diff; - - let changed_core = self - .next - .iter() - .position(|i| match prev_allocateed { - 0 => i.is_none(), - id => *i == Some(Self::ffs(id) as ThreadId - 1), - }) - .unwrap(); - let new_allocation = match new_allocateed { - 0 => None, - id => Some(Self::ffs(id) as CoreId - 1), - }; - self.next[changed_core] = new_allocation; + let diff = bitmap_next ^ bitmap_allocated; + let prev_allocated = ThreadId::from_bitmap(bitmap_allocated & diff); + let new_allocated = ThreadId::from_bitmap(bitmap_next & diff); + + let changed_core = self.next.iter().position(|i| *i == prev_allocated).unwrap(); + self.next[changed_core] = new_allocated; return Some(changed_core as CoreId); } /// Returns the `n` highest priority threads in the [`Runqueue`]. /// + /// This iterates through all non-empty runqueues with descending + /// priority, until `N_CORES` threads have been found or all + /// queues have been checked. + /// /// Complexity is O(n). pub fn get_next_n(&self) -> [Option; N_CORES] { let mut next_list = [None; N_CORES]; @@ -162,16 +163,18 @@ impl let mut next = 0; for i in 0..N_CORES { if next == head { - let rq_ffs = Self::ffs(bitcache); - if rq_ffs == 0 { - break; - } - let rq = (rq_ffs - 1) as RunqueueId; - // Clear bit from bitcache. + // Switch to highest priority runqueue remaining + // in the bitcache. + let rq = match RunqueueId::from_bitmap(bitcache) { + Some(rq) => rq, + None => break, + }; + // Clear bit from bitcache so that after iterating through + // this runqueue we'll switch to the next one. bitcache &= !(1 << rq); head = match self.queues.peek_head(rq) { Some(t) => t, - None => return next_list, + None => break, }; next = head; } @@ -192,6 +195,10 @@ impl RunQueue u32 { + USIZE_BITS as u32 - val.leading_zeros() +} + mod clist { //! This module implements an array of `N_QUEUES` circular linked lists over an //! array of size `N_THREADS`. From b26e03b597d18423228ece549fd36ce27db02240 Mon Sep 17 00:00:00 2001 From: Elena Frank Date: Fri, 12 Apr 2024 13:23:39 +0200 Subject: [PATCH 05/11] fixup! runqueue: allocate threads to >= 1 cores Optimize case where `N_CORES` == 1. --- src/riot-rs-runqueue/src/runqueue.rs | 49 +++++++++++++++++----------- 1 file changed, 30 insertions(+), 19 deletions(-) diff --git a/src/riot-rs-runqueue/src/runqueue.rs b/src/riot-rs-runqueue/src/runqueue.rs index a5be0b116..1dc1de889 100644 --- a/src/riot-rs-runqueue/src/runqueue.rs +++ b/src/riot-rs-runqueue/src/runqueue.rs @@ -159,30 +159,41 @@ impl pub fn get_next_n(&self) -> [Option; N_CORES] { let mut next_list = [None; N_CORES]; let mut bitcache = self.bitcache; - let mut head = 0; - let mut next = 0; - for i in 0..N_CORES { - if next == head { - // Switch to highest priority runqueue remaining - // in the bitcache. 
- let rq = match RunqueueId::from_bitmap(bitcache) { - Some(rq) => rq, - None => break, - }; - // Clear bit from bitcache so that after iterating through - // this runqueue we'll switch to the next one. - bitcache &= !(1 << rq); - head = match self.queues.peek_head(rq) { - Some(t) => t, + // Get head from highest priority queue. + let mut head = match self.peek_head(bitcache) { + Some(head) => { + next_list[0] = Some(head); + head + } + None => return next_list, + }; + let mut thread = head; + // Iterate through threads in the queue. + for i in 1..N_CORES { + thread = self.queues.peek_next(thread); + if thread == head { + // Switch to next runqueue. + bitcache &= !(1 << (ffs(bitcache) - 1)); + head = match self.peek_head(bitcache) { + Some(h) => h, None => break, }; - next = head; - } - next_list[i] = Some(next); - next = self.queues.peek_next(next); + thread = head; + }; + next_list[i] = Some(thread); } next_list } + + fn peek_head(&self, bitcache: usize) -> Option { + // Switch to highest priority runqueue remaining + // in the bitcache. + let rq = match RunqueueId::from_bitmap(bitcache) { + Some(rq) => rq, + None => return None, + }; + self.queues.peek_head(rq) + } } impl RunQueue { From 111910ff6f0ff9bfee5b6631ad4a0dc18776d272 Mon Sep 17 00:00:00 2001 From: Elena Frank Date: Fri, 12 Apr 2024 14:27:14 +0200 Subject: [PATCH 06/11] fixup! runqueue: allocate threads to >= 1 cores Make hax (hopefully) happy. --- src/riot-rs-runqueue/src/runqueue.rs | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/src/riot-rs-runqueue/src/runqueue.rs b/src/riot-rs-runqueue/src/runqueue.rs index 1dc1de889..13523bbe9 100644 --- a/src/riot-rs-runqueue/src/runqueue.rs +++ b/src/riot-rs-runqueue/src/runqueue.rs @@ -176,7 +176,8 @@ impl bitcache &= !(1 << (ffs(bitcache) - 1)); head = match self.peek_head(bitcache) { Some(h) => h, - None => break, + // Early return instead of break, to make hax happy. + None => return next_list, }; thread = head; }; @@ -279,19 +280,16 @@ mod clist { let next = self.next_idxs[n as usize]; // Find previous in list and update its next-idx. - let mut prev = next; - // Worst-case performance is O(n). - loop { - if self.next_idxs[prev as usize] == n { - break; - } - prev = self.next_idxs[prev as usize]; - } - self.next_idxs[prev as usize] = next as ThreadId; + let prev = self + .next_idxs + .iter() + .position(|n| *n == next) + .expect("List is circular."); + self.next_idxs[prev] = next as ThreadId; // Update tail if the thread was the tail. if self.tail[rq as usize] == n { - self.tail[rq as usize] = prev + self.tail[rq as usize] = prev as ThreadId; } } From 0e07cb02455484b5ced3acef3bf665290affd730 Mon Sep 17 00:00:00 2001 From: Elena Frank Date: Fri, 12 Apr 2024 16:25:15 +0200 Subject: [PATCH 07/11] fixup! fixup! 
runqueue: allocate threads to >= 1 cores --- src/riot-rs-runqueue/src/runqueue.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/riot-rs-runqueue/src/runqueue.rs b/src/riot-rs-runqueue/src/runqueue.rs index 13523bbe9..aea0ecf26 100644 --- a/src/riot-rs-runqueue/src/runqueue.rs +++ b/src/riot-rs-runqueue/src/runqueue.rs @@ -283,7 +283,7 @@ mod clist { let prev = self .next_idxs .iter() - .position(|n| *n == next) + .position(|next_idx| *next_idx == n) .expect("List is circular."); self.next_idxs[prev] = next as ThreadId; From 65e44604411c94851c6770797068ac37e6d7f262 Mon Sep 17 00:00:00 2001 From: Elena Frank Date: Fri, 12 Apr 2024 16:57:27 +0200 Subject: [PATCH 08/11] runqueue: fix `RunQueue::advance` for multicore In the multicore scenario, a yielding thread isn't necessarily the head of the runqueue. This commit adds a method `advance_from` to yield from any thread in the queue. The thread is then moved to the tail of the runqueue. --- src/riot-rs-runqueue/src/lib.rs | 39 +++++++++++++++++++++++----- src/riot-rs-runqueue/src/runqueue.rs | 32 +++++++++++++++++++++-- 2 files changed, 63 insertions(+), 8 deletions(-) diff --git a/src/riot-rs-runqueue/src/lib.rs b/src/riot-rs-runqueue/src/lib.rs index f734a3599..e30de81ff 100644 --- a/src/riot-rs-runqueue/src/lib.rs +++ b/src/riot-rs-runqueue/src/lib.rs @@ -111,13 +111,13 @@ mod tests { // Advancing a runqueue shouldn't change any allocations // if all threads in the queue are already running. - assert_eq!(runqueue.advance(0), None); + assert_eq!(runqueue.advance_from(0, 0), None); assert_eq!(runqueue.get_next_for_core(0), Some(0)); assert_eq!(runqueue.get_next_for_core(1), Some(1)); assert!(runqueue.get_next_for_core(2).is_none()); // Restores original order. - assert_eq!(runqueue.advance(0), None); + assert_eq!(runqueue.advance_from(1, 0), None); // Add more threads, which should be allocated to free // cores. @@ -131,7 +131,7 @@ mod tests { // Advancing the runqueue now should change the mapping // on core 0, since the previous head was running there. - assert_eq!(runqueue.advance(0), Some(0)); + assert_eq!(runqueue.advance_from(0, 0), Some(0)); assert_eq!(runqueue.get_next_for_core(0), Some(4)); // Other allocations shouldn't change. assert_eq!(runqueue.get_next_for_core(1), Some(1)); @@ -170,7 +170,7 @@ mod tests { // Advancing highest priority queue shouldn't change anything // because there are more cores than threads in this priority's queue. - assert_eq!(runqueue.advance(2), None); + assert_eq!(runqueue.advance_from(0, 2), None); assert_eq!(runqueue.get_next_for_core(0), Some(0)); assert_eq!(runqueue.get_next_for_core(1), Some(1)); assert_eq!(runqueue.get_next_for_core(2), Some(2)); @@ -181,7 +181,7 @@ mod tests { // but only one available core for them. // Core 3 was newly allocated. - assert_eq!(runqueue.advance(0), Some(3)); + assert_eq!(runqueue.advance_from(3, 0), Some(3)); assert_eq!(runqueue.get_next_for_core(3), Some(4)); // Other allocations didn't change. assert_eq!(runqueue.get_next_for_core(0), Some(0)); @@ -189,7 +189,7 @@ mod tests { assert_eq!(runqueue.get_next_for_core(2), Some(2)); // Restores original order. - runqueue.advance(0); + runqueue.advance_from(4, 0); // Delete one high-priority thread. // The waiting low-priority thread should be allocated @@ -226,4 +226,31 @@ mod tests { // Querying for n > `N_CORES` shouldn't cause a panic. 
assert_eq!(runqueue.get_next_for_core(1), None) } + + #[test] + fn multicore_advance() { + let mut runqueue: RunQueue<8, 32, 4> = RunQueue::new(); + assert_eq!(runqueue.add(0, 0), Some(0)); + assert_eq!(runqueue.add(1, 0), Some(1)); + assert_eq!(runqueue.add(2, 0), Some(2)); + assert_eq!(runqueue.add(3, 0), Some(3)); + assert_eq!(runqueue.add(4, 0), None); + assert_eq!(runqueue.add(5, 0), None); + + // Advance head. + assert_eq!(runqueue.advance_from(0, 0), Some(0)); + assert_eq!(runqueue.get_next_for_core(0), Some(4)); + // Other allocations didn't change. + assert_eq!(runqueue.get_next_for_core(1), Some(1)); + assert_eq!(runqueue.get_next_for_core(2), Some(2)); + assert_eq!(runqueue.get_next_for_core(3), Some(3)); + + // Advance from a thread that is not head. + assert_eq!(runqueue.advance_from(2, 0), Some(2)); + assert_eq!(runqueue.get_next_for_core(2), Some(5)); + // Other allocations didn't change. + assert_eq!(runqueue.get_next_for_core(0), Some(4)); + assert_eq!(runqueue.get_next_for_core(1), Some(1)); + assert_eq!(runqueue.get_next_for_core(3), Some(3)); + } } diff --git a/src/riot-rs-runqueue/src/runqueue.rs b/src/riot-rs-runqueue/src/runqueue.rs index aea0ecf26..aea1ab7b9 100644 --- a/src/riot-rs-runqueue/src/runqueue.rs +++ b/src/riot-rs-runqueue/src/runqueue.rs @@ -103,12 +103,30 @@ impl self.next[core as usize] } - /// Advances runqueue number `rq`. + /// Advances from thread `n` in runqueue number `rq`. /// /// This is used to "yield" to another thread of *the same* priority. + /// Compared to [`RunQueue::advance`], this method allows to advance from a thread that + /// is not necessarily the head of the runqueue. /// /// Returns a [`CoreId`] if the allocation for this core changed. - pub fn advance(&mut self, rq: RunqueueId) -> Option { + /// + /// **Warning: This changes the order of the runqueue because the thread is moved to the + /// tail of the queue.** + pub fn advance_from(&mut self, n: ThreadId, rq: RunqueueId) -> Option { + debug_assert!((rq as usize) < N_QUEUES); + if Some(n) == self.queues.peek_head(rq) { + self.queues.advance(rq); + } else { + // If the thread is not the head remove it + // from queue and re-insert it at tail. + self.queues.del(n, rq); + self.queues.push(n, rq); + } + self.reallocate() + } + + pub fn advance_head(&mut self, rq: RunqueueId) -> Option { debug_assert!((rq as usize) < N_QUEUES); self.queues.advance(rq); self.reallocate() @@ -205,6 +223,16 @@ impl RunQueue Option { self.get_next_for_core(0) } + + /// Advances runqueue number `rq`. + /// + /// This is used to "yield" to another thread of *the same* priority. + /// + /// Returns a [`CoreId`] if the allocation for this core changed. + pub fn advance(&mut self, rq: RunqueueId) -> Option { + self.queues.advance(rq); + self.reallocate() + } } fn ffs(val: usize) -> u32 { From f85db4d1189cddc37efa4cb46ce50b3a87e53063 Mon Sep 17 00:00:00 2001 From: Elena Frank Date: Tue, 16 Apr 2024 21:31:43 +0200 Subject: [PATCH 09/11] fixup! runqueue: fix `RunQueue::advance` for multicore Remove unused method `advance_head` (duplication of `Runqueue::advance`). 
--- src/riot-rs-runqueue/src/runqueue.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/riot-rs-runqueue/src/runqueue.rs b/src/riot-rs-runqueue/src/runqueue.rs index aea1ab7b9..87f10c8b3 100644 --- a/src/riot-rs-runqueue/src/runqueue.rs +++ b/src/riot-rs-runqueue/src/runqueue.rs @@ -126,12 +126,6 @@ impl self.reallocate() } - pub fn advance_head(&mut self, rq: RunqueueId) -> Option { - debug_assert!((rq as usize) < N_QUEUES); - self.queues.advance(rq); - self.reallocate() - } - /// Update `self.next` so that the highest `N_CORES` threads /// are allocated. /// @@ -230,6 +224,7 @@ impl RunQueue Option { + debug_assert!((rq as usize) < N_QUEUES); self.queues.advance(rq); self.reallocate() } From dcf08c190f7fcaaa7856cbe59fcc6947a0455eaa Mon Sep 17 00:00:00 2001 From: Elena Frank Date: Wed, 17 Apr 2024 13:31:47 +0200 Subject: [PATCH 10/11] fixup! runqueue: allocate threads to >= 1 cores Handle case N_CORES == 1 separately. --- src/riot-rs-runqueue/src/runqueue.rs | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/riot-rs-runqueue/src/runqueue.rs b/src/riot-rs-runqueue/src/runqueue.rs index 87f10c8b3..37c595415 100644 --- a/src/riot-rs-runqueue/src/runqueue.rs +++ b/src/riot-rs-runqueue/src/runqueue.rs @@ -82,11 +82,16 @@ impl debug_assert!((n as usize) < N_THREADS); debug_assert!((rq as usize) < N_QUEUES); - if self.queues.peek_head(rq) == Some(n) { + if N_CORES == 1 { let popped = self.queues.pop_head(rq); assert_eq!(popped, Some(n)); } else { - self.queues.del(n, rq); + if self.queues.peek_head(rq) == Some(n) { + let popped = self.queues.pop_head(rq); + assert_eq!(popped, Some(n)); + } else { + self.queues.del(n, rq); + } } if self.queues.is_empty(rq) { @@ -138,6 +143,14 @@ impl /// /// The complexity of this call is O(n). fn reallocate(&mut self) -> Option { + if N_CORES == 1 { + let next = self.peek_head(self.bitcache); + if next == self.next[0] { + return None; + } + self.next[0] = next; + return Some(0); + } let next = self.get_next_n(); let mut bitmap_next = 0; let mut bitmap_allocated = 0; From 441611ef3fe10189f2f5e5c8febc629e88cc2cbd Mon Sep 17 00:00:00 2001 From: Elena Frank Date: Fri, 19 Apr 2024 12:10:33 +0200 Subject: [PATCH 11/11] runqueue: introduce `GlobalRunqueue` trait Add new `GlobalRunqueue` trait to differentiate and optimize for different number of cores on the type level. It uses the `min_specialization` rust feature to have a generic implementation for _n_ number of cores and fine-grained specialization of methods for concrete _n_. 
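
For illustration, a minimal usage sketch of the resulting API, mirroring the
updated `multicore_basic` test below (8 priority queues, up to 32 threads,
4 cores). The names and return values are taken from this series; the snippet
itself is not part of the patch:

    use riot_rs_runqueue::{GlobalRunqueue, RunQueue};

    // 8 priority queues, up to 32 threads, scheduled across 4 cores.
    let mut rq: RunQueue<8, 32, 4> = RunQueue::new();

    // `add` returns the core whose allocation changed, if any.
    assert_eq!(rq.add(0, 0), Some(0)); // thread 0 now runs on core 0
    assert_eq!(rq.add(1, 0), Some(1)); // thread 1 now runs on core 1

    // Each core queries its own allocation.
    assert_eq!(rq.get_next(0), Some(0));
    assert_eq!(rq.get_next(1), Some(1));
    assert!(rq.get_next(2).is_none());

    // Yielding does not reallocate while every thread of this
    // priority is already running on some core.
    assert_eq!(rq.advance(0, 0), None);

The specialized impls for concrete core counts (see the `N_CORES == 1` and
`N_CORES == 2` impls below) keep the single-core path as cheap as before,
while the generic impl covers any other number of cores.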
--- src/riot-rs-runqueue/src/lib.rs | 171 ++++++------- src/riot-rs-runqueue/src/runqueue.rs | 291 ++++++++++++++--------- src/riot-rs-threads/src/arch/cortex_m.rs | 2 +- src/riot-rs-threads/src/lib.rs | 8 +- 4 files changed, 273 insertions(+), 199 deletions(-) diff --git a/src/riot-rs-runqueue/src/lib.rs b/src/riot-rs-runqueue/src/lib.rs index e30de81ff..79a58b4a1 100644 --- a/src/riot-rs-runqueue/src/lib.rs +++ b/src/riot-rs-runqueue/src/lib.rs @@ -1,7 +1,8 @@ #![cfg_attr(not(test), no_std)] +#![feature(min_specialization)] mod runqueue; -pub use runqueue::{CoreId, RunQueue, RunqueueId, ThreadId}; +pub use runqueue::{CoreId, GlobalRunqueue, RunQueue, RunqueueId, ThreadId}; #[cfg(test)] mod tests { @@ -15,24 +16,24 @@ mod tests { runqueue.add(1, 0); runqueue.add(2, 0); - assert_eq!(runqueue.get_next(), Some(0)); + assert_eq!(runqueue.get_next(0), Some(0)); - runqueue.advance(0); + runqueue.advance(0, 0); - assert_eq!(runqueue.get_next(), Some(1)); - runqueue.advance(0); + assert_eq!(runqueue.get_next(0), Some(1)); + runqueue.advance(0, 0); - assert_eq!(runqueue.get_next(), Some(2)); - assert_eq!(runqueue.get_next(), Some(2)); + assert_eq!(runqueue.get_next(0), Some(2)); + assert_eq!(runqueue.get_next(0), Some(2)); - runqueue.advance(0); - assert_eq!(runqueue.get_next(), Some(0)); + runqueue.advance(0, 0); + assert_eq!(runqueue.get_next(0), Some(0)); - runqueue.advance(0); - assert_eq!(runqueue.get_next(), Some(1)); + runqueue.advance(0, 0); + assert_eq!(runqueue.get_next(0), Some(1)); - runqueue.advance(0); - assert_eq!(runqueue.get_next(), Some(2)); + runqueue.advance(0, 0); + assert_eq!(runqueue.get_next(0), Some(2)); } #[test] @@ -44,13 +45,13 @@ mod tests { } for i in 0..=31 { - assert_eq!(runqueue.get_next(), Some(i)); - runqueue.advance(0); + assert_eq!(runqueue.get_next(0), Some(i)); + runqueue.advance(0, 0); } for i in 0..=31 { - assert_eq!(runqueue.get_next(), Some(i)); - runqueue.advance(0); + assert_eq!(runqueue.get_next(0), Some(i)); + runqueue.advance(0, 0); } } @@ -65,17 +66,17 @@ mod tests { runqueue.add(2, 1); runqueue.add(4, 1); - assert_eq!(runqueue.get_next(), Some(2)); + assert_eq!(runqueue.get_next(0), Some(2)); runqueue.del(2, 1); - assert_eq!(runqueue.get_next(), Some(4)); + assert_eq!(runqueue.get_next(0), Some(4)); runqueue.del(4, 1); - assert_eq!(runqueue.get_next(), Some(0)); + assert_eq!(runqueue.get_next(0), Some(0)); runqueue.del(0, 0); - assert_eq!(runqueue.get_next(), Some(1)); + assert_eq!(runqueue.get_next(0), Some(1)); runqueue.del(1, 0); - assert_eq!(runqueue.get_next(), Some(3)); + assert_eq!(runqueue.get_next(0), Some(3)); runqueue.del(3, 0); - assert_eq!(runqueue.get_next(), None); + assert_eq!(runqueue.get_next(0), None); } #[test] fn test_push_twice() { @@ -84,16 +85,16 @@ mod tests { runqueue.add(0, 0); runqueue.add(1, 0); - assert_eq!(runqueue.get_next(), Some(0)); + assert_eq!(runqueue.get_next(0), Some(0)); runqueue.del(0, 0); - assert_eq!(runqueue.get_next(), Some(1)); + assert_eq!(runqueue.get_next(0), Some(1)); runqueue.add(0, 0); - assert_eq!(runqueue.get_next(), Some(1)); + assert_eq!(runqueue.get_next(0), Some(1)); - runqueue.advance(0); - assert_eq!(runqueue.get_next(), Some(0)); + runqueue.advance(0, 0); + assert_eq!(runqueue.get_next(0), Some(0)); } #[test] @@ -105,38 +106,38 @@ mod tests { // Second thread should get allocated to core 1. 
assert_eq!(runqueue.add(1, 0), Some(1)); - assert_eq!(runqueue.get_next_for_core(0), Some(0)); - assert_eq!(runqueue.get_next_for_core(1), Some(1)); - assert!(runqueue.get_next_for_core(2).is_none()); + assert_eq!(runqueue.get_next(0), Some(0)); + assert_eq!(runqueue.get_next(1), Some(1)); + assert!(runqueue.get_next(2).is_none()); // Advancing a runqueue shouldn't change any allocations // if all threads in the queue are already running. - assert_eq!(runqueue.advance_from(0, 0), None); - assert_eq!(runqueue.get_next_for_core(0), Some(0)); - assert_eq!(runqueue.get_next_for_core(1), Some(1)); - assert!(runqueue.get_next_for_core(2).is_none()); + assert_eq!(runqueue.advance(0, 0), None); + assert_eq!(runqueue.get_next(0), Some(0)); + assert_eq!(runqueue.get_next(1), Some(1)); + assert!(runqueue.get_next(2).is_none()); // Restores original order. - assert_eq!(runqueue.advance_from(1, 0), None); + assert_eq!(runqueue.advance(1, 0), None); // Add more threads, which should be allocated to free // cores. assert_eq!(runqueue.add(2, 0), Some(2)); assert_eq!(runqueue.add(3, 0), Some(3)); assert_eq!(runqueue.add(4, 0), None); - assert_eq!(runqueue.get_next_for_core(0), Some(0)); - assert_eq!(runqueue.get_next_for_core(1), Some(1)); - assert_eq!(runqueue.get_next_for_core(2), Some(2)); - assert_eq!(runqueue.get_next_for_core(3), Some(3)); + assert_eq!(runqueue.get_next(0), Some(0)); + assert_eq!(runqueue.get_next(1), Some(1)); + assert_eq!(runqueue.get_next(2), Some(2)); + assert_eq!(runqueue.get_next(3), Some(3)); // Advancing the runqueue now should change the mapping // on core 0, since the previous head was running there. - assert_eq!(runqueue.advance_from(0, 0), Some(0)); - assert_eq!(runqueue.get_next_for_core(0), Some(4)); + assert_eq!(runqueue.advance(0, 0), Some(0)); + assert_eq!(runqueue.get_next(0), Some(4)); // Other allocations shouldn't change. - assert_eq!(runqueue.get_next_for_core(1), Some(1)); - assert_eq!(runqueue.get_next_for_core(2), Some(2)); - assert_eq!(runqueue.get_next_for_core(3), Some(3)); + assert_eq!(runqueue.get_next(1), Some(1)); + assert_eq!(runqueue.get_next(2), Some(2)); + assert_eq!(runqueue.get_next(3), Some(3)); // Adding or deleting waiting threads shouldn't change // any allocations. @@ -146,11 +147,11 @@ mod tests { // Deleting a running thread should allocate the waiting // thread to the now free core. assert_eq!(runqueue.del(2, 0), Some(2)); - assert_eq!(runqueue.get_next_for_core(2), Some(5)); + assert_eq!(runqueue.get_next(2), Some(5)); // Other allocations shouldn't change. - assert_eq!(runqueue.get_next_for_core(0), Some(4)); - assert_eq!(runqueue.get_next_for_core(1), Some(1)); - assert_eq!(runqueue.get_next_for_core(3), Some(3)); + assert_eq!(runqueue.get_next(0), Some(4)); + assert_eq!(runqueue.get_next(1), Some(1)); + assert_eq!(runqueue.get_next(3), Some(3)); } #[test] @@ -163,33 +164,33 @@ mod tests { assert_eq!(runqueue.add(3, 0), Some(3)); assert_eq!(runqueue.add(4, 0), None); - assert_eq!(runqueue.get_next_for_core(0), Some(0)); - assert_eq!(runqueue.get_next_for_core(1), Some(1)); - assert_eq!(runqueue.get_next_for_core(2), Some(2)); - assert_eq!(runqueue.get_next_for_core(3), Some(3)); + assert_eq!(runqueue.get_next(0), Some(0)); + assert_eq!(runqueue.get_next(1), Some(1)); + assert_eq!(runqueue.get_next(2), Some(2)); + assert_eq!(runqueue.get_next(3), Some(3)); // Advancing highest priority queue shouldn't change anything // because there are more cores than threads in this priority's queue. 
- assert_eq!(runqueue.advance_from(0, 2), None); - assert_eq!(runqueue.get_next_for_core(0), Some(0)); - assert_eq!(runqueue.get_next_for_core(1), Some(1)); - assert_eq!(runqueue.get_next_for_core(2), Some(2)); - assert_eq!(runqueue.get_next_for_core(3), Some(3)); + assert_eq!(runqueue.advance(0, 2), None); + assert_eq!(runqueue.get_next(0), Some(0)); + assert_eq!(runqueue.get_next(1), Some(1)); + assert_eq!(runqueue.get_next(2), Some(2)); + assert_eq!(runqueue.get_next(3), Some(3)); // Advancing lowest priority queue should change allocations // since there are two threads in this priority's queue, // but only one available core for them. // Core 3 was newly allocated. - assert_eq!(runqueue.advance_from(3, 0), Some(3)); - assert_eq!(runqueue.get_next_for_core(3), Some(4)); + assert_eq!(runqueue.advance(3, 0), Some(3)); + assert_eq!(runqueue.get_next(3), Some(4)); // Other allocations didn't change. - assert_eq!(runqueue.get_next_for_core(0), Some(0)); - assert_eq!(runqueue.get_next_for_core(1), Some(1)); - assert_eq!(runqueue.get_next_for_core(2), Some(2)); + assert_eq!(runqueue.get_next(0), Some(0)); + assert_eq!(runqueue.get_next(1), Some(1)); + assert_eq!(runqueue.get_next(2), Some(2)); // Restores original order. - runqueue.advance_from(4, 0); + runqueue.advance(4, 0); // Delete one high-priority thread. // The waiting low-priority thread should be allocated @@ -197,11 +198,11 @@ mod tests { // Core 0 was newly allocated. assert_eq!(runqueue.del(0, 2), Some(0)); - assert_eq!(runqueue.get_next_for_core(0), Some(4)); + assert_eq!(runqueue.get_next(0), Some(4)); // Other allocations didn't change. - assert_eq!(runqueue.get_next_for_core(1), Some(1)); - assert_eq!(runqueue.get_next_for_core(2), Some(2)); - assert_eq!(runqueue.get_next_for_core(3), Some(3)); + assert_eq!(runqueue.get_next(1), Some(1)); + assert_eq!(runqueue.get_next(2), Some(2)); + assert_eq!(runqueue.get_next(3), Some(3)); // Add one medium-priority thread. // The low-priority thread furthest back in its priority queue @@ -209,11 +210,11 @@ mod tests { // Core 0 was newly allocated. assert_eq!(runqueue.add(5, 1), Some(0)); - assert_eq!(runqueue.get_next_for_core(0), Some(5)); + assert_eq!(runqueue.get_next(0), Some(5)); // Other allocations didn't change. - assert_eq!(runqueue.get_next_for_core(1), Some(1)); - assert_eq!(runqueue.get_next_for_core(2), Some(2)); - assert_eq!(runqueue.get_next_for_core(3), Some(3)); + assert_eq!(runqueue.get_next(1), Some(1)); + assert_eq!(runqueue.get_next(2), Some(2)); + assert_eq!(runqueue.get_next(3), Some(3)); } #[test] @@ -221,10 +222,10 @@ mod tests { let mut runqueue: RunQueue<8, 32, 1> = RunQueue::new(); assert_eq!(runqueue.add(0, 2), Some(0)); assert_eq!(runqueue.add(1, 2), None); - assert_eq!(runqueue.get_next(), Some(0)); - assert_eq!(runqueue.get_next_for_core(0), Some(0)); + assert_eq!(runqueue.get_next(0), Some(0)); + assert_eq!(runqueue.get_next(0), Some(0)); // Querying for n > `N_CORES` shouldn't cause a panic. - assert_eq!(runqueue.get_next_for_core(1), None) + assert_eq!(runqueue.get_next(1), None) } #[test] @@ -238,19 +239,19 @@ mod tests { assert_eq!(runqueue.add(5, 0), None); // Advance head. - assert_eq!(runqueue.advance_from(0, 0), Some(0)); - assert_eq!(runqueue.get_next_for_core(0), Some(4)); + assert_eq!(runqueue.advance(0, 0), Some(0)); + assert_eq!(runqueue.get_next(0), Some(4)); // Other allocations didn't change. 
- assert_eq!(runqueue.get_next_for_core(1), Some(1)); - assert_eq!(runqueue.get_next_for_core(2), Some(2)); - assert_eq!(runqueue.get_next_for_core(3), Some(3)); + assert_eq!(runqueue.get_next(1), Some(1)); + assert_eq!(runqueue.get_next(2), Some(2)); + assert_eq!(runqueue.get_next(3), Some(3)); // Advance from a thread that is not head. - assert_eq!(runqueue.advance_from(2, 0), Some(2)); - assert_eq!(runqueue.get_next_for_core(2), Some(5)); + assert_eq!(runqueue.advance(2, 0), Some(2)); + assert_eq!(runqueue.get_next(2), Some(5)); // Other allocations didn't change. - assert_eq!(runqueue.get_next_for_core(0), Some(4)); - assert_eq!(runqueue.get_next_for_core(1), Some(1)); - assert_eq!(runqueue.get_next_for_core(3), Some(3)); + assert_eq!(runqueue.get_next(0), Some(4)); + assert_eq!(runqueue.get_next(1), Some(1)); + assert_eq!(runqueue.get_next(3), Some(3)); } } diff --git a/src/riot-rs-runqueue/src/runqueue.rs b/src/riot-rs-runqueue/src/runqueue.rs index 838ff4e29..15d3125ab 100644 --- a/src/riot-rs-runqueue/src/runqueue.rs +++ b/src/riot-rs-runqueue/src/runqueue.rs @@ -20,7 +20,7 @@ impl FromBitmap for u8 { if bitmap == 0 { return None; } - Some(ffs(bitmap) as ThreadId - 1) + Some(ffs(bitmap) as u8 - 1) } } @@ -61,17 +61,69 @@ impl } } + /// Returns the next thread that should run on this core. + pub fn get_next(&self, core: CoreId) -> Option { + if core as usize >= N_CORES { + return None; + } + self.next[core as usize] + } + + /// Returns the `n` highest priority threads in the [`RunQueue`]. + /// + /// This iterates through all non-empty runqueues with descending + /// priority, until `N_CORES` threads have been found or all + /// queues have been checked. + /// + /// Complexity is O(n). + fn get_next_n(&self) -> [Option; N_CORES] { + let mut next_list = [None; N_CORES]; + let mut bitcache = self.bitcache; + // Get head from highest priority queue. + let mut head = match self.peek_head(bitcache) { + Some(head) => { + next_list[0] = Some(head); + head + } + None => return next_list, + }; + let mut thread = head; + // Iterate through threads in the queue. + for i in 1..N_CORES { + thread = self.queues.peek_next(thread); + if thread == head { + // Switch to next runqueue. + bitcache &= !(1 << (ffs(bitcache) - 1)); + head = match self.peek_head(bitcache) { + Some(h) => h, + // Early return instead of break, to make hax happy. + None => return next_list, + }; + thread = head; + }; + next_list[i] = Some(thread); + } + next_list + } + + #[inline] + fn peek_head(&self, bitcache: usize) -> Option { + // Switch to highest priority runqueue remaining + // in the bitcache. + let rq = match RunqueueId::from_bitmap(bitcache) { + Some(rq) => rq, + None => return None, + }; + self.queues.peek_head(rq) + } +} + +pub trait GlobalRunqueue { /// Adds thread with pid `n` to runqueue number `rq`. /// /// Returns a [`CoreId`] if the allocation for this core changed. /// - pub fn add(&mut self, n: ThreadId, rq: RunqueueId) -> Option { - debug_assert!((n as usize) < N_THREADS); - debug_assert!((rq as usize) < N_QUEUES); - self.bitcache |= 1 << rq; - self.queues.push(n, rq); - self.reallocate() - } + fn add(&mut self, n: ThreadId, rq: RunqueueId) -> Option; /// Removes thread with pid `n` from runqueue number `rq`. /// @@ -79,22 +131,53 @@ impl /// /// # Panics /// - /// Panics if `n` is not the queue's head. + /// Panics for `N_CORES == 1`` if `n` is not the queue's head. /// This is fine, RIOT-rs only ever calls `del()` for the current thread. 
- pub fn del(&mut self, n: ThreadId, rq: RunqueueId) -> Option { + fn del(&mut self, n: ThreadId, rq: RunqueueId) -> Option; + + /// Advances from thread `n` in runqueue number `rq`. + /// + /// This is used to "yield" to another thread of *the same* priority. + /// + /// Returns a [`CoreId`] if the allocation for this core changed. + /// + /// **Warning: If `n` it not head if the run queue, this changes + /// the order of the queue because the thread is moved to the + /// tail.** + fn advance(&mut self, n: ThreadId, rq: RunqueueId) -> Option; + + /// Update `self.next` so that the highest `N_CORES` threads + /// are allocated. + /// + /// This only changes allocations if a thread was previously allocated + /// and is now not part of the new list anymore, or the other way around. + /// It assumes that there was maximum one change in the runqueue since the + /// last reallocation (only one add/ delete or a runqueue advancement)! + /// + /// Returns a [`CoreId`] if the allocation for this core changed. + fn reallocate(&mut self) -> Option; +} + +impl + GlobalRunqueue for RunQueue +{ + default fn add(&mut self, n: ThreadId, rq: RunqueueId) -> Option { debug_assert!((n as usize) < N_THREADS); debug_assert!((rq as usize) < N_QUEUES); + self.bitcache |= 1 << rq; + self.queues.push(n, rq); + self.reallocate() + } - if N_CORES == 1 { + default fn del(&mut self, n: ThreadId, rq: RunqueueId) -> Option { + debug_assert!((n as usize) < N_THREADS); + debug_assert!((rq as usize) < N_QUEUES); + + if self.queues.peek_head(rq) == Some(n) { let popped = self.queues.pop_head(rq); assert_eq!(popped, Some(n)); } else { - if self.queues.peek_head(rq) == Some(n) { - let popped = self.queues.pop_head(rq); - assert_eq!(popped, Some(n)); - } else { - self.queues.del(n, rq); - } + self.queues.del(n, rq); } if self.queues.is_empty(rq) { @@ -103,25 +186,7 @@ impl self.reallocate() } - /// Returns the next thread that should run on this core. - pub fn get_next_for_core(&self, core: CoreId) -> Option { - if core as usize >= N_CORES { - return None; - } - self.next[core as usize] - } - - /// Advances from thread `n` in runqueue number `rq`. - /// - /// This is used to "yield" to another thread of *the same* priority. - /// Compared to [`RunQueue::advance`], this method allows to advance from a thread that - /// is not necessarily the head of the runqueue. - /// - /// Returns a [`CoreId`] if the allocation for this core changed. - /// - /// **Warning: This changes the order of the runqueue because the thread is moved to the - /// tail of the queue.** - pub fn advance_from(&mut self, n: ThreadId, rq: RunqueueId) -> Option { + default fn advance(&mut self, n: ThreadId, rq: RunqueueId) -> Option { debug_assert!((rq as usize) < N_QUEUES); if Some(n) == self.queues.peek_head(rq) { self.queues.advance(rq); @@ -134,26 +199,7 @@ impl self.reallocate() } - /// Update `self.next` so that the highest `N_CORES` threads - /// are allocated. - /// - /// This only changes allocations if a thread was previously allocated - /// and is now not part of the new list anymore, or the other way around. - /// It assumes that there was maximum one change in the runqueue since the - /// last reallocation (only one add/ delete or a runqueue advancement)! - /// - /// Returns a [`CoreId`] if the allocation for this core changed. - /// - /// The complexity of this call is O(n). 
- fn reallocate(&mut self) -> Option { - if N_CORES == 1 { - let next = self.peek_head(self.bitcache); - if next == self.next[0] { - return None; - } - self.next[0] = next; - return Some(0); - } + default fn reallocate(&mut self) -> Option { let next = self.get_next_n(); let mut bitmap_next = 0; let mut bitmap_allocated = 0; @@ -176,72 +222,84 @@ impl self.next[changed_core] = new_allocated; return Some(changed_core as CoreId); } +} - /// Returns the `n` highest priority threads in the [`Runqueue`]. +impl GlobalRunqueue + for RunQueue +{ + /// Advances runqueue number `rq`. /// - /// This iterates through all non-empty runqueues with descending - /// priority, until `N_CORES` threads have been found or all - /// queues have been checked. + /// This is used to "yield" to another thread of *the same* priority. /// - /// Complexity is O(n). - pub fn get_next_n(&self) -> [Option; N_CORES] { - let mut next_list = [None; N_CORES]; - let mut bitcache = self.bitcache; - // Get head from highest priority queue. - let mut head = match self.peek_head(bitcache) { - Some(head) => { - next_list[0] = Some(head); - head - } - None => return next_list, - }; - let mut thread = head; - // Iterate through threads in the queue. - for i in 1..N_CORES { - thread = self.queues.peek_next(thread); - if thread == head { - // Switch to next runqueue. - bitcache &= !(1 << (ffs(bitcache) - 1)); - head = match self.peek_head(bitcache) { - Some(h) => h, - // Early return instead of break, to make hax happy. - None => return next_list, - }; - thread = head; - }; - next_list[i] = Some(thread); + /// Returns a [`CoreId`] if the allocation for this core changed. + fn advance(&mut self, _: ThreadId, rq: RunqueueId) -> Option { + debug_assert!((rq as usize) < N_QUEUES); + self.queues.advance(rq); + self.reallocate() + } + + #[inline] + fn del(&mut self, n: ThreadId, rq: RunqueueId) -> Option { + debug_assert!((n as usize) < N_THREADS); + debug_assert!((rq as usize) < N_QUEUES); + let popped = self.queues.pop_head(rq); + assert_eq!(popped, Some(n)); + if self.queues.is_empty(rq) { + self.bitcache &= !(1 << rq); } - next_list + self.reallocate() } - fn peek_head(&self, bitcache: usize) -> Option { - // Switch to highest priority runqueue remaining - // in the bitcache. - let rq = match RunqueueId::from_bitmap(bitcache) { - Some(rq) => rq, - None => return None, - }; - self.queues.peek_head(rq) + #[inline] + fn reallocate(&mut self) -> Option { + let next = self.peek_head(self.bitcache); + if next == self.next[0] { + return None; + } + self.next[0] = next; + return Some(0); } } -impl RunQueue { - /// Returns the pid that should run next. - /// - /// Returns the next runnable thread of - /// the runqueue with the highest index. - pub fn get_next(&self) -> Option { - self.get_next_for_core(0) +impl GlobalRunqueue + for RunQueue +{ + fn reallocate(&mut self) -> Option { + let next = self.get_next_n(); + + if self.next[0] == next[0] { + if self.next[1] == next[1] { + return None; + } + self.next[1] = next[1]; + return Some(1); + } + if self.next[1] == next[0] { + if self.next[0] == next[1] { + return None; + } + self.next[0] = next[1]; + return Some(0); + } + if self.next[1] == next[1] { + self.next[0] = next[0]; + return Some(0); + } else { + self.next[1] = next[0]; + Some(1) + } } - /// Advances runqueue number `rq`. - /// - /// This is used to "yield" to another thread of *the same* priority. - /// - /// Returns a [`CoreId`] if the allocation for this core changed. 
- pub fn advance(&mut self, rq: RunqueueId) -> Option { + fn advance(&mut self, n: ThreadId, rq: RunqueueId) -> Option { debug_assert!((rq as usize) < N_QUEUES); - self.queues.advance(rq); + if Some(n) == self.queues.peek_head(rq) { + self.queues.advance(rq); + } else { + // If the thread is not the head remove it + // from queue and re-insert it at tail. + self.queues.pop_next(rq); + self.queues.push(n, rq); + } self.reallocate() } } @@ -359,6 +417,7 @@ mod clist { } } + #[inline] pub fn peek_head(&self, rq: RunqueueId) -> Option { if self.tail[rq as usize] == Self::sentinel() { None @@ -376,6 +435,18 @@ mod clist { pub fn peek_next(&self, curr: ThreadId) -> ThreadId { self.next_idxs[curr as usize] } + + /// Remove next thread after head in runqueue. + pub fn pop_next(&mut self, rq: RunqueueId) -> Option { + let head = self.peek_head(rq)?; + let next = self.peek_next(head); + if next == head { + return None; + } + self.next_idxs[head as usize] = self.next_idxs[next as usize]; + self.next_idxs[next as usize] = Self::sentinel(); + Some(next) + } } #[cfg(test)] diff --git a/src/riot-rs-threads/src/arch/cortex_m.rs b/src/riot-rs-threads/src/arch/cortex_m.rs index 559977044..b96b1397f 100644 --- a/src/riot-rs-threads/src/arch/cortex_m.rs +++ b/src/riot-rs-threads/src/arch/cortex_m.rs @@ -175,7 +175,7 @@ unsafe fn sched() -> usize { loop { { - if let Some(pid) = (unsafe { &*THREADS.as_ptr(cs) }).runqueue.get_next() { + if let Some(pid) = (unsafe { &*THREADS.as_ptr(cs) }).runqueue.get_next(0) { next_pid = pid; break; } diff --git a/src/riot-rs-threads/src/lib.rs b/src/riot-rs-threads/src/lib.rs index d403e8dcd..d8dc607a5 100644 --- a/src/riot-rs-threads/src/lib.rs +++ b/src/riot-rs-threads/src/lib.rs @@ -6,7 +6,7 @@ // invariants #![allow(clippy::indexing_slicing)] -use riot_rs_runqueue::RunQueue; +use riot_rs_runqueue::{GlobalRunqueue, RunQueue}; pub use riot_rs_runqueue::{RunqueueId, ThreadId}; mod arch; @@ -287,8 +287,10 @@ fn cleanup() -> ! { /// "Yields" to another thread with the same priority. pub fn yield_same() { THREADS.with_mut(|mut threads| { - let runqueue = threads.current().unwrap().prio; - threads.runqueue.advance(runqueue); + let thread = threads.current().unwrap(); + let runqueue = thread.prio; + let pid = thread.pid; + threads.runqueue.advance(pid, runqueue); schedule(); }) }