From e6fb89162e797ef3b5b31b13f766fb04fcfdd53b Mon Sep 17 00:00:00 2001 From: Ralf Jung Date: Fri, 27 Dec 2024 11:37:11 +0100 Subject: [PATCH 1/3] bring socket logic back together and fix logic bug --- src/shims/unix/unnamed_socket.rs | 160 ++++++++++++++----------------- 1 file changed, 74 insertions(+), 86 deletions(-) diff --git a/src/shims/unix/unnamed_socket.rs b/src/shims/unix/unnamed_socket.rs index 86ebe95762..7f35838e9d 100644 --- a/src/shims/unix/unnamed_socket.rs +++ b/src/shims/unix/unnamed_socket.rs @@ -96,26 +96,7 @@ impl FileDescription for AnonSocket { dest: &MPlaceTy<'tcx>, ecx: &mut MiriInterpCx<'tcx>, ) -> InterpResult<'tcx> { - // Always succeed on read size 0. - if len == 0 { - return ecx.return_read_success(ptr, &[], 0, dest); - } - - let Some(readbuf) = &self.readbuf else { - // FIXME: This should return EBADF, but there's no nice way to do that as there's no - // corresponding ErrorKind variant. - throw_unsup_format!("reading from the write end of a pipe"); - }; - - if readbuf.borrow().buf.is_empty() && self.is_nonblock { - // Non-blocking socketpair with writer and empty buffer. - // https://linux.die.net/man/2/read - // EAGAIN or EWOULDBLOCK can be returned for socket, - // POSIX.1-2001 allows either error to be returned for this case. - // Since there is no ErrorKind for EAGAIN, WouldBlock is used. - return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); - } - anonsocket_read(self_ref.downgrade(), len, ptr, dest.clone(), ecx) + anonsocket_read(self_ref, len, ptr, dest, ecx) } fn write<'tcx>( @@ -127,31 +108,7 @@ impl FileDescription for AnonSocket { dest: &MPlaceTy<'tcx>, ecx: &mut MiriInterpCx<'tcx>, ) -> InterpResult<'tcx> { - // Always succeed on write size 0. - // ("If count is zero and fd refers to a file other than a regular file, the results are not specified.") - if len == 0 { - return ecx.return_write_success(0, dest); - } - - // We are writing to our peer's readbuf. - let Some(peer_fd) = self.peer_fd().upgrade() else { - // If the upgrade from Weak to Rc fails, it indicates that all read ends have been - // closed. - return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, dest); - }; - - let Some(writebuf) = &peer_fd.downcast::().unwrap().readbuf else { - // FIXME: This should return EBADF, but there's no nice way to do that as there's no - // corresponding ErrorKind variant. - throw_unsup_format!("writing to the reading end of a pipe"); - }; - let available_space = - MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len()); - if available_space == 0 && self.is_nonblock { - // Non-blocking socketpair with a full buffer. - return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); - } - anonsocket_write(self_ref.downgrade(), ptr, len, dest.clone(), ecx) + anonsocket_write(self_ref, ptr, len, dest, ecx) } fn as_unix(&self) -> &dyn UnixFileDescription { @@ -161,50 +118,66 @@ impl FileDescription for AnonSocket { /// Write to AnonSocket based on the space available and return the written byte size. fn anonsocket_write<'tcx>( - weak_self_ref: WeakFileDescriptionRef, + self_ref: &FileDescriptionRef, ptr: Pointer, len: usize, - dest: MPlaceTy<'tcx>, + dest: &MPlaceTy<'tcx>, ecx: &mut MiriInterpCx<'tcx>, ) -> InterpResult<'tcx> { - let Some(self_ref) = weak_self_ref.upgrade() else { - // FIXME: We should raise a deadlock error if the self_ref upgrade failed. - throw_unsup_format!("This will be a deadlock error in future") - }; let self_anonsocket = self_ref.downcast::().unwrap(); + + // Always succeed on write size 0. + // ("If count is zero and fd refers to a file other than a regular file, the results are not specified.") + if len == 0 { + return ecx.return_write_success(0, dest); + } + + // We are writing to our peer's readbuf. let Some(peer_fd) = self_anonsocket.peer_fd().upgrade() else { // If the upgrade from Weak to Rc fails, it indicates that all read ends have been - // closed. - return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, &dest); + // closed. It is an error to write even if there would be space. + return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, dest); }; + let Some(writebuf) = &peer_fd.downcast::().unwrap().readbuf else { - // FIXME: This should return EBADF, but there's no nice way to do that as there's no - // corresponding ErrorKind variant. - throw_unsup_format!("writing to the reading end of a pipe") + // Writing to the read end of a pipe. + return ecx.set_last_error_and_return(IoError::LibcError("EBADF"), dest); }; + // Let's see if we can write. let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len()); - if available_space == 0 { - // Blocking socketpair with a full buffer. - let dest = dest.clone(); - self_anonsocket.blocked_write_tid.borrow_mut().push(ecx.active_thread()); - ecx.block_thread( - BlockReason::UnnamedSocket, - None, - callback!( - @capture<'tcx> { - weak_self_ref: WeakFileDescriptionRef, - ptr: Pointer, - len: usize, - dest: MPlaceTy<'tcx>, - } - @unblock = |this| { - anonsocket_write(weak_self_ref, ptr, len, dest, this) - } - ), - ); + if self_anonsocket.is_nonblock { + // Non-blocking socketpair with a full buffer. + return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); + } else { + // Blocking socketpair with a full buffer. + // Block the current thread; only keep a weak ref for this. + let weak_self_ref = self_ref.downgrade(); + let dest = dest.clone(); + self_anonsocket.blocked_write_tid.borrow_mut().push(ecx.active_thread()); + ecx.block_thread( + BlockReason::UnnamedSocket, + None, + callback!( + @capture<'tcx> { + weak_self_ref: WeakFileDescriptionRef, + ptr: Pointer, + len: usize, + dest: MPlaceTy<'tcx>, + } + @unblock = |this| { + let Some(self_ref) = weak_self_ref.upgrade() else { + // FIXME: We should raise a deadlock error if the self_ref upgrade failed. + throw_unsup_format!("This will be a deadlock error in future") + }; + anonsocket_write(&self_ref, ptr, len, &dest, this) + } + ), + ); + } } else { + // There is space to write! let mut writebuf = writebuf.borrow_mut(); // Remember this clock so `read` can synchronize with us. ecx.release_clock(|clock| { @@ -229,25 +202,26 @@ fn anonsocket_write<'tcx>( ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?; } - return ecx.return_write_success(actual_write_size, &dest); + return ecx.return_write_success(actual_write_size, dest); } interp_ok(()) } /// Read from AnonSocket and return the number of bytes read. fn anonsocket_read<'tcx>( - weak_self_ref: WeakFileDescriptionRef, + self_ref: &FileDescriptionRef, len: usize, ptr: Pointer, - dest: MPlaceTy<'tcx>, + dest: &MPlaceTy<'tcx>, ecx: &mut MiriInterpCx<'tcx>, ) -> InterpResult<'tcx> { - let Some(self_ref) = weak_self_ref.upgrade() else { - // FIXME: We should raise a deadlock error if the self_ref upgrade failed. - throw_unsup_format!("This will be a deadlock error in future") - }; let self_anonsocket = self_ref.downcast::().unwrap(); + // Always succeed on read size 0. + if len == 0 { + return ecx.return_read_success(ptr, &[], 0, dest); + } + let Some(readbuf) = &self_anonsocket.readbuf else { // FIXME: This should return EBADF, but there's no nice way to do that as there's no // corresponding ErrorKind variant. @@ -258,10 +232,19 @@ fn anonsocket_read<'tcx>( if self_anonsocket.peer_fd().upgrade().is_none() { // Socketpair with no peer and empty buffer. // 0 bytes successfully read indicates end-of-file. - return ecx.return_read_success(ptr, &[], 0, &dest); + return ecx.return_read_success(ptr, &[], 0, dest); + } else if self_anonsocket.is_nonblock { + // Non-blocking socketpair with writer and empty buffer. + // https://linux.die.net/man/2/read + // EAGAIN or EWOULDBLOCK can be returned for socket, + // POSIX.1-2001 allows either error to be returned for this case. + // Since there is no ErrorKind for EAGAIN, WouldBlock is used. + return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest); } else { // Blocking socketpair with writer and empty buffer. - let weak_self_ref = weak_self_ref.clone(); + // Block the current thread; only keep a weak ref for this. + let weak_self_ref = self_ref.downgrade(); + let dest = dest.clone(); self_anonsocket.blocked_read_tid.borrow_mut().push(ecx.active_thread()); ecx.block_thread( BlockReason::UnnamedSocket, @@ -274,12 +257,17 @@ fn anonsocket_read<'tcx>( dest: MPlaceTy<'tcx>, } @unblock = |this| { - anonsocket_read(weak_self_ref, len, ptr, dest, this) + let Some(self_ref) = weak_self_ref.upgrade() else { + // FIXME: We should raise a deadlock error if the self_ref upgrade failed. + throw_unsup_format!("This will be a deadlock error in future") + }; + anonsocket_read(&self_ref, len, ptr, &dest, this) } ), ); } } else { + // There's data to be read! let mut bytes = vec![0; len]; let mut readbuf = readbuf.borrow_mut(); // Synchronize with all previous writes to this buffer. @@ -313,7 +301,7 @@ fn anonsocket_read<'tcx>( } }; - return ecx.return_read_success(ptr, &bytes, actual_read_size, &dest); + return ecx.return_read_success(ptr, &bytes, actual_read_size, dest); } interp_ok(()) } From 0093764d6340731d6ad66651c4421b60a3364534 Mon Sep 17 00:00:00 2001 From: Ralf Jung Date: Fri, 27 Dec 2024 12:18:09 +0100 Subject: [PATCH 2/3] add test for close-while-blocked --- src/shims/unix/unnamed_socket.rs | 14 +++---- .../libc/socketpair-close-while-blocked.rs | 37 +++++++++++++++++++ .../socketpair-close-while-blocked.stderr | 35 ++++++++++++++++++ 3 files changed, 78 insertions(+), 8 deletions(-) create mode 100644 tests/fail-dep/libc/socketpair-close-while-blocked.rs create mode 100644 tests/fail-dep/libc/socketpair-close-while-blocked.stderr diff --git a/src/shims/unix/unnamed_socket.rs b/src/shims/unix/unnamed_socket.rs index 7f35838e9d..4285786f06 100644 --- a/src/shims/unix/unnamed_socket.rs +++ b/src/shims/unix/unnamed_socket.rs @@ -167,10 +167,9 @@ fn anonsocket_write<'tcx>( dest: MPlaceTy<'tcx>, } @unblock = |this| { - let Some(self_ref) = weak_self_ref.upgrade() else { - // FIXME: We should raise a deadlock error if the self_ref upgrade failed. - throw_unsup_format!("This will be a deadlock error in future") - }; + // If we got unblocked, then our peer successfully upgraded its weak + // ref to us. That means we can also upgrade our weak ref. + let self_ref = weak_self_ref.upgrade().unwrap(); anonsocket_write(&self_ref, ptr, len, &dest, this) } ), @@ -257,10 +256,9 @@ fn anonsocket_read<'tcx>( dest: MPlaceTy<'tcx>, } @unblock = |this| { - let Some(self_ref) = weak_self_ref.upgrade() else { - // FIXME: We should raise a deadlock error if the self_ref upgrade failed. - throw_unsup_format!("This will be a deadlock error in future") - }; + // If we got unblocked, then our peer successfully upgraded its weak + // ref to us. That means we can also upgrade our weak ref. + let self_ref = weak_self_ref.upgrade().unwrap(); anonsocket_read(&self_ref, len, ptr, &dest, this) } ), diff --git a/tests/fail-dep/libc/socketpair-close-while-blocked.rs b/tests/fail-dep/libc/socketpair-close-while-blocked.rs new file mode 100644 index 0000000000..8413e11881 --- /dev/null +++ b/tests/fail-dep/libc/socketpair-close-while-blocked.rs @@ -0,0 +1,37 @@ +//! This is a regression test for : we had some +//! faulty logic around `release_clock` that led to this code not reporting a data race. +//~^^ERROR: deadlock +//@ignore-target: windows # no libc socketpair on Windows +//@compile-flags: -Zmiri-preemption-rate=0 -Zmiri-address-reuse-rate=0 +//@error-in-other-file: deadlock +use std::thread; + +fn main() { + let mut fds = [-1, -1]; + let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) }; + assert_eq!(res, 0); + + let thread1 = thread::spawn(move || { + let mut buf: [u8; 1] = [0; 1]; + let _res: i32 = unsafe { + libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) //~ERROR: deadlock + .try_into() + .unwrap() + }; + }); + let thread2 = thread::spawn(move || { + // Close the FD that the other thread is blocked on. + unsafe { libc::close(fds[1]) }; + }); + + // Run the other threads. + thread::yield_now(); + + // When they are both done, continue here. + let data = "a".as_bytes().as_ptr(); + let res = unsafe { libc::write(fds[0], data as *const libc::c_void, 1) }; + assert_eq!(res, -1); + + thread1.join().unwrap(); + thread2.join().unwrap(); +} diff --git a/tests/fail-dep/libc/socketpair-close-while-blocked.stderr b/tests/fail-dep/libc/socketpair-close-while-blocked.stderr new file mode 100644 index 0000000000..fe196f5d7d --- /dev/null +++ b/tests/fail-dep/libc/socketpair-close-while-blocked.stderr @@ -0,0 +1,35 @@ +error: deadlock: the evaluated program deadlocked + --> RUSTLIB/std/src/sys/pal/PLATFORM/thread.rs:LL:CC + | +LL | let ret = unsafe { libc::pthread_join(id, ptr::null_mut()) }; + | ^ the evaluated program deadlocked + | + = note: BACKTRACE: + = note: inside `std::sys::pal::PLATFORM::thread::Thread::join` at RUSTLIB/std/src/sys/pal/PLATFORM/thread.rs:LL:CC + = note: inside `std::thread::JoinInner::<'_, ()>::join` at RUSTLIB/std/src/thread/mod.rs:LL:CC + = note: inside `std::thread::JoinHandle::<()>::join` at RUSTLIB/std/src/thread/mod.rs:LL:CC +note: inside `main` + --> tests/fail-dep/libc/socketpair-close-while-blocked.rs:LL:CC + | +LL | thread1.join().unwrap(); + | ^^^^^^^^^^^^^^ + +error: deadlock: the evaluated program deadlocked + --> tests/fail-dep/libc/socketpair-close-while-blocked.rs:LL:CC + | +LL | libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) + | ^ the evaluated program deadlocked + | + = note: BACKTRACE on thread `unnamed-ID`: + = note: inside closure at tests/fail-dep/libc/socketpair-close-while-blocked.rs:LL:CC + +error: deadlock: the evaluated program deadlocked + | + = note: the evaluated program deadlocked + = note: (no span available) + = note: BACKTRACE on thread `unnamed-ID`: + +note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace + +error: aborting due to 3 previous errors + From 759fb039be462dc55181e4d8e668b81e54f5682a Mon Sep 17 00:00:00 2001 From: Ralf Jung Date: Fri, 27 Dec 2024 12:32:04 +0100 Subject: [PATCH 3/3] also clean up eventfd code in the same vein --- src/shims/unix/linux_like/eventfd.rs | 55 +++++++++++++--------------- 1 file changed, 25 insertions(+), 30 deletions(-) diff --git a/src/shims/unix/linux_like/eventfd.rs b/src/shims/unix/linux_like/eventfd.rs index 4bbe417ea8..ed81207f54 100644 --- a/src/shims/unix/linux_like/eventfd.rs +++ b/src/shims/unix/linux_like/eventfd.rs @@ -62,11 +62,10 @@ impl FileDescription for Event { return ecx.set_last_error_and_return(ErrorKind::InvalidInput, dest); } - // eventfd read at the size of u64. + // Turn the pointer into a place at the right type. let buf_place = ecx.ptr_to_mplace_unaligned(ptr, ty); - let weak_eventfd = self_ref.downgrade(); - eventfd_read(buf_place, dest, weak_eventfd, ecx) + eventfd_read(buf_place, dest, self_ref, ecx) } /// A write call adds the 8-byte integer value supplied in @@ -97,18 +96,10 @@ impl FileDescription for Event { return ecx.set_last_error_and_return(ErrorKind::InvalidInput, dest); } - // Read the user-supplied value from the pointer. + // Turn the pointer into a place at the right type. let buf_place = ecx.ptr_to_mplace_unaligned(ptr, ty); - let num = ecx.read_scalar(&buf_place)?.to_u64()?; - // u64::MAX as input is invalid because the maximum value of counter is u64::MAX - 1. - if num == u64::MAX { - return ecx.set_last_error_and_return(ErrorKind::InvalidInput, dest); - } - // If the addition does not let the counter to exceed the maximum value, update the counter. - // Else, block. - let weak_eventfd = self_ref.downgrade(); - eventfd_write(num, buf_place, dest, weak_eventfd, ecx) + eventfd_write(buf_place, dest, self_ref, ecx) } fn as_unix(&self) -> &dyn UnixFileDescription { @@ -193,20 +184,22 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { /// Block thread if the value addition will exceed u64::MAX -1, /// else just add the user-supplied value to current counter. fn eventfd_write<'tcx>( - num: u64, buf_place: MPlaceTy<'tcx>, dest: &MPlaceTy<'tcx>, - weak_eventfd: WeakFileDescriptionRef, + eventfd_ref: &FileDescriptionRef, ecx: &mut MiriInterpCx<'tcx>, ) -> InterpResult<'tcx> { - let Some(eventfd_ref) = weak_eventfd.upgrade() else { - throw_unsup_format!("eventfd FD got closed while blocking.") - }; - // Since we pass the weak file description ref, it is guaranteed to be // an eventfd file description. let eventfd = eventfd_ref.downcast::().unwrap(); + // Figure out which value we should add. + let num = ecx.read_scalar(&buf_place)?.to_u64()?; + // u64::MAX as input is invalid because the maximum value of counter is u64::MAX - 1. + if num == u64::MAX { + return ecx.set_last_error_and_return(ErrorKind::InvalidInput, dest); + } + match eventfd.counter.get().checked_add(num) { Some(new_count @ 0..=MAX_COUNTER) => { // Future `read` calls will synchronize with this write, so update the FD clock. @@ -219,7 +212,7 @@ fn eventfd_write<'tcx>( // The state changed; we check and update the status of all supported event // types for current file description. - ecx.check_and_update_readiness(&eventfd_ref)?; + ecx.check_and_update_readiness(eventfd_ref)?; // Unblock *all* threads previously blocked on `read`. // We need to take out the blocked thread ids and unblock them together, @@ -244,6 +237,7 @@ fn eventfd_write<'tcx>( eventfd.blocked_write_tid.borrow_mut().push(ecx.active_thread()); + let weak_eventfd = eventfd_ref.downgrade(); ecx.block_thread( BlockReason::Eventfd, None, @@ -255,8 +249,10 @@ fn eventfd_write<'tcx>( weak_eventfd: WeakFileDescriptionRef, } @unblock = |this| { - // When we get unblocked, try again. - eventfd_write(num, buf_place, &dest, weak_eventfd, this) + // When we get unblocked, try again. We know the ref is still valid, + // otherwise there couldn't be a `write` that unblocks us. + let eventfd_ref = weak_eventfd.upgrade().unwrap(); + eventfd_write(buf_place, &dest, &eventfd_ref, this) } ), ); @@ -270,13 +266,9 @@ fn eventfd_write<'tcx>( fn eventfd_read<'tcx>( buf_place: MPlaceTy<'tcx>, dest: &MPlaceTy<'tcx>, - weak_eventfd: WeakFileDescriptionRef, + eventfd_ref: &FileDescriptionRef, ecx: &mut MiriInterpCx<'tcx>, ) -> InterpResult<'tcx> { - let Some(eventfd_ref) = weak_eventfd.upgrade() else { - throw_unsup_format!("eventfd FD got closed while blocking.") - }; - // Since we pass the weak file description ref to the callback function, it is guaranteed to be // an eventfd file description. let eventfd = eventfd_ref.downcast::().unwrap(); @@ -293,6 +285,7 @@ fn eventfd_read<'tcx>( eventfd.blocked_read_tid.borrow_mut().push(ecx.active_thread()); + let weak_eventfd = eventfd_ref.downgrade(); ecx.block_thread( BlockReason::Eventfd, None, @@ -303,8 +296,10 @@ fn eventfd_read<'tcx>( weak_eventfd: WeakFileDescriptionRef, } @unblock = |this| { - // When we get unblocked, try again. - eventfd_read(buf_place, &dest, weak_eventfd, this) + // When we get unblocked, try again. We know the ref is still valid, + // otherwise there couldn't be a `write` that unblocks us. + let eventfd_ref = weak_eventfd.upgrade().unwrap(); + eventfd_read(buf_place, &dest, &eventfd_ref, this) } ), ); @@ -317,7 +312,7 @@ fn eventfd_read<'tcx>( // The state changed; we check and update the status of all supported event // types for current file description. - ecx.check_and_update_readiness(&eventfd_ref)?; + ecx.check_and_update_readiness(eventfd_ref)?; // Unblock *all* threads previously blocked on `write`. // We need to take out the blocked thread ids and unblock them together,