Skip to content

Commit

Permalink
Add callback support to FileDescription
Browse files Browse the repository at this point in the history
   - Implementing atomic reads for contiguous buffers

Signed-off-by: shamb0 <r.raajey@gmail.com>
  • Loading branch information
shamb0 committed Dec 27, 2024
1 parent 4c67220 commit 59f70e8
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 1,006 deletions.
4 changes: 2 additions & 2 deletions src/concurrency/init_once.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::VecDeque;

use rustc_index::Idx;

use super::thread::DyMachineCallback;
use super::thread::DynUnblockCallback;
use super::vector_clock::VClock;
use crate::*;

Expand Down Expand Up @@ -35,7 +35,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {

/// Put the thread into the queue waiting for the initialization.
#[inline]
fn init_once_enqueue_and_block(&mut self, id: InitOnceId, callback: DyMachineCallback<'tcx>) {
fn init_once_enqueue_and_block(&mut self, id: InitOnceId, callback: DynUnblockCallback<'tcx>) {
let this = self.eval_context_mut();
let thread = this.active_thread();
let init_once = &mut this.machine.sync.init_onces[id];
Expand Down
13 changes: 4 additions & 9 deletions src/concurrency/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,16 +428,11 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
assert!(!this.mutex_is_locked(&mutex_ref));
this.mutex_lock(&mutex_ref);

if let Some((retval, dest)) = retval_dest {
this.write_scalar(retval, &dest)?;
}

interp_ok(())
},
MachineCallbackState::TimedOut => {
panic!("Mutex operation received unexpected timeout state - mutex operations do not support timeouts")
},
if let Some((retval, dest)) = retval_dest {
this.write_scalar(retval, &dest)?;
}

interp_ok(())
}
),
);
Expand Down
7 changes: 4 additions & 3 deletions src/concurrency/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ enum ThreadState<'tcx> {
/// The thread is enabled and can be executed.
Enabled,
/// The thread is blocked on something.
Blocked { reason: BlockReason, timeout: Option<Timeout>, callback: DyMachineCallback<'tcx> },
Blocked { reason: BlockReason, timeout: Option<Timeout>, callback: DynUnblockCallback<'tcx> },
/// The thread has terminated its execution. We do not delete terminated
/// threads (FIXME: why?).
Terminated,
Expand Down Expand Up @@ -610,6 +610,7 @@ impl<'tcx> ThreadManager<'tcx> {
if let Some(data_race) = &mut this.machine.data_race {
data_race.thread_joined(&this.machine.threads, joined_thread_id);
}
interp_ok(())
}
),
);
Expand Down Expand Up @@ -667,7 +668,7 @@ impl<'tcx> ThreadManager<'tcx> {
&mut self,
reason: BlockReason,
timeout: Option<Timeout>,
callback: DyMachineCallback<'tcx>,
callback: DynUnblockCallback<'tcx>,
) {
let state = &mut self.threads[self.active_thread].state;
assert!(state.is_enabled());
Expand Down Expand Up @@ -989,7 +990,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
&mut self,
reason: BlockReason,
timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
callback: DyMachineCallback<'tcx>,
callback: DynUnblockCallback<'tcx>,
) {
let this = self.eval_context_mut();
let timeout = timeout.map(|(clock, anchor, duration)| {
Expand Down
85 changes: 1 addition & 84 deletions src/shims/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,29 +30,6 @@ pub trait FileDescription: std::fmt::Debug + Any {
throw_unsup_format!("cannot read from {}", self.name());
}

/// Performs an atomic read operation on the file.
///
/// # Arguments
/// * `self_ref` - Strong reference to file description for lifetime management
/// * `communicate_allowed` - Whether external communication is permitted
/// * `op` - The I/O operation containing buffer and layout information
/// * `dest` - Destination for storing operation results
/// * `ecx` - Mutable reference to interpreter context
///
/// # Returns
/// * `Ok(())` on successful read
/// * `Err(_)` if read fails or is unsupported
fn read_atomic<'tcx>(
&self,
_self_ref: &FileDescriptionRef,
_communicate_allowed: bool,
_op: &mut IoTransferOperation<'tcx>,
_dest: &MPlaceTy<'tcx>,
_ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
throw_unsup_format!("cannot read from {}", self.name());
}

/// Writes as much as possible from the given buffer `ptr`.
/// `len` indicates how many bytes we should try to write.
/// `dest` is where the return value should be stored: number of bytes written, or `-1` in case of error.
Expand Down Expand Up @@ -435,7 +412,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
}

/// Represents an atomic I/O operation that handles data transfer between memory regions.
/// Supports both contiguous and scattered memory layouts for efficient I/O operations.
/// Supports contiguous memory layouts for efficient I/O operations.
#[derive(Clone)]
pub struct IoTransferOperation<'tcx> {
/// Intermediate buffer for atomic transfer operations.
Expand All @@ -458,8 +435,6 @@ pub struct IoTransferOperation<'tcx> {
enum IoBufferLayout {
/// Single continuous memory region for transfer.
Contiguous { address: Pointer },
/// Multiple discontinuous memory regions.
Scattered { regions: Vec<(Pointer, usize)> },
}

impl VisitProvenance for IoTransferOperation<'_> {
Expand All @@ -480,17 +455,6 @@ impl<'tcx> IoTransferOperation<'tcx> {
}
}

/// Creates a new I/O operation for scattered memory regions.
pub fn new_scattered(buffers: Vec<(Pointer, usize)>) -> Self {
let total_size = buffers.iter().map(|(_, len)| len).sum();
IoTransferOperation {
transfer_buffer: vec![0; total_size],
layout: IoBufferLayout::Scattered { regions: buffers },
total_size,
_phantom: std::marker::PhantomData,
}
}

/// Provides mutable access to the transfer buffer.
pub fn buffer_mut(&mut self) -> &mut [u8] {
&mut self.transfer_buffer
Expand Down Expand Up @@ -534,53 +498,6 @@ impl<'tcx> IoTransferOperation<'tcx> {
return ecx.set_last_error_and_return(LibcError("EIO"), dest);
}
}

IoBufferLayout::Scattered { regions } => {
let mut current_pos = 0;

for (ptr, len) in regions {
if current_pos >= bytes_processed {
break;
}

// Calculate copy size with safe arithmetic
let remaining_bytes = bytes_processed
.checked_sub(current_pos)
.expect("current_pos should never exceed bytes_read");
let copy_size = (*len).min(remaining_bytes);

// POSIX Compliance: Verify each buffer's accessibility
if ecx
.check_ptr_access(
*ptr,
Size::from_bytes(copy_size),
CheckInAllocMsg::MemoryAccessTest,
)
.report_err()
.is_err()
{
return ecx.set_last_error_and_return(LibcError("EFAULT"), dest);
}

let end_pos = current_pos
.checked_add(copy_size)
.expect("end position calculation should not overflow");

// Attempt the write operation with proper error handling
if ecx
.write_bytes_ptr(
*ptr,
self.transfer_buffer[current_pos..end_pos].iter().copied(),
)
.report_err()
.is_err()
{
return ecx.set_last_error_and_return(LibcError("EIO"), dest);
}

current_pos = end_pos;
}
}
}

interp_ok(())
Expand Down
Loading

0 comments on commit 59f70e8

Please sign in to comment.