Skip to content

Commit

Permalink
concurrency: Generalize UnblockCallback to MachineCallback
Browse files Browse the repository at this point in the history
    * Introduce UnblockKind enum to represent operation outcomes
    * Consolidate unblock/timeout methods into single callback interface
    * Update thread blocking system to use new callback mechanism
    * Refactor mutex and condvar implementations for new callback pattern

Signed-off-by: shamb0 <r.raajey@gmail.com>
  • Loading branch information
shamb0 committed Dec 27, 2024
1 parent 7049aa6 commit 4c67220
Show file tree
Hide file tree
Showing 9 changed files with 1,217 additions and 30 deletions.
4 changes: 2 additions & 2 deletions src/concurrency/init_once.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::VecDeque;

use rustc_index::Idx;

use super::thread::DynUnblockCallback;
use super::thread::DyMachineCallback;
use super::vector_clock::VClock;
use crate::*;

Expand Down Expand Up @@ -35,7 +35,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {

/// Put the thread into the queue waiting for the initialization.
#[inline]
fn init_once_enqueue_and_block(&mut self, id: InitOnceId, callback: DynUnblockCallback<'tcx>) {
fn init_once_enqueue_and_block(&mut self, id: InitOnceId, callback: DyMachineCallback<'tcx>) {
let this = self.eval_context_mut();
let thread = this.active_thread();
let init_once = &mut this.machine.sync.init_onces[id];
Expand Down
13 changes: 9 additions & 4 deletions src/concurrency/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,11 +428,16 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
assert!(!this.mutex_is_locked(&mutex_ref));
this.mutex_lock(&mutex_ref);

if let Some((retval, dest)) = retval_dest {
this.write_scalar(retval, &dest)?;
}
if let Some((retval, dest)) = retval_dest {
this.write_scalar(retval, &dest)?;
}

interp_ok(())
interp_ok(())
},
MachineCallbackState::TimedOut => {
panic!("Mutex operation received unexpected timeout state - mutex operations do not support timeouts")
},
}
}
),
);
Expand Down
7 changes: 3 additions & 4 deletions src/concurrency/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ enum ThreadState<'tcx> {
/// The thread is enabled and can be executed.
Enabled,
/// The thread is blocked on something.
Blocked { reason: BlockReason, timeout: Option<Timeout>, callback: DynUnblockCallback<'tcx> },
Blocked { reason: BlockReason, timeout: Option<Timeout>, callback: DyMachineCallback<'tcx> },
/// The thread has terminated its execution. We do not delete terminated
/// threads (FIXME: why?).
Terminated,
Expand Down Expand Up @@ -610,7 +610,6 @@ impl<'tcx> ThreadManager<'tcx> {
if let Some(data_race) = &mut this.machine.data_race {
data_race.thread_joined(&this.machine.threads, joined_thread_id);
}
interp_ok(())
}
),
);
Expand Down Expand Up @@ -668,7 +667,7 @@ impl<'tcx> ThreadManager<'tcx> {
&mut self,
reason: BlockReason,
timeout: Option<Timeout>,
callback: DynUnblockCallback<'tcx>,
callback: DyMachineCallback<'tcx>,
) {
let state = &mut self.threads[self.active_thread].state;
assert!(state.is_enabled());
Expand Down Expand Up @@ -990,7 +989,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
&mut self,
reason: BlockReason,
timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
callback: DynUnblockCallback<'tcx>,
callback: DyMachineCallback<'tcx>,
) {
let this = self.eval_context_mut();
let timeout = timeout.map(|(clock, anchor, duration)| {
Expand Down
177 changes: 177 additions & 0 deletions src/shims/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub trait FileDescription: std::fmt::Debug + Any {
/// Reads as much as possible into the given buffer `ptr`.
/// `len` indicates how many bytes we should try to read.
/// `dest` is where the return value should be stored: number of bytes read, or `-1` in case of error.
#[allow(dead_code)]
fn read<'tcx>(
&self,
_self_ref: &FileDescriptionRef,
Expand All @@ -29,6 +30,29 @@ pub trait FileDescription: std::fmt::Debug + Any {
throw_unsup_format!("cannot read from {}", self.name());
}

/// Performs an atomic read operation on the file.
///
/// # Arguments
/// * `self_ref` - Strong reference to file description for lifetime management
/// * `communicate_allowed` - Whether external communication is permitted
/// * `op` - The I/O operation containing buffer and layout information
/// * `dest` - Destination for storing operation results
/// * `ecx` - Mutable reference to interpreter context
///
/// # Returns
/// * `Ok(())` on successful read
/// * `Err(_)` if read fails or is unsupported
fn read_atomic<'tcx>(
&self,
_self_ref: &FileDescriptionRef,
_communicate_allowed: bool,
_op: &mut IoTransferOperation<'tcx>,
_dest: &MPlaceTy<'tcx>,
_ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
throw_unsup_format!("cannot read from {}", self.name());
}

/// Writes as much as possible from the given buffer `ptr`.
/// `len` indicates how many bytes we should try to write.
/// `dest` is where the return value should be stored: number of bytes written, or `-1` in case of error.
Expand Down Expand Up @@ -409,3 +433,156 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
interp_ok(())
}
}

/// Represents an atomic I/O operation that handles data transfer between memory regions.
/// Supports both contiguous and scattered memory layouts for efficient I/O operations.
#[derive(Clone)]
pub struct IoTransferOperation<'tcx> {
/// Intermediate buffer for atomic transfer operations.
/// For reads: Temporary storage before distribution to destinations
/// For writes: Aggregation point before writing to file
transfer_buffer: Vec<u8>,

/// Memory layout specification for the transfer operation.
layout: IoBufferLayout,

/// Total number of bytes to be processed in this operation.
total_size: usize,

/// Interpreter context lifetime marker.
_phantom: std::marker::PhantomData<&'tcx ()>,
}

/// Specifies how memory regions are organized for I/O operations
#[derive(Clone)]
enum IoBufferLayout {
/// Single continuous memory region for transfer.
Contiguous { address: Pointer },
/// Multiple discontinuous memory regions.
Scattered { regions: Vec<(Pointer, usize)> },
}

impl VisitProvenance for IoTransferOperation<'_> {
fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {
// Visits any references that need provenance tracking.
// Currently a no-op as IoTransferOperation contains no such references.
}
}

impl<'tcx> IoTransferOperation<'tcx> {
/// Creates a new I/O operation for a contiguous memory region.
pub fn new_contiguous(ptr: Pointer, len: usize) -> Self {
IoTransferOperation {
transfer_buffer: vec![0; len],
layout: IoBufferLayout::Contiguous { address: ptr },
total_size: len,
_phantom: std::marker::PhantomData,
}
}

/// Creates a new I/O operation for scattered memory regions.
pub fn new_scattered(buffers: Vec<(Pointer, usize)>) -> Self {
let total_size = buffers.iter().map(|(_, len)| len).sum();
IoTransferOperation {
transfer_buffer: vec![0; total_size],
layout: IoBufferLayout::Scattered { regions: buffers },
total_size,
_phantom: std::marker::PhantomData,
}
}

/// Provides mutable access to the transfer buffer.
pub fn buffer_mut(&mut self) -> &mut [u8] {
&mut self.transfer_buffer
}

/// Distributes data from the transfer buffer to final destinations.
pub fn distribute_data(
&mut self,
ecx: &mut MiriInterpCx<'tcx>,
dest: &MPlaceTy<'tcx>,
bytes_processed: usize,
) -> InterpResult<'tcx> {
if bytes_processed > self.total_size {
return ecx.set_last_error_and_return(LibcError("EINVAL"), dest);
}

match &self.layout {
IoBufferLayout::Contiguous { address } => {
// POSIX Compliance: Verify buffer accessibility before writing
if ecx
.check_ptr_access(
*address,
Size::from_bytes(bytes_processed),
CheckInAllocMsg::MemoryAccessTest,
)
.report_err()
.is_err()
{
return ecx.set_last_error_and_return(LibcError("EFAULT"), dest);
}

// Attempt the write operation
if ecx
.write_bytes_ptr(
*address,
self.transfer_buffer[..bytes_processed].iter().copied(),
)
.report_err()
.is_err()
{
return ecx.set_last_error_and_return(LibcError("EIO"), dest);
}
}

IoBufferLayout::Scattered { regions } => {
let mut current_pos = 0;

for (ptr, len) in regions {
if current_pos >= bytes_processed {
break;
}

// Calculate copy size with safe arithmetic
let remaining_bytes = bytes_processed
.checked_sub(current_pos)
.expect("current_pos should never exceed bytes_read");
let copy_size = (*len).min(remaining_bytes);

// POSIX Compliance: Verify each buffer's accessibility
if ecx
.check_ptr_access(
*ptr,
Size::from_bytes(copy_size),
CheckInAllocMsg::MemoryAccessTest,
)
.report_err()
.is_err()
{
return ecx.set_last_error_and_return(LibcError("EFAULT"), dest);
}

let end_pos = current_pos
.checked_add(copy_size)
.expect("end position calculation should not overflow");

// Attempt the write operation with proper error handling
if ecx
.write_bytes_ptr(
*ptr,
self.transfer_buffer[current_pos..end_pos].iter().copied(),
)
.report_err()
.is_err()
{
return ecx.set_last_error_and_return(LibcError("EIO"), dest);
}

current_pos = end_pos;
}
}
}

interp_ok(())
}
}
Loading

0 comments on commit 4c67220

Please sign in to comment.