diff --git a/src/concurrency/init_once.rs b/src/concurrency/init_once.rs index 534f02545b..a2960cb6fb 100644 --- a/src/concurrency/init_once.rs +++ b/src/concurrency/init_once.rs @@ -2,7 +2,7 @@ use std::collections::VecDeque; use rustc_index::Idx; -use super::thread::DynUnblockCallback; +use super::thread::DyMachineCallback; use super::vector_clock::VClock; use crate::*; @@ -35,7 +35,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { /// Put the thread into the queue waiting for the initialization. #[inline] - fn init_once_enqueue_and_block(&mut self, id: InitOnceId, callback: DynUnblockCallback<'tcx>) { + fn init_once_enqueue_and_block(&mut self, id: InitOnceId, callback: DyMachineCallback<'tcx>) { let this = self.eval_context_mut(); let thread = this.active_thread(); let init_once = &mut this.machine.sync.init_onces[id]; diff --git a/src/concurrency/sync.rs b/src/concurrency/sync.rs index ef4034cc0c..79dd82b475 100644 --- a/src/concurrency/sync.rs +++ b/src/concurrency/sync.rs @@ -422,15 +422,22 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { mutex_ref: MutexRef, retval_dest: Option<(Scalar, MPlaceTy<'tcx>)>, } - @unblock = |this| { - assert!(!this.mutex_is_locked(&mutex_ref)); - this.mutex_lock(&mutex_ref); - - if let Some((retval, dest)) = retval_dest { - this.write_scalar(retval, &dest)?; + @unblock = |this, tcb_state| { + match tcb_state { + MachineCallbackState::Ready => { + assert!(!this.mutex_is_locked(&mutex_ref)); + this.mutex_lock(&mutex_ref); + + if let Some((retval, dest)) = retval_dest { + this.write_scalar(retval, &dest)?; + } + + interp_ok(()) + }, + MachineCallbackState::TimedOut => { + panic!("Mutex operation received unexpected timeout state - mutex operations do not support timeouts") + }, } - - interp_ok(()) } ), ); @@ -538,10 +545,17 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { retval: Scalar, dest: MPlaceTy<'tcx>, } - @unblock = |this| { - this.rwlock_reader_lock(id); - this.write_scalar(retval, &dest)?; - interp_ok(()) + @unblock = |this, tcb_state| { + match tcb_state { + MachineCallbackState::Ready => { + this.rwlock_reader_lock(id); + this.write_scalar(retval, &dest)?; + interp_ok(()) + }, + MachineCallbackState::TimedOut => { + panic!("RwLock operation received unexpected timeout state - RwLock operations do not support timeouts") + }, + } } ), ); @@ -623,10 +637,17 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { retval: Scalar, dest: MPlaceTy<'tcx>, } - @unblock = |this| { - this.rwlock_writer_lock(id); - this.write_scalar(retval, &dest)?; - interp_ok(()) + @unblock = |this, tcb_state| { + match tcb_state { + MachineCallbackState::Ready => { + this.rwlock_writer_lock(id); + this.write_scalar(retval, &dest)?; + interp_ok(()) + }, + MachineCallbackState::TimedOut => { + panic!("RwLock operation received unexpected timeout state - RwLock operations do not support timeouts") + }, + } } ), ); @@ -677,25 +698,29 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { retval_timeout: Scalar, dest: MPlaceTy<'tcx>, } - @unblock = |this| { - // The condvar was signaled. Make sure we get the clock for that. - if let Some(data_race) = &this.machine.data_race { - data_race.acquire_clock( - &this.machine.sync.condvars[condvar].clock, - &this.machine.threads, - ); + @unblock = |this, tcb_state| { + match tcb_state { + MachineCallbackState::Ready => { + // The condvar was signaled. Make sure we get the clock for that. + if let Some(data_race) = &this.machine.data_race { + data_race.acquire_clock( + &this.machine.sync.condvars[condvar].clock, + &this.machine.threads, + ); + } + // Try to acquire the mutex. + // The timeout only applies to the first wait (until the signal), not for mutex acquisition. + this.condvar_reacquire_mutex(&mutex_ref, retval_succ, dest) + } + MachineCallbackState::TimedOut => { + // We have to remove the waiter from the queue again. + let thread = this.active_thread(); + let waiters = &mut this.machine.sync.condvars[condvar].waiters; + waiters.retain(|waiter| *waiter != thread); + // Now get back the lock. + this.condvar_reacquire_mutex(&mutex_ref, retval_timeout, dest) + } } - // Try to acquire the mutex. - // The timeout only applies to the first wait (until the signal), not for mutex acquisition. - this.condvar_reacquire_mutex(&mutex_ref, retval_succ, dest) - } - @timeout = |this| { - // We have to remove the waiter from the queue again. - let thread = this.active_thread(); - let waiters = &mut this.machine.sync.condvars[condvar].waiters; - waiters.retain(|waiter| *waiter != thread); - // Now get back the lock. - this.condvar_reacquire_mutex(&mutex_ref, retval_timeout, dest) } ), ); @@ -752,25 +777,29 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { dest: MPlaceTy<'tcx>, errno_timeout: IoError, } - @unblock = |this| { - let futex = futex_ref.0.borrow(); - // Acquire the clock of the futex. - if let Some(data_race) = &this.machine.data_race { - data_race.acquire_clock(&futex.clock, &this.machine.threads); + @unblock = |this, tcb_state| { + match tcb_state { + MachineCallbackState::Ready => { + let futex = futex_ref.0.borrow(); + // Acquire the clock of the futex. + if let Some(data_race) = &this.machine.data_race { + data_race.acquire_clock(&futex.clock, &this.machine.threads); + } + // Write the return value. + this.write_scalar(retval_succ, &dest)?; + interp_ok(()) + }, + MachineCallbackState::TimedOut => { + // Remove the waiter from the futex. + let thread = this.active_thread(); + let mut futex = futex_ref.0.borrow_mut(); + futex.waiters.retain(|waiter| waiter.thread != thread); + // Set errno and write return value. + this.set_last_error(errno_timeout)?; + this.write_scalar(retval_timeout, &dest)?; + interp_ok(()) + }, } - // Write the return value. - this.write_scalar(retval_succ, &dest)?; - interp_ok(()) - } - @timeout = |this| { - // Remove the waiter from the futex. - let thread = this.active_thread(); - let mut futex = futex_ref.0.borrow_mut(); - futex.waiters.retain(|waiter| waiter.thread != thread); - // Set errno and write return value. - this.set_last_error(errno_timeout)?; - this.write_scalar(retval_timeout, &dest)?; - interp_ok(()) } ), ); diff --git a/src/concurrency/thread.rs b/src/concurrency/thread.rs index 730c27d016..6d57cb3876 100644 --- a/src/concurrency/thread.rs +++ b/src/concurrency/thread.rs @@ -38,70 +38,62 @@ pub enum TlsAllocAction { Leak, } -/// Trait for callbacks that are executed when a thread gets unblocked. -pub trait UnblockCallback<'tcx>: VisitProvenance { - /// Will be invoked when the thread was unblocked the "regular" way, - /// i.e. whatever event it was blocking on has happened. - fn unblock(self: Box, ecx: &mut InterpCx<'tcx, MiriMachine<'tcx>>) -> InterpResult<'tcx>; - - /// Will be invoked when the timeout ellapsed without the event the - /// thread was blocking on having occurred. - fn timeout(self: Box, _ecx: &mut InterpCx<'tcx, MiriMachine<'tcx>>) - -> InterpResult<'tcx>; +/// The completion state of an asynchronous machine operation in Miri's concurrency system. +/// Used by callbacks to determine how a blocked thread should proceed after being woken. +#[derive(Debug)] +pub enum MachineCallbackState { + /// The operation completed successfully and the thread can proceed with its normal execution. + Ready, + /// The operation did not complete within its specified duration and should handle the timeout case. + TimedOut, +} + +/// Generic callback trait for machine operations. +pub trait MachineCallback<'tcx>: VisitProvenance { + /// Called when the operation completes (successfully or with timeout). + fn call( + self: Box, + ecx: &mut InterpCx<'tcx, MiriMachine<'tcx>>, + result: MachineCallbackState, + ) -> InterpResult<'tcx>; } -pub type DynUnblockCallback<'tcx> = Box + 'tcx>; +pub type DyMachineCallback<'tcx> = Box + 'tcx>; + +/// Macro for creating machine callbacks #[macro_export] macro_rules! callback { - ( - @capture<$tcx:lifetime $(,)? $($lft:lifetime),*> { $($name:ident: $type:ty),* $(,)? } - @unblock = |$this:ident| $unblock:block - ) => { - callback!( - @capture<$tcx, $($lft),*> { $($name: $type),* } - @unblock = |$this| $unblock - @timeout = |_this| { - unreachable!( - "timeout on a thread that was blocked without a timeout (or someone forgot to overwrite this method)" - ) - } - ) - }; - ( - @capture<$tcx:lifetime $(,)? $($lft:lifetime),*> { $($name:ident: $type:ty),* $(,)? } - @unblock = |$this:ident| $unblock:block - @timeout = |$this_timeout:ident| $timeout:block - ) => {{ + (@capture<$tcx:lifetime $(,)? $($lft:lifetime),*> { $($name:ident: $type:ty),* $(,)? } + @unblock = |$this:ident, $result:ident| $complete:expr $(,)?) + => {{ + struct Callback<$tcx, $($lft),*> { $($name: $type,)* _phantom: std::marker::PhantomData<&$tcx ()>, } impl<$tcx, $($lft),*> VisitProvenance for Callback<$tcx, $($lft),*> { - #[allow(unused_variables)] - fn visit_provenance(&self, visit: &mut VisitWith<'_>) { + fn visit_provenance(&self, _visit: &mut VisitWith<'_>) { $( - self.$name.visit_provenance(visit); + self.$name.visit_provenance(_visit); )* } } - impl<$tcx, $($lft),*> UnblockCallback<$tcx> for Callback<$tcx, $($lft),*> { - fn unblock(self: Box, $this: &mut MiriInterpCx<$tcx>) -> InterpResult<$tcx> { - #[allow(unused_variables)] - let Callback { $($name,)* _phantom } = *self; - $unblock - } - - fn timeout(self: Box, $this_timeout: &mut MiriInterpCx<$tcx>) -> InterpResult<$tcx> { + impl<$tcx, $($lft),*> MachineCallback<$tcx> for Callback<$tcx, $($lft),*> { + fn call( + self: Box, + $this: &mut MiriInterpCx<$tcx>, + $result: MachineCallbackState + ) -> InterpResult<$tcx> { #[allow(unused_variables)] let Callback { $($name,)* _phantom } = *self; - $timeout + $complete } } Box::new(Callback { $($name,)* _phantom: std::marker::PhantomData }) - }} + }}; } /// A thread identifier. @@ -168,7 +160,7 @@ enum ThreadState<'tcx> { /// The thread is enabled and can be executed. Enabled, /// The thread is blocked on something. - Blocked { reason: BlockReason, timeout: Option, callback: DynUnblockCallback<'tcx> }, + Blocked { reason: BlockReason, timeout: Option, callback: DyMachineCallback<'tcx> }, /// The thread has terminated its execution. We do not delete terminated /// threads (FIXME: why?). Terminated, @@ -656,11 +648,18 @@ impl<'tcx> ThreadManager<'tcx> { @capture<'tcx> { joined_thread_id: ThreadId, } - @unblock = |this| { - if let Some(data_race) = &mut this.machine.data_race { - data_race.thread_joined(&this.machine.threads, joined_thread_id); + @unblock = |this, tcb_state| { + match tcb_state { + MachineCallbackState::Ready => { + if let Some(data_race) = &mut this.machine.data_race { + data_race.thread_joined(&this.machine.threads, joined_thread_id); + } + interp_ok(()) + } + MachineCallbackState::TimedOut => { + panic!("Join thread operation received unexpected timeout state - operations do not support timeouts") + }, } - interp_ok(()) } ), ); @@ -718,7 +717,7 @@ impl<'tcx> ThreadManager<'tcx> { &mut self, reason: BlockReason, timeout: Option, - callback: DynUnblockCallback<'tcx>, + callback: DyMachineCallback<'tcx>, ) { let state = &mut self.threads[self.active_thread].state; assert!(state.is_enabled()); @@ -842,7 +841,7 @@ trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> { // 2. Make the scheduler the only place that can change the active // thread. let old_thread = this.machine.threads.set_active_thread_id(thread); - callback.timeout(this)?; + callback.call(this, MachineCallbackState::TimedOut)?; this.machine.threads.set_active_thread_id(old_thread); } // found_callback can remain None if the computer's clock @@ -1040,7 +1039,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { &mut self, reason: BlockReason, timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>, - callback: DynUnblockCallback<'tcx>, + callback: DyMachineCallback<'tcx>, ) { let this = self.eval_context_mut(); let timeout = timeout.map(|(clock, anchor, duration)| { @@ -1084,7 +1083,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { }; // The callback must be executed in the previously blocked thread. let old_thread = this.machine.threads.set_active_thread_id(thread); - callback.unblock(this)?; + callback.call(this, MachineCallbackState::Ready)?; this.machine.threads.set_active_thread_id(old_thread); interp_ok(()) } diff --git a/src/lib.rs b/src/lib.rs index e02d51afce..63e2e1b5fb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -126,8 +126,8 @@ pub use crate::concurrency::sync::{ CondvarId, EvalContextExt as _, MutexRef, RwLockId, SynchronizationObjects, }; pub use crate::concurrency::thread::{ - BlockReason, EvalContextExt as _, StackEmptyCallback, ThreadId, ThreadManager, TimeoutAnchor, - TimeoutClock, UnblockCallback, + BlockReason, EvalContextExt as _, MachineCallback, MachineCallbackState, StackEmptyCallback, + ThreadId, ThreadManager, TimeoutAnchor, TimeoutClock, }; pub use crate::diagnostics::{ EvalContextExt as _, NonHaltingDiagnostic, TerminationInfo, report_error, diff --git a/src/shims/time.rs b/src/shims/time.rs index 72d98bc1c4..a7ec4d2c2c 100644 --- a/src/shims/time.rs +++ b/src/shims/time.rs @@ -331,8 +331,14 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { Some((TimeoutClock::Monotonic, TimeoutAnchor::Relative, duration)), callback!( @capture<'tcx> {} - @unblock = |_this| { panic!("sleeping thread unblocked before time is up") } - @timeout = |_this| { interp_ok(()) } + @unblock = |_this, tcb_state| { + match tcb_state { + MachineCallbackState::Ready => { + panic!("sleeping thread unblocked before time is up") + }, + MachineCallbackState::TimedOut => { interp_ok(()) }, + } + } ), ); interp_ok(Scalar::from_i32(0)) @@ -353,8 +359,14 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { Some((TimeoutClock::Monotonic, TimeoutAnchor::Relative, duration)), callback!( @capture<'tcx> {} - @unblock = |_this| { panic!("sleeping thread unblocked before time is up") } - @timeout = |_this| { interp_ok(()) } + @unblock = |_this, tcb_state| { + match tcb_state { + MachineCallbackState::Ready => { + panic!("sleeping thread unblocked before time is up") + }, + MachineCallbackState::TimedOut => { interp_ok(()) }, + } + } ), ); interp_ok(()) diff --git a/src/shims/unix/linux_like/epoll.rs b/src/shims/unix/linux_like/epoll.rs index 5b240351c2..d72f823422 100644 --- a/src/shims/unix/linux_like/epoll.rs +++ b/src/shims/unix/linux_like/epoll.rs @@ -496,22 +496,26 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { dest: MPlaceTy<'tcx>, event: MPlaceTy<'tcx>, } - @unblock = |this| { - return_ready_list(epfd_value, weak_epfd, &dest, &event, this)?; - interp_ok(()) - } - @timeout = |this| { - // No notification after blocking timeout. - let Some(epfd) = weak_epfd.upgrade() else { - throw_unsup_format!("epoll FD {epfd_value} got closed while blocking.") - }; - // Remove the current active thread_id from the blocked thread_id list. - epfd.downcast::() - .ok_or_else(|| err_unsup_format!("non-epoll FD passed to `epoll_wait`"))? - .thread_id.borrow_mut() - .retain(|&id| id != this.active_thread()); - this.write_int(0, &dest)?; - interp_ok(()) + @unblock = |this, tcb_state| { + match tcb_state { + MachineCallbackState::Ready => { + return_ready_list(epfd_value, weak_epfd, &dest, &event, this)?; + interp_ok(()) + }, + MachineCallbackState::TimedOut => { + // No notification after blocking timeout. + let Some(epfd) = weak_epfd.upgrade() else { + throw_unsup_format!("epoll FD {epfd_value} got closed while blocking.") + }; + // Remove the current active thread_id from the blocked thread_id list. + epfd.downcast::() + .ok_or_else(|| err_unsup_format!("non-epoll FD passed to `epoll_wait`"))? + .thread_id.borrow_mut() + .retain(|&id| id != this.active_thread()); + this.write_int(0, &dest)?; + interp_ok(()) + }, + } } ), ); diff --git a/src/shims/unix/linux_like/eventfd.rs b/src/shims/unix/linux_like/eventfd.rs index 4bbe417ea8..9fd156e116 100644 --- a/src/shims/unix/linux_like/eventfd.rs +++ b/src/shims/unix/linux_like/eventfd.rs @@ -254,9 +254,16 @@ fn eventfd_write<'tcx>( dest: MPlaceTy<'tcx>, weak_eventfd: WeakFileDescriptionRef, } - @unblock = |this| { - // When we get unblocked, try again. - eventfd_write(num, buf_place, &dest, weak_eventfd, this) + @unblock = |this, tcb_state| { + match tcb_state { + MachineCallbackState::Ready => { + // When we get unblocked, try again. + eventfd_write(num, buf_place, &dest, weak_eventfd, this) + }, + MachineCallbackState::TimedOut => { + panic!("Eventfd write operation received unexpected timeout state - operations do not support timeouts") + }, + } } ), ); @@ -302,9 +309,16 @@ fn eventfd_read<'tcx>( dest: MPlaceTy<'tcx>, weak_eventfd: WeakFileDescriptionRef, } - @unblock = |this| { - // When we get unblocked, try again. - eventfd_read(buf_place, &dest, weak_eventfd, this) + @unblock = |this, tcb_state| { + match tcb_state { + MachineCallbackState::Ready => { + // When we get unblocked, try again. + eventfd_read(buf_place, &dest, weak_eventfd, this) + }, + MachineCallbackState::TimedOut => { + panic!("Eventfd read operation received unexpected timeout state - operations do not support timeouts") + }, + } } ), ); diff --git a/src/shims/unix/macos/sync.rs b/src/shims/unix/macos/sync.rs index f66a57ae70..41401d4413 100644 --- a/src/shims/unix/macos/sync.rs +++ b/src/shims/unix/macos/sync.rs @@ -64,7 +64,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { None, callback!( @capture<'tcx> {} - @unblock = |_this| { + @unblock = |_this, _tcb_state| { panic!("we shouldn't wake up ever") } ), diff --git a/src/shims/unix/unnamed_socket.rs b/src/shims/unix/unnamed_socket.rs index 86ebe95762..4ad5e7eb7c 100644 --- a/src/shims/unix/unnamed_socket.rs +++ b/src/shims/unix/unnamed_socket.rs @@ -199,8 +199,15 @@ fn anonsocket_write<'tcx>( len: usize, dest: MPlaceTy<'tcx>, } - @unblock = |this| { - anonsocket_write(weak_self_ref, ptr, len, dest, this) + @unblock = |this, tcb_state| { + match tcb_state { + MachineCallbackState::Ready => { + anonsocket_write(weak_self_ref, ptr, len, dest, this) + }, + MachineCallbackState::TimedOut => { + panic!("Unnamed socket write operation received unexpected timeout state - operations do not support timeouts") + }, + } } ), ); @@ -273,8 +280,15 @@ fn anonsocket_read<'tcx>( ptr: Pointer, dest: MPlaceTy<'tcx>, } - @unblock = |this| { - anonsocket_read(weak_self_ref, len, ptr, dest, this) + @unblock = |this, tcb_state| { + match tcb_state { + MachineCallbackState::Ready => { + anonsocket_read(weak_self_ref, len, ptr, dest, this) + }, + MachineCallbackState::TimedOut => { + panic!("Unnamed socket read operation received unexpected timeout state - operations do not support timeouts") + }, + } } ), ); diff --git a/src/shims/windows/sync.rs b/src/shims/windows/sync.rs index a394e0430b..713bd3490f 100644 --- a/src/shims/windows/sync.rs +++ b/src/shims/windows/sync.rs @@ -111,10 +111,17 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { pending_place: MPlaceTy<'tcx>, dest: MPlaceTy<'tcx>, } - @unblock = |this| { - let ret = this.init_once_try_begin(id, &pending_place, &dest)?; - assert!(ret, "we were woken up but init_once_try_begin still failed"); - interp_ok(()) + @unblock = |this, tcb_state| { + match tcb_state { + MachineCallbackState::Ready => { + let ret = this.init_once_try_begin(id, &pending_place, &dest)?; + assert!(ret, "we were woken up but init_once_try_begin still failed"); + interp_ok(()) + }, + MachineCallbackState::TimedOut => { + panic!("Windows sync init received unexpected timeout state - operations do not support timeouts") + }, + } } ), );