Skip to content

Commit

Permalink
concurrency: Generalize UnblockCallback to MachineCallback
Browse files Browse the repository at this point in the history
    * Introduce MachineCallbackState enum to represent operation outcomes
    * Consolidate unblock/timeout methods into single callback interface
    * Update thread blocking system to use new callback mechanism
    * Refactor mutex and condvar implementations for new callback pattern

Signed-off-by: shamb0 <r.raajey@gmail.com>
  • Loading branch information
shamb0 committed Dec 26, 2024
1 parent 505efe4 commit 81afa3e
Show file tree
Hide file tree
Showing 10 changed files with 222 additions and 143 deletions.
4 changes: 2 additions & 2 deletions src/concurrency/init_once.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::VecDeque;

use rustc_index::Idx;

use super::thread::DynUnblockCallback;
use super::thread::DyMachineCallback;
use super::vector_clock::VClock;
use crate::*;

Expand Down Expand Up @@ -35,7 +35,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {

/// Put the thread into the queue waiting for the initialization.
#[inline]
fn init_once_enqueue_and_block(&mut self, id: InitOnceId, callback: DynUnblockCallback<'tcx>) {
fn init_once_enqueue_and_block(&mut self, id: InitOnceId, callback: DyMachineCallback<'tcx>) {
let this = self.eval_context_mut();
let thread = this.active_thread();
let init_once = &mut this.machine.sync.init_onces[id];
Expand Down
133 changes: 81 additions & 52 deletions src/concurrency/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,15 +422,22 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
mutex_ref: MutexRef,
retval_dest: Option<(Scalar, MPlaceTy<'tcx>)>,
}
@unblock = |this| {
assert!(!this.mutex_is_locked(&mutex_ref));
this.mutex_lock(&mutex_ref);

if let Some((retval, dest)) = retval_dest {
this.write_scalar(retval, &dest)?;
@unblock = |this, tcb_state| {
match tcb_state {
MachineCallbackState::Ready => {
assert!(!this.mutex_is_locked(&mutex_ref));
this.mutex_lock(&mutex_ref);

if let Some((retval, dest)) = retval_dest {
this.write_scalar(retval, &dest)?;
}

interp_ok(())
},
MachineCallbackState::TimedOut => {
panic!("Mutex operation received unexpected timeout state - mutex operations do not support timeouts")
},
}

interp_ok(())
}
),
);
Expand Down Expand Up @@ -538,10 +545,17 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
retval: Scalar,
dest: MPlaceTy<'tcx>,
}
@unblock = |this| {
this.rwlock_reader_lock(id);
this.write_scalar(retval, &dest)?;
interp_ok(())
@unblock = |this, tcb_state| {
match tcb_state {
MachineCallbackState::Ready => {
this.rwlock_reader_lock(id);
this.write_scalar(retval, &dest)?;
interp_ok(())
},
MachineCallbackState::TimedOut => {
panic!("RwLock operation received unexpected timeout state - RwLock operations do not support timeouts")
},
}
}
),
);
Expand Down Expand Up @@ -623,10 +637,17 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
retval: Scalar,
dest: MPlaceTy<'tcx>,
}
@unblock = |this| {
this.rwlock_writer_lock(id);
this.write_scalar(retval, &dest)?;
interp_ok(())
@unblock = |this, tcb_state| {
match tcb_state {
MachineCallbackState::Ready => {
this.rwlock_writer_lock(id);
this.write_scalar(retval, &dest)?;
interp_ok(())
},
MachineCallbackState::TimedOut => {
panic!("RwLock operation received unexpected timeout state - RwLock operations do not support timeouts")
},
}
}
),
);
Expand Down Expand Up @@ -677,25 +698,29 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
retval_timeout: Scalar,
dest: MPlaceTy<'tcx>,
}
@unblock = |this| {
// The condvar was signaled. Make sure we get the clock for that.
if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(
&this.machine.sync.condvars[condvar].clock,
&this.machine.threads,
);
@unblock = |this, tcb_state| {
match tcb_state {
MachineCallbackState::Ready => {
// The condvar was signaled. Make sure we get the clock for that.
if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(
&this.machine.sync.condvars[condvar].clock,
&this.machine.threads,
);
}
// Try to acquire the mutex.
// The timeout only applies to the first wait (until the signal), not for mutex acquisition.
this.condvar_reacquire_mutex(&mutex_ref, retval_succ, dest)
}
MachineCallbackState::TimedOut => {
// We have to remove the waiter from the queue again.
let thread = this.active_thread();
let waiters = &mut this.machine.sync.condvars[condvar].waiters;
waiters.retain(|waiter| *waiter != thread);
// Now get back the lock.
this.condvar_reacquire_mutex(&mutex_ref, retval_timeout, dest)
}
}
// Try to acquire the mutex.
// The timeout only applies to the first wait (until the signal), not for mutex acquisition.
this.condvar_reacquire_mutex(&mutex_ref, retval_succ, dest)
}
@timeout = |this| {
// We have to remove the waiter from the queue again.
let thread = this.active_thread();
let waiters = &mut this.machine.sync.condvars[condvar].waiters;
waiters.retain(|waiter| *waiter != thread);
// Now get back the lock.
this.condvar_reacquire_mutex(&mutex_ref, retval_timeout, dest)
}
),
);
Expand Down Expand Up @@ -752,25 +777,29 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
dest: MPlaceTy<'tcx>,
errno_timeout: IoError,
}
@unblock = |this| {
let futex = futex_ref.0.borrow();
// Acquire the clock of the futex.
if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(&futex.clock, &this.machine.threads);
@unblock = |this, tcb_state| {
match tcb_state {
MachineCallbackState::Ready => {
let futex = futex_ref.0.borrow();
// Acquire the clock of the futex.
if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(&futex.clock, &this.machine.threads);
}
// Write the return value.
this.write_scalar(retval_succ, &dest)?;
interp_ok(())
},
MachineCallbackState::TimedOut => {
// Remove the waiter from the futex.
let thread = this.active_thread();
let mut futex = futex_ref.0.borrow_mut();
futex.waiters.retain(|waiter| waiter.thread != thread);
// Set errno and write return value.
this.set_last_error(errno_timeout)?;
this.write_scalar(retval_timeout, &dest)?;
interp_ok(())
},
}
// Write the return value.
this.write_scalar(retval_succ, &dest)?;
interp_ok(())
}
@timeout = |this| {
// Remove the waiter from the futex.
let thread = this.active_thread();
let mut futex = futex_ref.0.borrow_mut();
futex.waiters.retain(|waiter| waiter.thread != thread);
// Set errno and write return value.
this.set_last_error(errno_timeout)?;
this.write_scalar(retval_timeout, &dest)?;
interp_ok(())
}
),
);
Expand Down
103 changes: 51 additions & 52 deletions src/concurrency/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,70 +38,62 @@ pub enum TlsAllocAction {
Leak,
}

/// Trait for callbacks that are executed when a thread gets unblocked.
pub trait UnblockCallback<'tcx>: VisitProvenance {
/// Will be invoked when the thread was unblocked the "regular" way,
/// i.e. whatever event it was blocking on has happened.
fn unblock(self: Box<Self>, ecx: &mut InterpCx<'tcx, MiriMachine<'tcx>>) -> InterpResult<'tcx>;

/// Will be invoked when the timeout ellapsed without the event the
/// thread was blocking on having occurred.
fn timeout(self: Box<Self>, _ecx: &mut InterpCx<'tcx, MiriMachine<'tcx>>)
-> InterpResult<'tcx>;
/// The completion state of an asynchronous machine operation in Miri's concurrency system.
/// Used by callbacks to determine how a blocked thread should proceed after being woken.
#[derive(Debug)]
pub enum MachineCallbackState {
/// The operation completed successfully and the thread can proceed with its normal execution.
Ready,
/// The operation did not complete within its specified duration and should handle the timeout case.
TimedOut,
}

/// Generic callback trait for machine operations.
pub trait MachineCallback<'tcx>: VisitProvenance {
/// Called when the operation completes (successfully or with timeout).
fn call(
self: Box<Self>,
ecx: &mut InterpCx<'tcx, MiriMachine<'tcx>>,
result: MachineCallbackState,
) -> InterpResult<'tcx>;
}
pub type DynUnblockCallback<'tcx> = Box<dyn UnblockCallback<'tcx> + 'tcx>;

pub type DyMachineCallback<'tcx> = Box<dyn MachineCallback<'tcx> + 'tcx>;

/// Macro for creating machine callbacks
#[macro_export]
macro_rules! callback {
(
@capture<$tcx:lifetime $(,)? $($lft:lifetime),*> { $($name:ident: $type:ty),* $(,)? }
@unblock = |$this:ident| $unblock:block
) => {
callback!(
@capture<$tcx, $($lft),*> { $($name: $type),* }
@unblock = |$this| $unblock
@timeout = |_this| {
unreachable!(
"timeout on a thread that was blocked without a timeout (or someone forgot to overwrite this method)"
)
}
)
};
(
@capture<$tcx:lifetime $(,)? $($lft:lifetime),*> { $($name:ident: $type:ty),* $(,)? }
@unblock = |$this:ident| $unblock:block
@timeout = |$this_timeout:ident| $timeout:block
) => {{
(@capture<$tcx:lifetime $(,)? $($lft:lifetime),*> { $($name:ident: $type:ty),* $(,)? }
@unblock = |$this:ident, $result:ident| $complete:expr $(,)?)
=> {{

struct Callback<$tcx, $($lft),*> {
$($name: $type,)*
_phantom: std::marker::PhantomData<&$tcx ()>,
}

impl<$tcx, $($lft),*> VisitProvenance for Callback<$tcx, $($lft),*> {
#[allow(unused_variables)]
fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {
$(
self.$name.visit_provenance(visit);
self.$name.visit_provenance(_visit);
)*
}
}

impl<$tcx, $($lft),*> UnblockCallback<$tcx> for Callback<$tcx, $($lft),*> {
fn unblock(self: Box<Self>, $this: &mut MiriInterpCx<$tcx>) -> InterpResult<$tcx> {
#[allow(unused_variables)]
let Callback { $($name,)* _phantom } = *self;
$unblock
}

fn timeout(self: Box<Self>, $this_timeout: &mut MiriInterpCx<$tcx>) -> InterpResult<$tcx> {
impl<$tcx, $($lft),*> MachineCallback<$tcx> for Callback<$tcx, $($lft),*> {
fn call(
self: Box<Self>,
$this: &mut MiriInterpCx<$tcx>,
$result: MachineCallbackState
) -> InterpResult<$tcx> {
#[allow(unused_variables)]
let Callback { $($name,)* _phantom } = *self;
$timeout
$complete
}
}

Box::new(Callback { $($name,)* _phantom: std::marker::PhantomData })
}}
}};
}

/// A thread identifier.
Expand Down Expand Up @@ -168,7 +160,7 @@ enum ThreadState<'tcx> {
/// The thread is enabled and can be executed.
Enabled,
/// The thread is blocked on something.
Blocked { reason: BlockReason, timeout: Option<Timeout>, callback: DynUnblockCallback<'tcx> },
Blocked { reason: BlockReason, timeout: Option<Timeout>, callback: DyMachineCallback<'tcx> },
/// The thread has terminated its execution. We do not delete terminated
/// threads (FIXME: why?).
Terminated,
Expand Down Expand Up @@ -656,11 +648,18 @@ impl<'tcx> ThreadManager<'tcx> {
@capture<'tcx> {
joined_thread_id: ThreadId,
}
@unblock = |this| {
if let Some(data_race) = &mut this.machine.data_race {
data_race.thread_joined(&this.machine.threads, joined_thread_id);
@unblock = |this, tcb_state| {
match tcb_state {
MachineCallbackState::Ready => {
if let Some(data_race) = &mut this.machine.data_race {
data_race.thread_joined(&this.machine.threads, joined_thread_id);
}
interp_ok(())
}
MachineCallbackState::TimedOut => {
panic!("Join thread operation received unexpected timeout state - operations do not support timeouts")
},
}
interp_ok(())
}
),
);
Expand Down Expand Up @@ -718,7 +717,7 @@ impl<'tcx> ThreadManager<'tcx> {
&mut self,
reason: BlockReason,
timeout: Option<Timeout>,
callback: DynUnblockCallback<'tcx>,
callback: DyMachineCallback<'tcx>,
) {
let state = &mut self.threads[self.active_thread].state;
assert!(state.is_enabled());
Expand Down Expand Up @@ -842,7 +841,7 @@ trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
// 2. Make the scheduler the only place that can change the active
// thread.
let old_thread = this.machine.threads.set_active_thread_id(thread);
callback.timeout(this)?;
callback.call(this, MachineCallbackState::TimedOut)?;
this.machine.threads.set_active_thread_id(old_thread);
}
// found_callback can remain None if the computer's clock
Expand Down Expand Up @@ -1040,7 +1039,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
&mut self,
reason: BlockReason,
timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
callback: DynUnblockCallback<'tcx>,
callback: DyMachineCallback<'tcx>,
) {
let this = self.eval_context_mut();
let timeout = timeout.map(|(clock, anchor, duration)| {
Expand Down Expand Up @@ -1084,7 +1083,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
};
// The callback must be executed in the previously blocked thread.
let old_thread = this.machine.threads.set_active_thread_id(thread);
callback.unblock(this)?;
callback.call(this, MachineCallbackState::Ready)?;
this.machine.threads.set_active_thread_id(old_thread);
interp_ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ pub use crate::concurrency::sync::{
CondvarId, EvalContextExt as _, MutexRef, RwLockId, SynchronizationObjects,
};
pub use crate::concurrency::thread::{
BlockReason, EvalContextExt as _, StackEmptyCallback, ThreadId, ThreadManager, TimeoutAnchor,
TimeoutClock, UnblockCallback,
BlockReason, EvalContextExt as _, MachineCallback, MachineCallbackState, StackEmptyCallback,
ThreadId, ThreadManager, TimeoutAnchor, TimeoutClock,
};
pub use crate::diagnostics::{
EvalContextExt as _, NonHaltingDiagnostic, TerminationInfo, report_error,
Expand Down
Loading

0 comments on commit 81afa3e

Please sign in to comment.