diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 000000000..bb3cbbdba
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,3 @@
+[submodule "glommio/liburing"]
+	path = glommio/liburing
+	url = git://git.kernel.dk/liburing
diff --git a/glommio/Cargo.toml b/glommio/Cargo.toml
index e5c9f98e4..b3f52ecd9 100644
--- a/glommio/Cargo.toml
+++ b/glommio/Cargo.toml
@@ -17,11 +17,10 @@ readme = "../README.md"
 log = "0.4"
 concurrent-queue = "1.1.2"
 futures-lite = "1.11.1"
-libc = "0.2.73"
+libc = "0.2.77"
 socket2 = { version = "0.3.18", features = ["unix", "reuseport"] }
-iou = { path = "../../iou" }
-uring-sys = { path = "../../uring-sys" }
 nix = "0.19.0"
+bitflags = "1.2.0"
 bitmaps = "2.1.0"
 typenum = "1.12"
 scoped-tls = "1.0.0"
@@ -42,6 +41,9 @@ futures = "0.3.5"
 fastrand = "1.4.0"
 tokio = { version = "0.3.5", default-features = false, features = ["rt", "macros", "rt-multi-thread", "net", "io-util", "time"] }
 
+[build-dependencies]
+cc = "1.0.47"
+
 [features]
 bench = []
 
diff --git a/glommio/README.uring_sys_iou.md b/glommio/README.uring_sys_iou.md
new file mode 100644
index 000000000..7d6bda974
--- /dev/null
+++ b/glommio/README.uring_sys_iou.md
@@ -0,0 +1,15 @@
+This crate includes imported code for
+[uring-sys](https://github.com/ringbahn/uring-sys) and
+[iou](https://github.com/ringbahn/iou)
+from the ringbahn project.
+
+These imports are intended to be temporary; I don't mean this as a hard
+fork. However, those crates have not been maintained as actively as we
+would like, and they are central to what we do while uring evolves very
+rapidly, so I have decided to soft-fork them.
+
+Every patch that touches code in those directories should make at least
+a good-faith effort to merge the change back into iou and uring-sys.
+Hopefully, with time and at their own pace, they will gain all the code
+that we need, at which point we can go back to using them
+externally.
diff --git a/glommio/build.rs b/glommio/build.rs new file mode 100644 index 000000000..c3be44900 --- /dev/null +++ b/glommio/build.rs @@ -0,0 +1,49 @@ +use std::env; +use std::fs; +use std::path::*; +use std::process::Command; + +use cc::Build; + +fn main() { + let project = PathBuf::from(env::var("CARGO_MANIFEST_DIR").unwrap()) + .canonicalize() + .unwrap(); + let liburing = project.join("liburing"); + + // Run the configure script in OUT_DIR to get `compat.h` + let configured_include = configure(&liburing); + + let src = liburing.join("src"); + + // liburing + Build::new() + .file(src.join("setup.c")) + .file(src.join("queue.c")) + .file(src.join("syscall.c")) + .file(src.join("register.c")) + .include(src.join("include")) + .include(&configured_include) + .extra_warnings(false) + .compile("uring"); + + // (our additional, linkable C bindings) + Build::new() + .file(project.join("rusturing.c")) + .include(src.join("include")) + .include(&configured_include) + .compile("rusturing"); +} + +fn configure(liburing: &Path) -> PathBuf { + let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()) + .canonicalize() + .unwrap(); + fs::copy(liburing.join("configure"), out_dir.join("configure")).unwrap(); + fs::create_dir_all(out_dir.join("src/include/liburing")).unwrap(); + Command::new("./configure") + .current_dir(&out_dir) + .output() + .expect("configure script failed"); + out_dir.join("src/include") +} diff --git a/glommio/liburing b/glommio/liburing new file mode 160000 index 000000000..b013dfd5a --- /dev/null +++ b/glommio/liburing @@ -0,0 +1 @@ +Subproject commit b013dfd5a5f65116373d5e0f0bdfb73db9d8816e diff --git a/glommio/rusturing.c b/glommio/rusturing.c new file mode 100644 index 000000000..541d03161 --- /dev/null +++ b/glommio/rusturing.c @@ -0,0 +1,314 @@ +#include "liburing.h" + +extern inline int rust_io_uring_opcode_supported(struct io_uring_probe *p, int op) +{ + return io_uring_opcode_supported(p, op); +} + +extern inline void rust_io_uring_cq_advance(struct io_uring *ring, unsigned nr) +{ + io_uring_cq_advance(ring, nr); +} + +extern inline void rust_io_uring_cqe_seen(struct io_uring *ring, struct io_uring_cqe *cqe) +{ + io_uring_cqe_seen(ring, cqe); +} + +extern inline void rust_io_uring_sqe_set_data(struct io_uring_sqe *sqe, void *data) +{ + io_uring_sqe_set_data(sqe, data); +} + +extern inline void *rust_io_uring_cqe_get_data(struct io_uring_cqe *cqe) +{ + return io_uring_cqe_get_data(cqe); +} + +extern inline void rust_io_uring_sqe_set_flags(struct io_uring_sqe *sqe, unsigned flags) +{ + io_uring_sqe_set_flags(sqe, flags); +} + +extern inline void rust_io_uring_prep_rw(int op, + struct io_uring_sqe *sqe, + int fd, + const void *addr, + unsigned len, + __u64 offset) +{ + io_uring_prep_rw(op, sqe, fd, addr, len, offset); +} + +extern inline void rust_io_uring_prep_splice(struct io_uring_sqe *sqe, + int fd_in, int64_t off_in, + int fd_out, int64_t off_out, + unsigned int nbytes, + unsigned int splice_flags) +{ + io_uring_prep_splice(sqe, fd_in, fd_out, off_in, off_out, nbytes, splice_flags); +} + +extern inline void rust_io_uring_prep_readv(struct io_uring_sqe *sqe, + int fd, + const struct iovec *iovecs, + unsigned nr_vecs, + off_t offset) +{ + io_uring_prep_readv(sqe, fd, iovecs, nr_vecs, offset); +} + +extern inline void rust_io_uring_prep_read_fixed(struct io_uring_sqe *sqe, + int fd, + void *buf, + unsigned nbytes, + off_t offset, + int buf_index) +{ + io_uring_prep_read_fixed(sqe, fd, buf, nbytes, offset, buf_index); +} + +extern inline void 
rust_io_uring_prep_writev(struct io_uring_sqe *sqe, + int fd, + const struct iovec *iovecs, + unsigned nr_vecs, + off_t offset) +{ + io_uring_prep_writev(sqe, fd, iovecs, nr_vecs, offset); +} + +extern inline void rust_io_uring_prep_write_fixed(struct io_uring_sqe *sqe, + int fd, + const void *buf, + unsigned nbytes, + off_t offset, + int buf_index) +{ + io_uring_prep_write_fixed(sqe, fd, buf, nbytes, offset, buf_index); +} + +extern inline void rust_io_uring_prep_recvmsg(struct io_uring_sqe *sqe, + int fd, + struct msghdr *msg, + unsigned flags) +{ + io_uring_prep_recvmsg(sqe, fd, msg, flags); +} + +extern inline void rust_io_uring_prep_sendmsg(struct io_uring_sqe *sqe, + int fd, + const struct msghdr *msg, + unsigned flags) +{ + io_uring_prep_sendmsg(sqe, fd, msg, flags); +} + +extern inline void rust_io_uring_prep_poll_add(struct io_uring_sqe *sqe, int fd, short poll_mask) +{ + io_uring_prep_poll_add(sqe, fd, poll_mask); +} + +extern inline void rust_io_uring_prep_poll_remove(struct io_uring_sqe *sqe, void *user_data) +{ + io_uring_prep_poll_remove(sqe, user_data); +} + +extern inline void rust_io_uring_prep_fsync(struct io_uring_sqe *sqe, int fd, unsigned fsync_flags) +{ + io_uring_prep_fsync(sqe, fd, fsync_flags); +} + +extern inline void rust_io_uring_prep_nop(struct io_uring_sqe *sqe) +{ + io_uring_prep_nop(sqe); +} + +extern inline void rust_io_uring_prep_timeout(struct io_uring_sqe *sqe, + struct __kernel_timespec *ts, + unsigned count, + unsigned flags) +{ + io_uring_prep_timeout(sqe, ts, count, flags); +} + +extern inline void rust_io_uring_prep_timeout_remove(struct io_uring_sqe *sqe, + __u64 user_data, + unsigned flags) +{ + io_uring_prep_timeout_remove(sqe, user_data, flags); +} + +extern inline void rust_io_uring_prep_accept(struct io_uring_sqe *sqe, + int fd, + struct sockaddr *addr, + socklen_t *addrlen, + int flags) +{ + io_uring_prep_accept(sqe, fd, addr, addrlen, flags); +} + +extern inline void rust_io_uring_prep_cancel(struct io_uring_sqe *sqe, + void *user_data, + int flags) +{ + io_uring_prep_cancel(sqe, user_data, flags); +} + +extern inline void rust_io_uring_prep_link_timeout(struct io_uring_sqe *sqe, + struct __kernel_timespec *ts, + unsigned flags) +{ + io_uring_prep_link_timeout(sqe, ts, flags); +} + +extern inline void rust_io_uring_prep_connect(struct io_uring_sqe *sqe, + int fd, + struct sockaddr *addr, + socklen_t addrlen) +{ + io_uring_prep_connect(sqe, fd, addr, addrlen); +} + +extern inline void rust_io_uring_prep_files_update(struct io_uring_sqe *sqe, + int *fds, + unsigned nr_fds, + int offset) +{ + io_uring_prep_files_update(sqe, fds, nr_fds, offset); +} + +extern inline void rust_io_uring_prep_fallocate(struct io_uring_sqe *sqe, + int fd, + int mode, + off_t offset, + off_t len) +{ + io_uring_prep_fallocate(sqe, fd, mode, offset, len); +} + +extern inline void rust_io_uring_prep_openat(struct io_uring_sqe *sqe, + int dfd, + const char *path, + int flags, + mode_t mode) +{ + io_uring_prep_openat(sqe, dfd, path, flags, mode); +} + +extern inline void rust_io_uring_prep_close(struct io_uring_sqe *sqe, int fd) { + io_uring_prep_close(sqe, fd); +} + +extern inline void rust_io_uring_prep_read(struct io_uring_sqe *sqe, int fd, + void *buf, unsigned nbytes, off_t offset) +{ + io_uring_prep_read(sqe, fd, buf, nbytes, offset); +} + +extern inline void rust_io_uring_prep_write(struct io_uring_sqe *sqe, int fd, + const void *buf, unsigned nbytes, off_t offset) +{ + io_uring_prep_write(sqe, fd, buf, nbytes, offset); +} + +extern inline void 
rust_io_uring_prep_statx(struct io_uring_sqe *sqe, + int dfd, + const char *path, + int flags, + unsigned mask, + struct statx *statxbuf) +{ + io_uring_prep_statx(sqe, dfd, path, flags, mask, statxbuf); +} + +extern inline void rust_io_uring_prep_fadvise(struct io_uring_sqe *sqe, int fd, + off_t offset, off_t len, int advice) +{ + io_uring_prep_fadvise(sqe, fd, offset, len, advice); +} + +extern inline void rust_io_uring_prep_madvise(struct io_uring_sqe *sqe, void *addr, + off_t length, int advice) +{ + io_uring_prep_madvise(sqe, addr, length, advice); +} + +extern inline void rust_io_uring_prep_send(struct io_uring_sqe *sqe, int sockfd, + const void *buf, size_t len, int flags) +{ + io_uring_prep_send(sqe, sockfd, buf, len, flags); +} + +extern inline void rust_io_uring_prep_recv(struct io_uring_sqe *sqe, int sockfd, + void *buf, size_t len, int flags) +{ + io_uring_prep_recv(sqe, sockfd, buf, len, flags); +} + +extern inline void rust_io_uring_prep_openat2(struct io_uring_sqe *sqe, int dfd, + const char *path, struct open_how *how) +{ + io_uring_prep_openat2(sqe, dfd, path, how); +} + +extern inline void rust_io_uring_prep_epoll_ctl(struct io_uring_sqe *sqe, int epfd, + int fd, int op, + struct epoll_event *ev) +{ + io_uring_prep_epoll_ctl(sqe, epfd, fd, op, ev); +} + +extern inline void rust_io_uring_prep_provide_buffers(struct io_uring_sqe *sqe, + void *addr, int len, int nr, + int bgid, int bid) +{ + io_uring_prep_provide_buffers(sqe, addr, len, nr, bgid, bid); +} + +extern inline void rust_io_uring_prep_remove_buffers(struct io_uring_sqe *sqe, + int nr, int bgid) +{ + io_uring_prep_remove_buffers(sqe, nr, bgid); +} + +extern inline unsigned rust_io_uring_sq_ready(struct io_uring *ring) +{ + return io_uring_sq_ready(ring); +} + +extern inline unsigned rust_io_uring_sq_space_left(struct io_uring *ring) +{ + return io_uring_sq_space_left(ring); +} + +extern inline unsigned rust_io_uring_cq_ready(struct io_uring *ring) +{ + return io_uring_cq_ready(ring); +} + +extern inline bool rust_io_uring_cq_eventfd_enabled(struct io_uring *ring) +{ + return io_uring_cq_eventfd_enabled(ring); +} + +extern inline int rust_io_uring_cq_eventfd_toggle(struct io_uring *ring, bool enabled) +{ + return io_uring_cq_eventfd_toggle(ring, enabled); +} + +extern inline int rust_io_uring_wait_cqe_nr(struct io_uring *ring, + struct io_uring_cqe **cqe_ptr, + unsigned wait_nr) +{ + return io_uring_wait_cqe_nr(ring, cqe_ptr, wait_nr); +} + +extern inline int rust_io_uring_peek_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr) +{ + return io_uring_peek_cqe(ring, cqe_ptr); +} + +extern inline int rust_io_uring_wait_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr) +{ + return io_uring_wait_cqe(ring, cqe_ptr); +} diff --git a/glommio/src/iou/.submission_queue.rs.swp b/glommio/src/iou/.submission_queue.rs.swp new file mode 100644 index 000000000..2a40b7c4b Binary files /dev/null and b/glommio/src/iou/.submission_queue.rs.swp differ diff --git a/glommio/src/iou/completion_queue.rs b/glommio/src/iou/completion_queue.rs new file mode 100644 index 000000000..c1a91e4ee --- /dev/null +++ b/glommio/src/iou/completion_queue.rs @@ -0,0 +1,117 @@ +use std::fmt; +use std::io; +use std::marker::PhantomData; +use std::mem::MaybeUninit; +use std::ptr::{self, NonNull}; + +use super::{resultify, CQEs, CQEsBlocking, IoUring, CQE}; +use crate::uring_sys; + +/// The queue of completed IO events. +/// +/// Each element is a [`CQE`](crate::cqe::CQE). +/// +/// Completion does not imply success. 
Completed events may be [timeouts](crate::cqe::CQE::is_iou_timeout). +pub struct CompletionQueue<'ring> { + pub(crate) ring: NonNull, + _marker: PhantomData<&'ring mut IoUring>, +} + +impl<'ring> CompletionQueue<'ring> { + pub(crate) fn new(ring: &'ring IoUring) -> CompletionQueue<'ring> { + CompletionQueue { + ring: NonNull::from(&ring.ring), + _marker: PhantomData, + } + } + + /// Returns the next CQE if any are available. + pub fn peek_for_cqe(&mut self) -> Option { + unsafe { + let mut cqe = MaybeUninit::uninit(); + uring_sys::io_uring_peek_cqe(self.ring.as_ptr(), cqe.as_mut_ptr()); + let cqe = cqe.assume_init(); + if !cqe.is_null() { + Some(CQE::new(self.ring, &mut *cqe)) + } else { + None + } + } + } + + /// Returns the next CQE, blocking the thread until one is ready if necessary. + pub fn wait_for_cqe(&mut self) -> io::Result { + self.wait_for_cqes(1) + } + + #[inline(always)] + pub(crate) fn wait_for_cqes(&mut self, count: u32) -> io::Result { + let ring = self.ring; + self.wait_inner(count).map(|cqe| CQE::new(ring, cqe)) + } + + /// Block the thread until at least `count` CQEs are ready. + /// + /// These CQEs can be processed using `peek_for_cqe` or the `cqes` iterator. + pub fn wait(&mut self, count: u32) -> io::Result<()> { + self.wait_inner(count).map(|_| ()) + } + + #[inline(always)] + fn wait_inner(&mut self, count: u32) -> io::Result<&mut uring_sys::io_uring_cqe> { + unsafe { + let mut cqe = MaybeUninit::uninit(); + + resultify(uring_sys::io_uring_wait_cqes( + self.ring.as_ptr(), + cqe.as_mut_ptr(), + count as _, + ptr::null(), + ptr::null(), + ))?; + + Ok(&mut *cqe.assume_init()) + } + } + + /// Returns an iterator of ready CQEs. + /// + /// When there are no CQEs ready to process, the iterator will end. It will never + /// block the thread to wait for CQEs to be completed. + pub fn cqes(&mut self) -> CQEs<'_> { + CQEs::new(self.ring) + } + + /// Returns an iterator of ready CQEs, blocking when there are none ready. + /// + /// This iterator never ends. Whenever there are no CQEs ready, it will block + /// the thread until at least `wait_for` CQEs are ready. + pub fn cqes_blocking(&mut self, wait_for: u32) -> CQEsBlocking<'_> { + CQEsBlocking::new(self.ring, wait_for) + } + + pub fn ready(&self) -> u32 { + unsafe { uring_sys::io_uring_cq_ready(self.ring.as_ptr()) } + } + + pub fn eventfd_enabled(&self) -> bool { + unsafe { uring_sys::io_uring_cq_eventfd_enabled(self.ring.as_ptr()) } + } + + pub fn eventfd_toggle(&mut self, enabled: bool) -> io::Result<()> { + resultify(unsafe { uring_sys::io_uring_cq_eventfd_toggle(self.ring.as_ptr(), enabled) })?; + Ok(()) + } +} + +impl fmt::Debug for CompletionQueue<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let fd = unsafe { self.ring.as_ref().ring_fd }; + f.debug_struct(std::any::type_name::()) + .field("fd", &fd) + .finish() + } +} + +unsafe impl<'ring> Send for CompletionQueue<'ring> {} +unsafe impl<'ring> Sync for CompletionQueue<'ring> {} diff --git a/glommio/src/iou/cqe.rs b/glommio/src/iou/cqe.rs new file mode 100644 index 000000000..ae852c7e2 --- /dev/null +++ b/glommio/src/iou/cqe.rs @@ -0,0 +1,207 @@ +use std::io; +use std::marker::PhantomData; +use std::mem::MaybeUninit; +use std::ptr::{self, NonNull}; + +use super::{resultify, IoUring}; +use crate::uring_sys; + +/// A completed IO event. 
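+///
+/// A minimal sketch of reaping and inspecting a completion (the `glommio::iou` module
+/// path is assumed here for illustration; it may not be exported under that name):
+///
+/// ```ignore
+/// let mut ring = glommio::iou::IoUring::new(8).unwrap();
+/// // ... prepare and submit an SQE, tagging it with user_data ...
+/// let cqe = ring.wait_for_cqe().unwrap();
+/// println!("completed: user_data={}", cqe.user_data());
+/// match cqe.result() {
+///     Ok(n) => println!("kernel returned {}", n),
+///     Err(e) => println!("operation failed: {}", e),
+/// }
+/// ```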
+#[derive(Debug)] +pub struct CQE { + user_data: u64, + res: i32, + flags: CompletionFlags, +} + +impl CQE { + pub fn from_raw(cqe: uring_sys::io_uring_cqe) -> CQE { + CQE { + user_data: cqe.user_data, + res: cqe.res, + flags: CompletionFlags::from_bits_truncate(cqe.flags), + } + } + + pub fn from_raw_parts(user_data: u64, res: i32, flags: CompletionFlags) -> CQE { + CQE { + user_data, + res, + flags, + } + } + + pub(crate) fn new( + ring: NonNull, + cqe: &mut uring_sys::io_uring_cqe, + ) -> CQE { + let user_data = cqe.user_data; + let res = cqe.res; + let flags = CompletionFlags::from_bits_truncate(cqe.flags); + + unsafe { + uring_sys::io_uring_cqe_seen(ring.as_ptr(), cqe); + } + + CQE::from_raw_parts(user_data, res, flags) + } + + pub fn user_data(&self) -> u64 { + self.user_data as u64 + } + + pub fn result(&self) -> io::Result { + resultify(self.res) + } + + pub fn flags(&self) -> CompletionFlags { + self.flags + } + + pub fn raw_result(&self) -> i32 { + self.res + } + + pub fn raw_flags(&self) -> u32 { + self.flags.bits() + } +} + +unsafe impl Send for CQE {} +unsafe impl Sync for CQE {} + +/// An iterator of [`CQE`]s from the [`CompletionQueue`](crate::CompletionQueue). +/// +/// This iterator will be exhausted when there are no `CQE`s ready, and return `None`. +pub struct CQEs<'a> { + ring: NonNull, + ready: u32, + marker: PhantomData<&'a mut IoUring>, +} + +impl<'a> CQEs<'a> { + pub(crate) fn new(ring: NonNull) -> CQEs<'a> { + CQEs { + ring, + ready: 0, + marker: PhantomData, + } + } + + #[inline(always)] + fn ready(&self) -> u32 { + unsafe { uring_sys::io_uring_cq_ready(self.ring.as_ptr()) } + } + + #[inline(always)] + fn peek_for_cqe(&mut self) -> Option { + unsafe { + let mut cqe = MaybeUninit::uninit(); + uring_sys::io_uring_peek_cqe(self.ring.as_ptr(), cqe.as_mut_ptr()); + let cqe = cqe.assume_init(); + if !cqe.is_null() { + Some(CQE::new(self.ring, &mut *cqe)) + } else { + None + } + } + } +} + +impl Iterator for CQEs<'_> { + type Item = CQE; + + fn next(&mut self) -> Option { + if self.ready == 0 { + self.ready = self.ready(); + if self.ready == 0 { + return None; + } + } + + self.ready -= 1; + self.peek_for_cqe() + } +} + +/// An iterator of [`CQE`]s from the [`CompletionQueue`](crate::CompletionQueue). +/// +/// This iterator will never be exhausted; if there are no `CQE`s ready, it will block until there +/// are. 
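+///
+/// A usage sketch (module path assumed as above; each item is an `io::Result<CQE>`
+/// because the underlying wait can fail):
+///
+/// ```ignore
+/// let mut ring = glommio::iou::IoUring::new(8).unwrap();
+/// for cqe in ring.cqes_blocking(1) {
+///     let cqe = cqe.unwrap();
+///     println!("completed: user_data={}", cqe.user_data());
+///     break; // the iterator itself never ends, so stop explicitly
+/// }
+/// ```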
+pub struct CQEsBlocking<'a> { + ring: NonNull, + ready: u32, + wait_for: u32, + marker: PhantomData<&'a mut IoUring>, +} + +impl<'a> CQEsBlocking<'a> { + pub(crate) fn new(ring: NonNull, wait_for: u32) -> CQEsBlocking<'a> { + CQEsBlocking { + ring, + ready: 0, + wait_for, + marker: PhantomData, + } + } + + #[inline(always)] + fn ready(&self) -> u32 { + unsafe { uring_sys::io_uring_cq_ready(self.ring.as_ptr()) } + } + + #[inline(always)] + fn peek_for_cqe(&mut self) -> Option { + unsafe { + let mut cqe = MaybeUninit::uninit(); + uring_sys::io_uring_peek_cqe(self.ring.as_ptr(), cqe.as_mut_ptr()); + let cqe = cqe.assume_init(); + if !cqe.is_null() { + Some(CQE::new(self.ring, &mut *cqe)) + } else { + None + } + } + } + + #[inline(always)] + fn wait(&mut self) -> io::Result<&mut uring_sys::io_uring_cqe> { + unsafe { + let mut cqe = MaybeUninit::uninit(); + + resultify(uring_sys::io_uring_wait_cqes( + self.ring.as_ptr(), + cqe.as_mut_ptr(), + self.wait_for as _, + ptr::null(), + ptr::null(), + ))?; + + Ok(&mut *cqe.assume_init()) + } + } +} + +impl Iterator for CQEsBlocking<'_> { + type Item = io::Result; + + fn next(&mut self) -> Option { + if self.ready == 0 { + self.ready = self.ready(); + if self.ready == 0 { + let ring = self.ring; + return Some(self.wait().map(|cqe| CQE::new(ring, cqe))); + } + } + + self.ready -= 1; + self.peek_for_cqe().map(Ok) + } +} + +bitflags::bitflags! { + /// Flags that can be returned from the kernel on [`CQE`]s. + pub struct CompletionFlags: u32 { + const BUFFER_SHIFT = 1 << 0; + } +} diff --git a/glommio/src/iou/mod.rs b/glommio/src/iou/mod.rs new file mode 100644 index 000000000..c45d7923a --- /dev/null +++ b/glommio/src/iou/mod.rs @@ -0,0 +1,383 @@ +//! Idiomatic Rust bindings to liburing. +//! +//! This gives users an idiomatic Rust interface for interacting with the Linux kernel's `io_uring` +//! interface for async IO. Despite being idiomatic Rust, this interface is still very low level +//! and some fundamental operations remain unsafe. +//! +//! The core entry point to the API is the `IoUring` type, which manages an `io_uring` object for +//! interfacing with the kernel. Using this, users can submit IO events and wait for their +//! completion. +//! +//! It is also possible to "split" an `IoUring` instance into its constituent components - a +//! `SubmissionQueue`, a `CompletionQueue`, and a `Registrar` - in order to operate on them +//! separately without synchronization. +//! +//! # Submitting events +//! +//! You can prepare new IO events using the `SQE` type. Once an event has been +//! prepared, the next call to submit will submit that event. Eventually, those events will +//! complete, and that a `CQE` will appear on the completion queue indicating that +//! the event is complete. +//! +//! Preparing IO events is inherently unsafe, as you must guarantee that the buffers and file +//! descriptors used for that IO are alive long enough for the kernel to perform the IO operation +//! with them. +//! +//! # Timeouts +//! +//! Some APIs allow you to time out a call into the kernel. It's important to note how this works +//! with io_uring. +//! +//! A timeout is submitted as an additional IO event which completes after the specified time. +//! Therefore when you create a timeout, all that happens is that a completion event will appear +//! after that specified time. This also means that when processing completion events, you need to +//! be prepared for the possibility that the completion represents a timeout and not a normal IO +//! 
event (`CQE` has a method to check for this). +use crate::uring_sys; + +/// Types related to completion queue events. +pub mod cqe; +/// Types related to submission queue events. +/// +/// The most important types here are [`SQE`], which represents a single submission queue event, +/// and [`SQEs`], which represents a sequence of events that can be prepared at once. +/// +/// Many of the types in this module are re-exported from the `nix` crate, and are used when +/// preparing [`SQE`]s associated with specific Linux system operations. +pub mod sqe; + +mod completion_queue; +mod submission_queue; + +mod probe; + +pub mod registrar; + +use std::fmt; +use std::io; +use std::mem::{self, MaybeUninit}; +use std::os::unix::io::RawFd; +use std::ptr::{self, NonNull}; +use std::time::Duration; + +#[doc(inline)] +pub use cqe::{CQEs, CQEsBlocking, CQE}; +#[doc(inline)] +pub use sqe::{SQEs, SQE}; + +pub use completion_queue::CompletionQueue; +pub use submission_queue::SubmissionQueue; + +pub use probe::Probe; +#[doc(inline)] +pub use registrar::{Personality, Registrar}; + +bitflags::bitflags! { + /// [`IoUring`] initialization flags for advanced use cases. + /// + pub struct SetupFlags: u32 { + /// Poll the IO context instead of defaulting to interrupts. + const IOPOLL = 1 << 0; /* io_context is polled */ + /// Assign a kernel thread to poll the submission queue. Requires elevated privileges to set. + const SQPOLL = 1 << 1; /* SQ poll thread */ + /// Force the kernel thread created with `SQPOLL` to be bound to the CPU used by the + /// `SubmissionQueue`. Requires `SQPOLL` set. + const SQ_AFF = 1 << 2; /* sq_thread_cpu is valid */ + + const CQSIZE = 1 << 3; + const CLAMP = 1 << 4; + const ATTACH_WQ = 1 << 5; + } +} + +bitflags::bitflags! { + /// Advanced features that can be enabled when setting up an [`IoUring`] instance. + pub struct SetupFeatures: u32 { + const SINGLE_MMAP = 1 << 0; + const NODROP = 1 << 1; + const SUBMIT_STABLE = 1 << 2; + const RW_CUR_POS = 1 << 3; + const CUR_PERSONALITY = 1 << 4; + const FAST_POLL = 1 << 5; + const POLL_32BITS = 1 << 6; + } +} + +/// The main interface to kernel IO using `io_uring`. +/// +/// `IoUring` is a high-level wrapper around an [`io_uring`](uring_sys::io_uring) object. +/// +/// `IoUring`s are constructed with a requested number of ring buffer entries and possibly a set of +/// [`SetupFlags`](SetupFlags). Allocations for `IoUring` are `memlocked` and will not be paged +/// out. +/// +/// `IoUring`s can either be used directly, or split into separate parts and +/// operated on without synchronization. +pub struct IoUring { + ring: uring_sys::io_uring, +} + +impl IoUring { + /// Creates a new `IoUring` without any setup flags. `IoUring`'s created using this method will + /// use interrupt-driven IO. + /// + /// The number of entries must be in the range of 1..4096 (inclusive) and + /// it's recommended to be a power of two. + /// + /// The underlying `SubmissionQueue` and `CompletionQueue` will each have this number of + /// entries. + pub fn new(entries: u32) -> io::Result { + IoUring::new_with_flags(entries, SetupFlags::empty(), SetupFeatures::empty()) + } + + /// Creates a new `IoUring` using a set of `SetupFlags` and `SetupFeatures` for advanced + /// use cases. 
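+    ///
+    /// A minimal sketch of the call shape (module path assumed; flags such as `SQPOLL`
+    /// additionally require elevated privileges, as noted on [`SetupFlags`]):
+    ///
+    /// ```ignore
+    /// use glommio::iou::{IoUring, SetupFeatures, SetupFlags};
+    ///
+    /// // An interrupt-driven ring with no optional features requested.
+    /// let ring = IoUring::new_with_flags(32, SetupFlags::empty(), SetupFeatures::empty()).unwrap();
+    /// ```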
+ pub fn new_with_flags( + entries: u32, + flags: SetupFlags, + features: SetupFeatures, + ) -> io::Result { + unsafe { + let mut params: uring_sys::io_uring_params = mem::zeroed(); + params.flags = flags.bits(); + params.features = features.bits(); + let mut ring = MaybeUninit::uninit(); + resultify(uring_sys::io_uring_queue_init_params( + entries as _, + ring.as_mut_ptr(), + &mut params, + ))?; + Ok(IoUring { + ring: ring.assume_init(), + }) + } + } + + /// Returns the `SubmissionQueue` part of the `IoUring`. + pub fn sq(&mut self) -> SubmissionQueue<'_> { + SubmissionQueue::new(&*self) + } + + /// Returns the `CompletionQueue` part of the `IoUring`. + pub fn cq(&mut self) -> CompletionQueue<'_> { + CompletionQueue::new(&*self) + } + + /// Returns the `Registrar` part of the `IoUring`. + pub fn registrar(&self) -> Registrar<'_> { + Registrar::new(self) + } + + /// Returns the three constituent parts of the `IoUring`. + pub fn queues(&mut self) -> (SubmissionQueue<'_>, CompletionQueue<'_>, Registrar<'_>) { + ( + SubmissionQueue::new(&*self), + CompletionQueue::new(&*self), + Registrar::new(&*self), + ) + } + + pub fn probe(&mut self) -> io::Result { + Probe::for_ring(&mut self.ring) + } + + /// Returns the next [`SQE`] which can be prepared to submit. + pub fn prepare_sqe(&mut self) -> Option> { + unsafe { submission_queue::prepare_sqe(&mut self.ring) } + } + + /// Returns the next `count` [`SQE`]s which can be prepared to submit as an iterator. + /// + /// See the [`SQEs`] type for more information about how these multiple SQEs can be used. + pub fn prepare_sqes(&mut self, count: u32) -> Option> { + unsafe { submission_queue::prepare_sqes(&mut self.ring.sq, count) } + } + + /// Submit all prepared [`SQE`]s to the kernel. + pub fn submit_sqes(&mut self) -> io::Result { + self.sq().submit() + } + + /// Submit all prepared [`SQE`]s to the kernel and wait until at least `wait_for` events have + /// completed. + pub fn submit_sqes_and_wait(&mut self, wait_for: u32) -> io::Result { + self.sq().submit_and_wait(wait_for) + } + + /// Submit all prepared [`SQE`]s to the kernel and wait until at least `wait_for` events have + /// completed or `duration` has passed. + pub fn submit_sqes_and_wait_with_timeout( + &mut self, + wait_for: u32, + duration: Duration, + ) -> io::Result { + self.sq().submit_and_wait_with_timeout(wait_for, duration) + } + + /// Peek for any [`CQE`] that is already completed, without blocking. This will consume that + /// CQE. + pub fn peek_for_cqe(&mut self) -> Option { + unsafe { + let mut cqe = MaybeUninit::uninit(); + let count = uring_sys::io_uring_peek_batch_cqe(&mut self.ring, cqe.as_mut_ptr(), 1); + + if count > 0 { + Some(CQE::new(NonNull::from(&self.ring), &mut *cqe.assume_init())) + } else { + None + } + } + } + + /// Block until at least one [`CQE`] is completed. This will consume that CQE. + pub fn wait_for_cqe(&mut self) -> io::Result { + let ring = NonNull::from(&self.ring); + self.inner_wait_for_cqes(1, ptr::null()) + .map(|cqe| CQE::new(ring, cqe)) + } + + /// Block until a [`CQE`] is ready or timeout. + pub fn wait_for_cqe_with_timeout(&mut self, duration: Duration) -> io::Result { + let ts = uring_sys::__kernel_timespec { + tv_sec: duration.as_secs() as _, + tv_nsec: duration.subsec_nanos() as _, + }; + + let ring = NonNull::from(&self.ring); + self.inner_wait_for_cqes(1, &ts) + .map(|cqe| CQE::new(ring, cqe)) + } + + /// Returns an iterator of [`CQE`]s which are ready from the kernel. 
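+    ///
+    /// A short sketch of draining whatever is currently ready without blocking
+    /// (module path assumed):
+    ///
+    /// ```ignore
+    /// let mut ring = glommio::iou::IoUring::new(8).unwrap();
+    /// for cqe in ring.cqes() {
+    ///     println!("ready: user_data={} result={:?}", cqe.user_data(), cqe.result());
+    /// }
+    /// // The loop ends as soon as no further completions are ready.
+    /// ```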
+ pub fn cqes(&mut self) -> CQEs<'_> { + CQEs::new(NonNull::from(&mut self.ring)) + } + + /// Returns an iterator of [`CQE`]s which will block when there are no CQEs ready. It will + /// block until at least `count` are ready, and then continue iterating. + /// + /// This iterator will never be exhausted; every time it runs out of CQEs it will block the + /// thread and wait for more to be ready. + pub fn cqes_blocking(&mut self, count: u32) -> CQEsBlocking<'_> { + CQEsBlocking::new(NonNull::from(&mut self.ring), count) + } + + /// Wait until `count` [`CQE`]s are ready, without submitting any events. + pub fn wait_for_cqes(&mut self, count: u32) -> io::Result<()> { + self.inner_wait_for_cqes(count as _, ptr::null()) + .map(|_| ()) + } + + fn inner_wait_for_cqes( + &mut self, + count: u32, + ts: *const uring_sys::__kernel_timespec, + ) -> io::Result<&mut uring_sys::io_uring_cqe> { + unsafe { + let mut cqe = MaybeUninit::uninit(); + + resultify(uring_sys::io_uring_wait_cqes( + &mut self.ring, + cqe.as_mut_ptr(), + count, + ts, + ptr::null(), + ))?; + + Ok(&mut *cqe.assume_init()) + } + } + + pub fn raw(&self) -> &uring_sys::io_uring { + &self.ring + } + + pub unsafe fn raw_mut(&mut self) -> &mut uring_sys::io_uring { + &mut self.ring + } + + pub fn cq_ready(&mut self) -> u32 { + self.cq().ready() + } + + pub fn sq_ready(&mut self) -> u32 { + self.sq().ready() + } + + pub fn sq_space_left(&mut self) -> u32 { + self.sq().space_left() + } + + pub fn cq_eventfd_enabled(&mut self) -> bool { + self.cq().eventfd_enabled() + } + + pub fn cq_eventfd_toggle(&mut self, enabled: bool) -> io::Result<()> { + self.cq().eventfd_toggle(enabled) + } + + pub fn raw_fd(&self) -> RawFd { + self.ring.ring_fd + } +} + +impl fmt::Debug for IoUring { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct(std::any::type_name::()) + .field("fd", &self.ring.ring_fd) + .finish() + } +} + +impl Drop for IoUring { + fn drop(&mut self) { + unsafe { uring_sys::io_uring_queue_exit(&mut self.ring) }; + } +} + +unsafe impl Send for IoUring {} +unsafe impl Sync for IoUring {} + +fn resultify(x: i32) -> io::Result { + match x >= 0 { + true => Ok(x as u32), + false => Err(io::Error::from_raw_os_error(-x)), + } +} + +#[cfg(test)] +mod tests { + use super::resultify; + + #[test] + fn test_resultify() { + let side_effect = |i, effect: &mut _| -> i32 { + *effect += 1; + return i; + }; + + let mut calls = 0; + let ret = resultify(side_effect(0, &mut calls)); + assert!(match ret { + Ok(0) => true, + _ => false, + }); + assert_eq!(calls, 1); + + calls = 0; + let ret = resultify(side_effect(1, &mut calls)); + assert!(match ret { + Ok(1) => true, + _ => false, + }); + assert_eq!(calls, 1); + + calls = 0; + let ret = resultify(side_effect(-1, &mut calls)); + assert!(match ret { + Err(e) if e.raw_os_error() == Some(1) => true, + _ => false, + }); + assert_eq!(calls, 1); + } +} diff --git a/glommio/src/iou/probe.rs b/glommio/src/iou/probe.rs new file mode 100644 index 000000000..7ea2774e3 --- /dev/null +++ b/glommio/src/iou/probe.rs @@ -0,0 +1,39 @@ +use crate::uring_sys; +use std::io; +use std::ptr::NonNull; + +/// A probe of the operations supported by this kernel version's io-uring interface. 
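+///
+/// A feature-detection sketch (the `IoRingOp` variant name is assumed to follow the
+/// kernel opcode names exposed by the imported `uring_sys` bindings):
+///
+/// ```ignore
+/// let probe = Probe::new().unwrap();
+/// if probe.supports(uring_sys::IoRingOp::IORING_OP_OPENAT) {
+///     // This kernel can service openat submissions.
+/// }
+/// ```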
+#[derive(Debug)] +pub struct Probe { + probe: NonNull, +} + +impl Probe { + pub fn new() -> io::Result { + unsafe { + let probe = uring_sys::io_uring_get_probe(); + NonNull::new(probe) + .ok_or_else(io::Error::last_os_error) + .map(|probe| Probe { probe }) + } + } + + pub(crate) fn for_ring(ring: *mut uring_sys::io_uring) -> io::Result { + unsafe { + let probe = uring_sys::io_uring_get_probe_ring(ring); + NonNull::new(probe) + .ok_or_else(io::Error::last_os_error) + .map(|probe| Probe { probe }) + } + } + + pub fn supports(&self, op: uring_sys::IoRingOp) -> bool { + unsafe { uring_sys::io_uring_opcode_supported(self.probe.as_ptr(), op as _) != 0 } + } +} + +impl Drop for Probe { + fn drop(&mut self) { + unsafe { libc::free(self.probe.as_ptr() as *mut _) } + } +} diff --git a/glommio/src/iou/registrar/mod.rs b/glommio/src/iou/registrar/mod.rs new file mode 100644 index 000000000..21b504f99 --- /dev/null +++ b/glommio/src/iou/registrar/mod.rs @@ -0,0 +1,322 @@ +//! Types related to registration and registered resources. +//! +//! The [`Registrar`] type can be used to register resources with the kernel that will be used with +//! a particular [`IoUring`] instance. This can improve performance by avoiding the kernel from +//! reallocating resources for each IO events performed against those resources. +//! +//! When file descriptors and buffers are registered with the kernel, an iterator of the type-safe +//! [`Registered`] wrapper is returned. This wrapper makes it easier to correctly use +//! pre-registered resources. By passing a [`RegisteredFd`] or the correct type of registered +//! buffer to an [`SQE`][crate::SQE]'s prep methods, the SQE will be properly prepared to use the +//! pre-registered object. +mod registered; + +use std::fmt; +use std::io; +use std::marker::PhantomData; +use std::os::unix::io::RawFd; +use std::ptr::NonNull; + +use super::{resultify, IoUring, Probe}; +use crate::uring_sys; + +pub use registered::*; + +/// A `Registrar` creates ahead-of-time kernel references to files and user buffers. +/// +/// Preregistration significantly reduces per-IO overhead, so consider registering frequently +/// used files and buffers. For file IO, preregistration lets the kernel skip the atomic acquire and +/// release of a kernel-specific file descriptor. For buffer IO, the kernel can avoid mapping kernel +/// memory for every operation. +/// +/// Beware that registration is relatively expensive and should be done before any performance +/// sensitive code. +/// +/// If you want to register a file but don't have an open file descriptor yet, you can register +/// a [placeholder](PLACEHOLDER_FD) descriptor and +/// [update](crate::registrar::Registrar::update_registered_files) it later. 
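+///
+/// A minimal sketch of the placeholder workflow described above, mirroring the
+/// `placeholder_update` test at the bottom of this module (the `glommio::iou` path is
+/// assumed for illustration):
+///
+/// ```ignore
+/// use std::os::unix::io::AsRawFd;
+///
+/// let ring = glommio::iou::IoUring::new(8).unwrap();
+/// // Reserve three slots in the kernel fileset with placeholder descriptors.
+/// let _ = ring.registrar().register_files(&[-1, -1, -1]).unwrap();
+///
+/// // Later, point slot 0 at a real file descriptor.
+/// let file = std::fs::File::create("tmp.txt").unwrap();
+/// let _ = ring
+///     .registrar()
+///     .update_registered_files(0, &[file.as_raw_fd()])
+///     .unwrap();
+/// ```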
+pub struct Registrar<'ring> { + ring: NonNull, + _marker: PhantomData<&'ring mut IoUring>, +} + +impl<'ring> Registrar<'ring> { + pub(crate) fn new(ring: &'ring IoUring) -> Registrar<'ring> { + Registrar { + ring: NonNull::from(&ring.ring), + _marker: PhantomData, + } + } + + pub fn register_buffers( + &self, + buffers: Vec>, + ) -> io::Result> { + let len = buffers.len(); + let addr = buffers.as_ptr() as *const _; + resultify(unsafe { + uring_sys::io_uring_register_buffers(self.ring.as_ptr(), addr, len as _) + })?; + Ok(buffers + .into_iter() + .enumerate() + .map(|(i, buf)| RegisteredBuf::new(i as u32, buf))) + } + + pub fn register_buffers_by_ref<'a>( + &self, + buffers: &'a [&'a [u8]], + ) -> io::Result> + 'a> { + let len = buffers.len(); + let addr = buffers.as_ptr() as *const _; + resultify(unsafe { + uring_sys::io_uring_register_buffers(self.ring.as_ptr(), addr, len as _) + })?; + Ok(buffers + .iter() + .enumerate() + .map(|(i, buf)| Registered::new(i as u32, &**buf))) + } + + pub fn register_buffers_by_mut<'a>( + &self, + buffers: &'a mut [&'a mut [u8]], + ) -> io::Result> + 'a> { + let len = buffers.len(); + let addr = buffers.as_ptr() as *const _; + resultify(unsafe { + uring_sys::io_uring_register_buffers(self.ring.as_ptr(), addr, len as _) + })?; + Ok(buffers + .iter_mut() + .enumerate() + .map(|(i, buf)| Registered::new(i as u32, &mut **buf))) + } + + /// Unregister all currently registered buffers. An explicit call to this method is often unecessary, + /// because all buffers will be unregistered automatically when the ring is dropped. + pub fn unregister_buffers(&self) -> io::Result<()> { + resultify(unsafe { uring_sys::io_uring_unregister_buffers(self.ring.as_ptr()) })?; + Ok(()) + } + + /// Register a set of files with the kernel. Registered files handle kernel fileset indexing + /// behind the scenes and can often be used in place of raw file descriptors. + /// + /// # Errors + /// Returns an error if + /// * there is a preexisting set of registered files, + /// * the `files` slice was empty, + /// * the inner [`io_uring_register_files`](uring_sys::io_uring_register_files) call failed for + /// another reason + pub fn register_files<'a>( + &self, + files: &'a [RawFd], + ) -> io::Result + 'a> { + assert!(files.len() <= u32::MAX as usize); + resultify(unsafe { + uring_sys::io_uring_register_files( + self.ring.as_ptr(), + files.as_ptr() as *const _, + files.len() as _, + ) + })?; + Ok(files + .iter() + .enumerate() + .map(|(i, &fd)| RegisteredFd::new(i as u32, fd))) + } + + /// Update the currently registered kernel fileset. It is usually more efficient to reserve space + /// for files before submitting events, because `IoUring` will wait until the submission queue is + /// empty before registering files. 
+ /// # Errors + /// Returns an error if + /// * there isn't a registered fileset, + /// * the `files` slice was empty, + /// * `offset` is out of bounds, + /// * the `files` slice was too large, + /// * the inner [`io_uring_register_files_update`](uring_sys::io_uring_register_files_update) call + /// failed for another reason + pub fn update_registered_files<'a>( + &mut self, + offset: usize, + files: &'a [RawFd], + ) -> io::Result + 'a> { + assert!(files.len() + offset <= u32::MAX as usize); + resultify(unsafe { + uring_sys::io_uring_register_files_update( + self.ring.as_ptr(), + offset as _, + files.as_ptr() as *const _, + files.len() as _, + ) + })?; + Ok(files + .iter() + .enumerate() + .map(move |(i, &fd)| RegisteredFd::new((i + offset) as u32, fd))) + } + + pub fn unregister_files(&self) -> io::Result<()> { + resultify(unsafe { uring_sys::io_uring_unregister_files(self.ring.as_ptr()) })?; + Ok(()) + } + + pub fn register_eventfd(&self, eventfd: RawFd) -> io::Result<()> { + resultify(unsafe { uring_sys::io_uring_register_eventfd(self.ring.as_ptr(), eventfd) })?; + Ok(()) + } + + pub fn register_eventfd_async(&self, eventfd: RawFd) -> io::Result<()> { + resultify(unsafe { + uring_sys::io_uring_register_eventfd_async(self.ring.as_ptr(), eventfd) + })?; + Ok(()) + } + + pub fn unregister_eventfd(&self) -> io::Result<()> { + resultify(unsafe { uring_sys::io_uring_unregister_eventfd(self.ring.as_ptr()) })?; + Ok(()) + } + + pub fn register_personality(&self) -> io::Result { + let id = + resultify(unsafe { uring_sys::io_uring_register_personality(self.ring.as_ptr()) })?; + debug_assert!(id < u16::MAX as u32); + Ok(Personality { id: id as u16 }) + } + + pub fn unregister_personality(&self, personality: Personality) -> io::Result<()> { + resultify(unsafe { + uring_sys::io_uring_unregister_personality(self.ring.as_ptr(), personality.id as _) + })?; + Ok(()) + } + + pub fn probe(&self) -> io::Result { + Probe::for_ring(self.ring.as_ptr()) + } +} + +impl fmt::Debug for Registrar<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let fd = unsafe { self.ring.as_ref().ring_fd }; + f.debug_struct(std::any::type_name::()) + .field("fd", &fd) + .finish() + } +} + +unsafe impl<'ring> Send for Registrar<'ring> {} +unsafe impl<'ring> Sync for Registrar<'ring> {} + +#[derive(Debug, Eq, PartialEq, Hash, Ord, PartialOrd, Clone, Copy)] +pub struct Personality { + pub(crate) id: u16, +} + +impl From for Personality { + fn from(id: u16) -> Personality { + Personality { id } + } +} + +#[cfg(test)] +mod tests { + + use super::*; + use std::os::unix::io::AsRawFd; + + #[test] + #[should_panic(expected = "Invalid argument")] + fn register_empty_slice() { + let ring = IoUring::new(1).unwrap(); + let _ = ring.registrar().register_files(&[]).unwrap(); + } + + #[test] + #[should_panic(expected = "Bad file descriptor")] + fn register_bad_fd() { + let ring = IoUring::new(1).unwrap(); + let _ = ring.registrar().register_files(&[-100]).unwrap(); + } + + #[test] + #[should_panic(expected = "Device or resource busy")] + fn double_register() { + let ring = IoUring::new(1).unwrap(); + let _ = ring.registrar().register_files(&[1]).unwrap(); + let _ = ring.registrar().register_files(&[1]).unwrap(); + } + + #[test] + #[should_panic(expected = "No such device or address")] + fn empty_unregister_err() { + let ring = IoUring::new(1).unwrap(); + let _ = ring.registrar().unregister_files().unwrap(); + } + + #[test] + #[should_panic(expected = "No such device or address")] + fn empty_update_err() { + let ring = 
IoUring::new(1).unwrap(); + let _ = ring.registrar().update_registered_files(0, &[1]).unwrap(); + } + + #[test] + #[should_panic(expected = "Invalid argument")] + fn offset_out_of_bounds_update() { + let raw_fds = [1, 2]; + let ring = IoUring::new(1).unwrap(); + let _ = ring.registrar().register_files(&raw_fds).unwrap(); + let _ = ring + .registrar() + .update_registered_files(2, &raw_fds) + .unwrap(); + } + + #[test] + #[should_panic(expected = "Invalid argument")] + fn slice_len_out_of_bounds_update() { + let ring = IoUring::new(1).unwrap(); + let _ = ring.registrar().register_files(&[1, 1]).unwrap(); + let _ = ring + .registrar() + .update_registered_files(0, &[1, 1, 1]) + .unwrap(); + } + + #[test] + fn valid_fd_update() { + let ring = IoUring::new(1).unwrap(); + + let file = std::fs::File::create("tmp.txt").unwrap(); + let _ = ring + .registrar() + .register_files(&[file.as_raw_fd()]) + .unwrap(); + + let new_file = std::fs::File::create("new_tmp.txt").unwrap(); + let _ = ring + .registrar() + .update_registered_files(0, &[new_file.as_raw_fd()]) + .unwrap(); + + let _ = std::fs::remove_file("tmp.txt"); + let _ = std::fs::remove_file("new_tmp.txt"); + } + + #[test] + fn placeholder_update() { + let ring = IoUring::new(1).unwrap(); + let _ = ring.registrar().register_files(&[-1, -1, -1]).unwrap(); + + let file = std::fs::File::create("tmp.txt").unwrap(); + let _ = ring + .registrar() + .update_registered_files(0, &[file.as_raw_fd()]) + .unwrap(); + let _ = std::fs::remove_file("tmp.txt"); + } +} diff --git a/glommio/src/iou/registrar/registered.rs b/glommio/src/iou/registrar/registered.rs new file mode 100644 index 000000000..89f9105e6 --- /dev/null +++ b/glommio/src/iou/registrar/registered.rs @@ -0,0 +1,348 @@ +use std::io; +use std::ops::*; +use std::os::unix::io::{AsRawFd, RawFd}; + +use crate::iou::sqe::SQE; +use crate::uring_sys; + +pub const PLACEHOLDER_FD: RawFd = -1; + +/// A member of the kernel's registered fileset. +/// +/// Valid `RegisteredFd`s can be obtained through a [`Registrar`](crate::registrar::Registrar). +/// +/// Registered files handle kernel fileset indexing behind the scenes and can often be used in place +/// of raw file descriptors. Not all IO operations support registered files. +/// +/// Submission event prep methods on `RegisteredFd` will ensure that the submission event's +/// `SubmissionFlags::FIXED_FILE` flag is properly set. +pub type RegisteredFd = Registered; +pub type RegisteredBuf = Registered>; +pub type RegisteredBufRef<'a> = Registered<&'a [u8]>; +pub type RegisteredBufMut<'a> = Registered<&'a mut [u8]>; + +/// An object registered with an io-uring instance through a [`Registrar`](crate::Registrar). 
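+///
+/// A small sketch of the accessors (constructing the wrapper directly here only for
+/// demonstration; real values come from the `Registrar` registration methods):
+///
+/// ```ignore
+/// let fd: RegisteredFd = Registered::new(0, PLACEHOLDER_FD);
+/// assert!(fd.is_placeholder());
+/// assert_eq!(fd.index(), 0);
+/// ```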
+#[derive(Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)] +pub struct Registered { + data: T, + index: u32, +} + +impl Registered { + pub fn new(index: u32, data: T) -> Registered { + Registered { data, index } + } + + pub fn index(&self) -> u32 { + self.index + } + + pub fn into_inner(self) -> T { + self.data + } +} + +impl RegisteredFd { + pub fn is_placeholder(&self) -> bool { + self.data == PLACEHOLDER_FD + } +} + +impl AsRawFd for RegisteredFd { + fn as_raw_fd(&self) -> RawFd { + self.data + } +} + +impl RegisteredBuf { + pub fn as_ref(&self) -> RegisteredBufRef<'_> { + Registered::new(self.index, &self.data[..]) + } + + pub fn as_mut(&mut self) -> RegisteredBufMut<'_> { + Registered::new(self.index, &mut self.data[..]) + } + + pub fn slice(&self, range: Range) -> RegisteredBufRef<'_> { + Registered::new(self.index, &self.data[range]) + } + + pub fn slice_mut(&mut self, range: Range) -> RegisteredBufMut<'_> { + Registered::new(self.index, &mut self.data[range]) + } + + pub fn slice_to(&self, index: usize) -> RegisteredBufRef<'_> { + Registered::new(self.index, &self.data[..index]) + } + + pub fn slice_to_mut(&mut self, index: usize) -> RegisteredBufMut<'_> { + Registered::new(self.index, &mut self.data[..index]) + } + + pub fn slice_from(&self, index: usize) -> RegisteredBufRef<'_> { + Registered::new(self.index, &self.data[index..]) + } + + pub fn slice_from_mut(&mut self, index: usize) -> RegisteredBufMut<'_> { + Registered::new(self.index, &mut self.data[index..]) + } +} + +impl<'a> RegisteredBufRef<'a> { + pub fn as_ref(&self) -> RegisteredBufRef<'_> { + Registered::new(self.index, &self.data[..]) + } + + pub fn slice(self, range: Range) -> RegisteredBufRef<'a> { + Registered::new(self.index, &self.data[range]) + } + + pub fn slice_to(&self, index: usize) -> RegisteredBufRef<'_> { + Registered::new(self.index, &self.data[..index]) + } + + pub fn slice_from(&self, index: usize) -> RegisteredBufRef<'_> { + Registered::new(self.index, &self.data[index..]) + } +} + +impl<'a> RegisteredBufMut<'a> { + pub fn as_ref(&self) -> RegisteredBufRef<'_> { + Registered::new(self.index, &self.data[..]) + } + + pub fn as_mut(&mut self) -> RegisteredBufMut<'_> { + Registered::new(self.index, &mut self.data[..]) + } + + pub fn slice(self, range: Range) -> RegisteredBufRef<'a> { + Registered::new(self.index, &self.data[range]) + } + + pub fn slice_mut(self, range: Range) -> RegisteredBufMut<'a> { + Registered::new(self.index, &mut self.data[range]) + } + + pub fn slice_to(&self, index: usize) -> RegisteredBufRef<'_> { + Registered::new(self.index, &self.data[..index]) + } + + pub fn slice_to_mut(&mut self, index: usize) -> RegisteredBufMut<'_> { + Registered::new(self.index, &mut self.data[..index]) + } + + pub fn slice_from(&self, index: usize) -> RegisteredBufRef<'_> { + Registered::new(self.index, &self.data[index..]) + } + + pub fn slice_from_mut(&mut self, index: usize) -> RegisteredBufMut<'_> { + Registered::new(self.index, &mut self.data[index..]) + } +} + +impl Deref for RegisteredBuf { + type Target = [u8]; + + fn deref(&self) -> &[u8] { + &self.data[..] + } +} + +impl Deref for RegisteredBufRef<'_> { + type Target = [u8]; + + fn deref(&self) -> &[u8] { + &self.data[..] + } +} + +impl Deref for RegisteredBufMut<'_> { + type Target = [u8]; + + fn deref(&self) -> &[u8] { + &self.data[..] + } +} + +impl DerefMut for RegisteredBuf { + fn deref_mut(&mut self) -> &mut [u8] { + &mut self.data[..] 
+ } +} + +impl DerefMut for RegisteredBufMut<'_> { + fn deref_mut(&mut self) -> &mut [u8] { + &mut self.data[..] + } +} +/// A file descriptor that can be used to prepare SQEs. +/// +/// The standard library's [`RawFd`] type implements this trait, but so does [`RegisteredFd`], a +/// type which is returned when a user pre-registers file descriptors with an io-uring instance. +pub trait UringFd { + fn as_raw_fd(&self) -> RawFd; + fn update_sqe(&self, sqe: &mut SQE<'_>); +} + +impl UringFd for RawFd { + fn as_raw_fd(&self) -> RawFd { + *self + } + + fn update_sqe(&self, _: &mut SQE<'_>) {} +} + +impl UringFd for RegisteredFd { + fn as_raw_fd(&self) -> RawFd { + AsRawFd::as_raw_fd(self) + } + + fn update_sqe(&self, sqe: &mut SQE<'_>) { + unsafe { + sqe.raw_mut().fd = self.index as RawFd; + } + sqe.set_fixed_file(); + } +} + +/// A buffer that can be used to prepare read events. +pub trait UringReadBuf { + unsafe fn prep_read(self, fd: impl UringFd, sqe: &mut SQE<'_>, offset: u64); +} + +/// A buffer that can be used to prepare write events. +pub trait UringWriteBuf { + unsafe fn prep_write(self, fd: impl UringFd, sqe: &mut SQE<'_>, offset: u64); +} + +impl UringReadBuf for RegisteredBufMut<'_> { + unsafe fn prep_read(self, fd: impl UringFd, sqe: &mut SQE<'_>, offset: u64) { + uring_sys::io_uring_prep_read_fixed( + sqe.raw_mut(), + fd.as_raw_fd(), + self.data.as_mut_ptr() as _, + self.data.len() as _, + offset as _, + self.index() as _, + ); + fd.update_sqe(sqe); + } +} + +impl UringReadBuf for &'_ mut [u8] { + unsafe fn prep_read(self, fd: impl UringFd, sqe: &mut SQE<'_>, offset: u64) { + uring_sys::io_uring_prep_read( + sqe.raw_mut(), + fd.as_raw_fd(), + self.as_mut_ptr() as _, + self.len() as _, + offset as _, + ); + fd.update_sqe(sqe); + } +} + +impl UringReadBuf for io::IoSliceMut<'_> { + unsafe fn prep_read(mut self, fd: impl UringFd, sqe: &mut SQE<'_>, offset: u64) { + uring_sys::io_uring_prep_read( + sqe.raw_mut(), + fd.as_raw_fd(), + self.as_mut_ptr() as _, + self.len() as _, + offset as _, + ); + fd.update_sqe(sqe); + } +} + +impl UringReadBuf for &'_ mut [&'_ mut [u8]] { + unsafe fn prep_read(self, fd: impl UringFd, sqe: &mut SQE<'_>, offset: u64) { + uring_sys::io_uring_prep_readv( + sqe.raw_mut(), + fd.as_raw_fd(), + self.as_mut_ptr() as _, + self.len() as _, + offset as _, + ); + fd.update_sqe(sqe); + } +} + +impl UringReadBuf for &'_ mut [io::IoSliceMut<'_>] { + unsafe fn prep_read(self, fd: impl UringFd, sqe: &mut SQE<'_>, offset: u64) { + uring_sys::io_uring_prep_readv( + sqe.raw_mut(), + fd.as_raw_fd(), + self.as_mut_ptr() as _, + self.len() as _, + offset as _, + ); + fd.update_sqe(sqe); + } +} + +impl UringWriteBuf for RegisteredBufRef<'_> { + unsafe fn prep_write(self, fd: impl UringFd, sqe: &mut SQE<'_>, offset: u64) { + uring_sys::io_uring_prep_write_fixed( + sqe.raw_mut(), + fd.as_raw_fd(), + self.data.as_ptr() as _, + self.data.len() as _, + offset as _, + self.index() as _, + ); + fd.update_sqe(sqe); + } +} + +impl UringWriteBuf for &'_ [u8] { + unsafe fn prep_write(self, fd: impl UringFd, sqe: &mut SQE<'_>, offset: u64) { + uring_sys::io_uring_prep_write( + sqe.raw_mut(), + fd.as_raw_fd(), + self.as_ptr() as _, + self.len() as _, + offset as _, + ); + fd.update_sqe(sqe); + } +} + +impl UringWriteBuf for io::IoSlice<'_> { + unsafe fn prep_write(self, fd: impl UringFd, sqe: &mut SQE<'_>, offset: u64) { + uring_sys::io_uring_prep_write( + sqe.raw_mut(), + fd.as_raw_fd(), + self.as_ptr() as _, + self.len() as _, + offset as _, + ); + fd.update_sqe(sqe); + } +} + +impl 
UringWriteBuf for &'_ [io::IoSlice<'_>] { + unsafe fn prep_write(self, fd: impl UringFd, sqe: &mut SQE<'_>, offset: u64) { + uring_sys::io_uring_prep_writev( + sqe.raw_mut(), + fd.as_raw_fd(), + self.as_ptr() as _, + self.len() as _, + offset as _, + ); + fd.update_sqe(sqe); + } +} + +impl UringWriteBuf for &'_ [&'_ [u8]] { + unsafe fn prep_write(self, fd: impl UringFd, sqe: &mut SQE<'_>, offset: u64) { + uring_sys::io_uring_prep_writev( + sqe.raw_mut(), + fd.as_raw_fd(), + self.as_ptr() as _, + self.len() as _, + offset as _, + ); + fd.update_sqe(sqe); + } +} diff --git a/glommio/src/iou/sqe.rs b/glommio/src/iou/sqe.rs new file mode 100644 index 000000000..9eb9e97e2 --- /dev/null +++ b/glommio/src/iou/sqe.rs @@ -0,0 +1,752 @@ +use std::ffi::CStr; +use std::io; +use std::mem; +use std::ops::{Deref, DerefMut}; +use std::os::unix::io::RawFd; +use std::ptr; +use std::slice; + +use super::registrar::{UringFd, UringReadBuf, UringWriteBuf}; + +pub use nix::fcntl::{FallocateFlags, OFlag, PosixFadviseAdvice}; +pub use nix::poll::PollFlags; +pub use nix::sys::epoll::{EpollEvent, EpollOp}; +pub use nix::sys::mman::MmapAdvise; +pub use nix::sys::socket::{MsgFlags, SockAddr, SockFlag}; +pub use nix::sys::stat::Mode; + +use super::Personality; +use crate::uring_sys; + +/// A pending IO event. +/// +/// Can be configured with a set of [`SubmissionFlags`](crate::sqe::SubmissionFlags). +/// +pub struct SQE<'a> { + sqe: &'a mut uring_sys::io_uring_sqe, +} + +impl<'a> SQE<'a> { + pub(crate) fn new(sqe: &'a mut uring_sys::io_uring_sqe) -> SQE<'a> { + SQE { sqe } + } + + /// Get this event's user data. + #[inline] + pub fn user_data(&self) -> u64 { + self.sqe.user_data as u64 + } + + /// Set this event's user data. User data is intended to be used by the application after + /// completion. + /// + /// Note that you should not set user_data to `u64::MAX`. This value is reserved for timeouts + /// generated by this library, setting an events user_data to that value will cause the + /// event's completion to swallowed by the library and you will never find out that the event + /// completed. + /// + /// # Safety + /// + /// This function is marked `unsafe`. The library from which you obtained this + /// `SQE` may impose additional safety invariants which you must adhere to + /// when setting the user_data for a submission queue event, which it may rely on when + /// processing the corresponding completion queue event. For example, the library + /// [ringbahn][ringbahn] + /// + /// # Example + /// + /// [ringbahn]: https://crates.io/crates/ringbahn + pub unsafe fn set_user_data(&mut self, user_data: u64) { + self.sqe.user_data = user_data as _; + } + + /// Get this event's flags. + #[inline] + pub fn flags(&self) -> SubmissionFlags { + unsafe { SubmissionFlags::from_bits_unchecked(self.sqe.flags as _) } + } + + /// Overwrite this event's flags. + pub fn overwrite_flags(&mut self, flags: SubmissionFlags) { + self.sqe.flags = flags.bits() as _; + } + + // must be called after any prep methods to properly complete mapped kernel IO + #[inline] + pub(crate) fn set_fixed_file(&mut self) { + self.set_flags(SubmissionFlags::FIXED_FILE); + } + + /// Set these flags for this event (any flags already set will still be set). + #[inline] + pub fn set_flags(&mut self, flags: SubmissionFlags) { + self.sqe.flags |= flags.bits(); + } + + /// Set the [`Personality`] associated with this submission. 
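+    ///
+    /// A sketch of pairing this with [`Registrar::register_personality`] (module path
+    /// assumed; registering a personality can fail on older kernels):
+    ///
+    /// ```ignore
+    /// let mut ring = glommio::iou::IoUring::new(8).unwrap();
+    /// let personality = ring.registrar().register_personality().unwrap();
+    /// let mut sqe = ring.prepare_sqe().unwrap();
+    /// sqe.set_personality(personality);
+    /// ```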
+ #[inline] + pub fn set_personality(&mut self, personality: Personality) { + self.sqe.buf_index.buf_index.personality = personality.id; + } + + /// Prepare a read on a file descriptor. + /// + /// Both the file descriptor and the buffer can be pre-registered. See the + /// [`registrar][crate::registrar] module for more information. + #[inline] + pub unsafe fn prep_read(&mut self, fd: impl UringFd, buf: impl UringReadBuf, offset: u64) { + buf.prep_read(fd, self, offset); + } + + /// Prepare a vectored read on a file descriptor. + #[inline] + pub unsafe fn prep_read_vectored( + &mut self, + fd: impl UringFd, + bufs: &mut [io::IoSliceMut<'_>], + offset: u64, + ) { + let len = bufs.len(); + let addr = bufs.as_mut_ptr(); + uring_sys::io_uring_prep_readv(self.sqe, fd.as_raw_fd(), addr as _, len as _, offset as _); + fd.update_sqe(self); + } + + /// Prepare a read into a fixed, pre-registered buffer on a file descriptor. + #[inline] + pub unsafe fn prep_read_fixed( + &mut self, + fd: impl UringFd, + buf: &mut [u8], + offset: u64, + buf_index: u32, + ) { + let len = buf.len(); + let addr = buf.as_mut_ptr(); + uring_sys::io_uring_prep_read_fixed( + self.sqe, + fd.as_raw_fd(), + addr as _, + len as _, + offset as _, + buf_index as _, + ); + fd.update_sqe(self); + } + + /// Prepare a write on a file descriptor. + /// + /// Both the file descriptor and the buffer can be pre-registered. See the + /// [`registrar][crate::registrar] module for more information. + #[inline] + pub unsafe fn prep_write(&mut self, fd: impl UringFd, buf: impl UringWriteBuf, offset: u64) { + buf.prep_write(fd, self, offset) + } + + /// Prepare a vectored write on a file descriptor. + #[inline] + pub unsafe fn prep_write_vectored( + &mut self, + fd: impl UringFd, + bufs: &[io::IoSlice<'_>], + offset: u64, + ) { + let len = bufs.len(); + let addr = bufs.as_ptr(); + uring_sys::io_uring_prep_writev(self.sqe, fd.as_raw_fd(), addr as _, len as _, offset as _); + fd.update_sqe(self); + } + + /// Prepare a write on a file descriptor from a fixed, pre-registered buffer. + #[inline] + pub unsafe fn prep_write_fixed( + &mut self, + fd: impl UringFd, + buf: &[u8], + offset: u64, + buf_index: usize, + ) { + let len = buf.len(); + let addr = buf.as_ptr(); + uring_sys::io_uring_prep_write_fixed( + self.sqe, + fd.as_raw_fd(), + addr as _, + len as _, + offset as _, + buf_index as _, + ); + fd.update_sqe(self); + } + + /// Prepare an fsync on a file descriptor. + #[inline] + pub unsafe fn prep_fsync(&mut self, fd: impl UringFd, flags: FsyncFlags) { + uring_sys::io_uring_prep_fsync(self.sqe, fd.as_raw_fd(), flags.bits() as _); + fd.update_sqe(self); + } + + /// Prepare a splice, copying data from one file descriptor to another. + #[inline] + pub unsafe fn prep_splice( + &mut self, + fd_in: RawFd, + off_in: i64, + fd_out: RawFd, + off_out: i64, + count: u32, + flags: SpliceFlags, + ) { + uring_sys::io_uring_prep_splice( + self.sqe, + fd_in, + off_in, + fd_out, + off_out, + count, + flags.bits(), + ); + } + + /// Prepare a recv event on a file descriptor. + #[inline] + pub unsafe fn prep_recv(&mut self, fd: impl UringFd, buf: &mut [u8], flags: MsgFlags) { + let data = buf.as_mut_ptr() as *mut libc::c_void; + let len = buf.len(); + uring_sys::io_uring_prep_recv(self.sqe, fd.as_raw_fd(), data, len, flags.bits()); + fd.update_sqe(self); + } + + /// Prepare a send event on a file descriptor. 
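+    ///
+    /// A sketch using a socketpair so the example needs no network (module path assumed;
+    /// the buffer and descriptor must stay alive until the matching CQE is reaped):
+    ///
+    /// ```ignore
+    /// use std::os::unix::io::AsRawFd;
+    /// use std::os::unix::net::UnixStream;
+    ///
+    /// let (tx, _rx) = UnixStream::pair().unwrap();
+    /// let buf = b"ping";
+    /// let mut ring = glommio::iou::IoUring::new(8).unwrap();
+    /// let mut sqe = ring.prepare_sqe().unwrap();
+    /// unsafe { sqe.prep_send(tx.as_raw_fd(), &buf[..], MsgFlags::empty()) };
+    /// ring.submit_sqes().unwrap();
+    /// ```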
+ #[inline] + pub unsafe fn prep_send(&mut self, fd: impl UringFd, buf: &[u8], flags: MsgFlags) { + let data = buf.as_ptr() as *const libc::c_void as *mut libc::c_void; + let len = buf.len(); + uring_sys::io_uring_prep_send(self.sqe, fd.as_raw_fd(), data, len, flags.bits()); + fd.update_sqe(self); + } + + /// Prepare a recvmsg event on a file descriptor. + pub unsafe fn prep_recvmsg( + &mut self, + fd: impl UringFd, + msg: *mut libc::msghdr, + flags: MsgFlags, + ) { + uring_sys::io_uring_prep_recvmsg(self.sqe, fd.as_raw_fd(), msg, flags.bits() as _); + fd.update_sqe(self); + } + + /// Prepare a sendmsg event on a file descriptor. + pub unsafe fn prep_sendmsg( + &mut self, + fd: impl UringFd, + msg: *mut libc::msghdr, + flags: MsgFlags, + ) { + uring_sys::io_uring_prep_sendmsg(self.sqe, fd.as_raw_fd(), msg, flags.bits() as _); + fd.update_sqe(self); + } + + /// Prepare a fallocate event. + #[inline] + pub unsafe fn prep_fallocate( + &mut self, + fd: impl UringFd, + offset: u64, + size: u64, + flags: FallocateFlags, + ) { + uring_sys::io_uring_prep_fallocate( + self.sqe, + fd.as_raw_fd(), + flags.bits() as _, + offset as _, + size as _, + ); + fd.update_sqe(self); + } + + /// Prepare a statx event. + #[inline] + pub unsafe fn prep_statx( + &mut self, + dirfd: impl UringFd, + path: &CStr, + flags: StatxFlags, + mask: StatxMode, + buf: &mut libc::statx, + ) { + uring_sys::io_uring_prep_statx( + self.sqe, + dirfd.as_raw_fd(), + path.as_ptr() as _, + flags.bits() as _, + mask.bits() as _, + buf as _, + ); + } + + /// Prepare an openat event. + #[inline] + pub unsafe fn prep_openat(&mut self, fd: impl UringFd, path: &CStr, flags: OFlag, mode: Mode) { + uring_sys::io_uring_prep_openat( + self.sqe, + fd.as_raw_fd(), + path.as_ptr() as _, + flags.bits(), + mode.bits(), + ); + } + + // TODO openat2 + + /// Prepare a close event on a file descriptor. + #[inline] + pub unsafe fn prep_close(&mut self, fd: impl UringFd) { + uring_sys::io_uring_prep_close(self.sqe, fd.as_raw_fd()); + } + + /// Prepare a timeout event. 
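+ ///
+ /// # Examples
+ ///
+ /// A minimal sketch, assuming `sq` is the ring's submission queue: arm a 10ms timer.
+ /// Passing `0` for `events` makes the timeout purely time-based instead of firing after a
+ /// number of completions, and `u64::MAX` is avoided as user_data because it is reserved by
+ /// this library:
+ ///
+ /// ```ignore
+ /// let ts = uring_sys::__kernel_timespec { tv_sec: 0, tv_nsec: 10_000_000 };
+ /// if let Some(mut sqe) = sq.prepare_sqe() {
+ ///     unsafe {
+ ///         sqe.prep_timeout(&ts, 0, TimeoutFlags::empty());
+ ///         sqe.set_user_data(7);
+ ///     }
+ /// }
+ /// ```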
+ #[inline] + pub unsafe fn prep_timeout( + &mut self, + ts: &uring_sys::__kernel_timespec, + events: u32, + flags: TimeoutFlags, + ) { + uring_sys::io_uring_prep_timeout( + self.sqe, + ts as *const _ as *mut _, + events as _, + flags.bits() as _, + ); + } + + #[inline] + pub unsafe fn prep_timeout_remove(&mut self, user_data: u64) { + uring_sys::io_uring_prep_timeout_remove(self.sqe, user_data as _, 0); + } + + #[inline] + pub unsafe fn prep_link_timeout(&mut self, ts: &uring_sys::__kernel_timespec) { + uring_sys::io_uring_prep_link_timeout(self.sqe, ts as *const _ as *mut _, 0); + } + + #[inline] + pub unsafe fn prep_poll_add(&mut self, fd: impl UringFd, poll_flags: PollFlags) { + uring_sys::io_uring_prep_poll_add(self.sqe, fd.as_raw_fd(), poll_flags.bits()); + fd.update_sqe(self); + } + + #[inline] + pub unsafe fn prep_poll_remove(&mut self, user_data: u64) { + uring_sys::io_uring_prep_poll_remove(self.sqe, user_data as _) + } + + #[inline] + pub unsafe fn prep_connect(&mut self, fd: impl UringFd, socket_addr: &SockAddr) { + let (addr, len) = socket_addr.as_ffi_pair(); + uring_sys::io_uring_prep_connect(self.sqe, fd.as_raw_fd(), addr as *const _ as *mut _, len); + fd.update_sqe(self); + } + + #[inline] + pub unsafe fn prep_accept( + &mut self, + fd: impl UringFd, + accept: Option<&mut SockAddrStorage>, + flags: SockFlag, + ) { + let (addr, len) = match accept { + Some(accept) => ( + accept.storage.as_mut_ptr() as *mut _, + &mut accept.len as *mut _ as *mut _, + ), + None => (std::ptr::null_mut(), std::ptr::null_mut()), + }; + uring_sys::io_uring_prep_accept(self.sqe, fd.as_raw_fd(), addr, len, flags.bits()); + fd.update_sqe(self); + } + + #[inline] + pub unsafe fn prep_fadvise( + &mut self, + fd: impl UringFd, + off: u64, + len: u64, + advice: PosixFadviseAdvice, + ) { + use PosixFadviseAdvice::*; + let advice = match advice { + POSIX_FADV_NORMAL => libc::POSIX_FADV_NORMAL, + POSIX_FADV_SEQUENTIAL => libc::POSIX_FADV_SEQUENTIAL, + POSIX_FADV_RANDOM => libc::POSIX_FADV_RANDOM, + POSIX_FADV_NOREUSE => libc::POSIX_FADV_NOREUSE, + POSIX_FADV_WILLNEED => libc::POSIX_FADV_WILLNEED, + POSIX_FADV_DONTNEED => libc::POSIX_FADV_DONTNEED, + }; + uring_sys::io_uring_prep_fadvise(self.sqe, fd.as_raw_fd(), off as _, len as _, advice); + fd.update_sqe(self); + } + + #[inline] + pub unsafe fn prep_madvise(&mut self, data: &mut [u8], advice: MmapAdvise) { + use MmapAdvise::*; + let advice = match advice { + MADV_NORMAL => libc::MADV_NORMAL, + MADV_RANDOM => libc::MADV_RANDOM, + MADV_SEQUENTIAL => libc::MADV_SEQUENTIAL, + MADV_WILLNEED => libc::MADV_WILLNEED, + MADV_DONTNEED => libc::MADV_DONTNEED, + MADV_REMOVE => libc::MADV_REMOVE, + MADV_DONTFORK => libc::MADV_DONTFORK, + MADV_DOFORK => libc::MADV_DOFORK, + MADV_HWPOISON => libc::MADV_HWPOISON, + MADV_MERGEABLE => libc::MADV_MERGEABLE, + MADV_UNMERGEABLE => libc::MADV_UNMERGEABLE, + MADV_SOFT_OFFLINE => libc::MADV_SOFT_OFFLINE, + MADV_HUGEPAGE => libc::MADV_HUGEPAGE, + MADV_NOHUGEPAGE => libc::MADV_NOHUGEPAGE, + MADV_DONTDUMP => libc::MADV_DONTDUMP, + MADV_DODUMP => libc::MADV_DODUMP, + MADV_FREE => libc::MADV_FREE, + }; + uring_sys::io_uring_prep_madvise( + self.sqe, + data.as_mut_ptr() as *mut _, + data.len() as _, + advice, + ); + } + + #[inline] + pub unsafe fn prep_epoll_ctl( + &mut self, + epoll_fd: RawFd, + op: EpollOp, + fd: RawFd, + event: Option<&mut EpollEvent>, + ) { + let op = match op { + EpollOp::EpollCtlAdd => libc::EPOLL_CTL_ADD, + EpollOp::EpollCtlDel => libc::EPOLL_CTL_DEL, + EpollOp::EpollCtlMod => libc::EPOLL_CTL_MOD, + }; + let 
event = event.map_or(ptr::null_mut(), |event| event as *mut EpollEvent as *mut _); + uring_sys::io_uring_prep_epoll_ctl(self.sqe, epoll_fd, fd, op, event); + } + + #[inline] + pub unsafe fn prep_files_update(&mut self, files: &[RawFd], offset: u32) { + let addr = files.as_ptr() as *mut RawFd; + let len = files.len() as u32; + uring_sys::io_uring_prep_files_update(self.sqe, addr, len, offset as _); + } + + pub unsafe fn prep_provide_buffers( + &mut self, + buffers: &mut [u8], + count: u32, + group: BufferGroupId, + index: u32, + ) { + let addr = buffers.as_mut_ptr() as *mut libc::c_void; + let len = buffers.len() as u32 / count; + uring_sys::io_uring_prep_provide_buffers( + self.sqe, + addr, + len as _, + count as _, + group.id as _, + index as _, + ); + } + + pub unsafe fn prep_remove_buffers(&mut self, count: u32, id: BufferGroupId) { + uring_sys::io_uring_prep_remove_buffers(self.sqe, count as _, id.id as _); + } + + #[inline] + pub unsafe fn prep_cancel(&mut self, user_data: u64, flags: i32) { + uring_sys::io_uring_prep_cancel(self.sqe, user_data as _, flags); + } + + /// Prepare a no-op event. + #[inline] + pub unsafe fn prep_nop(&mut self) { + uring_sys::io_uring_prep_nop(self.sqe); + } + + /// Clear event. Clears user data, flags, and any event setup. + pub fn clear(&mut self) { + *self.sqe = unsafe { mem::zeroed() }; + } + + /// Get a reference to the underlying [`uring_sys::io_uring_sqe`](uring_sys::io_uring_sqe) object. + /// + /// You can use this method to inspect the low-level details of an event. + pub fn raw(&self) -> &uring_sys::io_uring_sqe { + &self.sqe + } + + pub unsafe fn raw_mut(&mut self) -> &mut uring_sys::io_uring_sqe { + &mut self.sqe + } +} + +unsafe impl<'a> Send for SQE<'a> {} +unsafe impl<'a> Sync for SQE<'a> {} + +#[derive(Debug)] +pub struct SockAddrStorage { + storage: mem::MaybeUninit<nix::sys::socket::sockaddr_storage>, + len: usize, +} + +impl SockAddrStorage { + pub fn uninit() -> Self { + let storage = mem::MaybeUninit::uninit(); + let len = mem::size_of::<nix::sys::socket::sockaddr_storage>(); + SockAddrStorage { storage, len } + } + + pub unsafe fn as_socket_addr(&self) -> io::Result<SockAddr> { + let storage = &*self.storage.as_ptr(); + nix::sys::socket::sockaddr_storage_to_addr(storage, self.len).map_err(|e| { + let err_no = e.as_errno(); + match err_no { + Some(err_no) => io::Error::from_raw_os_error(err_no as _), + None => io::Error::new(io::ErrorKind::Other, "Unknown error"), + } + }) + } +} + +#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct BufferGroupId { + pub id: u32, +} + +bitflags::bitflags! { + /// [`SQE`](SQE) configuration flags. + pub struct SubmissionFlags: u8 { + /// This event's file descriptor is an index into the preregistered set of files. + const FIXED_FILE = 1 << 0; /* use fixed fileset */ + /// Submit this event only after completing all ongoing submission events. + const IO_DRAIN = 1 << 1; /* issue after inflight IO */ + /// Force the next submission event to wait until this event has completed successfully. + /// + /// An event's link only applies to the next event, but link chains can be + /// arbitrarily long. + const IO_LINK = 1 << 2; /* next IO depends on this one */ + + const IO_HARDLINK = 1 << 3; + const ASYNC = 1 << 4; + const BUFFER_SELECT = 1 << 5; + } +} + +bitflags::bitflags! { + pub struct FsyncFlags: u32 { + /// Sync file data without an immediate metadata sync. + const FSYNC_DATASYNC = 1 << 0; + } +} + +bitflags::bitflags!
{ + pub struct StatxFlags: i32 { + const AT_STATX_SYNC_AS_STAT = 0; + const AT_SYMLINK_NOFOLLOW = 1 << 10; + const AT_NO_AUTOMOUNT = 1 << 11; + const AT_EMPTY_PATH = 1 << 12; + const AT_STATX_FORCE_SYNC = 1 << 13; + const AT_STATX_DONT_SYNC = 1 << 14; + } +} + +bitflags::bitflags! { + pub struct StatxMode: i32 { + const STATX_TYPE = 1 << 0; + const STATX_MODE = 1 << 1; + const STATX_NLINK = 1 << 2; + const STATX_UID = 1 << 3; + const STATX_GID = 1 << 4; + const STATX_ATIME = 1 << 5; + const STATX_MTIME = 1 << 6; + const STATX_CTIME = 1 << 7; + const STATX_INO = 1 << 8; + const STATX_SIZE = 1 << 9; + const STATX_BLOCKS = 1 << 10; + const STATX_BTIME = 1 << 11; + } +} + +bitflags::bitflags! { + pub struct TimeoutFlags: u32 { + const TIMEOUT_ABS = 1 << 0; + } +} + +bitflags::bitflags! { + pub struct SpliceFlags: u32 { + const F_FD_IN_FIXED = 1 << 31; + } +} + +/// A sequence of [`SQE`]s from the [`SubmissionQueue`][crate::SubmissionQueue]. +pub struct SQEs<'ring> { + sqes: slice::IterMut<'ring, uring_sys::io_uring_sqe>, +} + +impl<'ring> SQEs<'ring> { + pub(crate) fn new(slice: &'ring mut [uring_sys::io_uring_sqe]) -> SQEs<'ring> { + SQEs { + sqes: slice.iter_mut(), + } + } + + /// Consumes all remaining [`SQE`]s, returning the last one. Subsequent attempts to get + /// additional [`SQE`]s will return `None`. + pub fn single(&mut self) -> Option<SQE<'ring>> { + let mut next = None; + while let Some(sqe) = self.consume() { + next = Some(sqe) + } + next + } + + /// An iterator of [`HardLinkedSQE`]s. These will be [`SQE`]s that are *hard-linked* together. + /// + /// Hard-linked SQEs will occur sequentially. All of them will be completed, even if one of the + /// events resolves to an error. + pub fn hard_linked(&mut self) -> HardLinked<'ring, '_> { + HardLinked { sqes: self } + } + + /// An iterator of [`SoftLinkedSQE`]s. These will be [`SQE`]s that are *soft-linked* together. + /// + /// Soft-linked SQEs will occur sequentially. If one of the events errors, all events after it + /// will be cancelled. + pub fn soft_linked(&mut self) -> SoftLinked<'ring, '_> { + SoftLinked { sqes: self } + } + + /// Remaining [`SQE`]s that can be modified. + pub fn remaining(&self) -> u32 { + self.sqes.len() as u32 + } + + fn consume(&mut self) -> Option<SQE<'ring>> { + self.sqes.next().map(|sqe| { + unsafe { uring_sys::io_uring_prep_nop(sqe) } + SQE { sqe } + }) + } +} + +impl<'ring> Iterator for SQEs<'ring> { + type Item = SQE<'ring>; + + fn next(&mut self) -> Option<SQE<'ring>> { + self.consume() + } +} + +/// An Iterator of [`SQE`]s which will be hard linked together. +pub struct HardLinked<'ring, 'a> { + sqes: &'a mut SQEs<'ring>, +} + +impl<'ring> HardLinked<'ring, '_> { + pub fn terminate(self) -> Option<SQE<'ring>> { + self.sqes.consume() + } +} + +impl<'ring> Iterator for HardLinked<'ring, '_> { + type Item = HardLinkedSQE<'ring>; + + fn next(&mut self) -> Option<Self::Item> { + let is_final = self.sqes.remaining() == 1; + self.sqes + .consume() + .map(|sqe| HardLinkedSQE { sqe, is_final }) + } +} + +pub struct HardLinkedSQE<'ring> { + sqe: SQE<'ring>, + is_final: bool, +} + +impl<'ring> Deref for HardLinkedSQE<'ring> { + type Target = SQE<'ring>; + + fn deref(&self) -> &SQE<'ring> { + &self.sqe + } +} + +impl<'ring> DerefMut for HardLinkedSQE<'ring> { + fn deref_mut(&mut self) -> &mut SQE<'ring> { + &mut self.sqe + } +} + +impl<'ring> Drop for HardLinkedSQE<'ring> { + fn drop(&mut self) { + if !self.is_final { + self.sqe.set_flags(SubmissionFlags::IO_HARDLINK); + } + } +} + +/// An Iterator of [`SQE`]s which will be soft linked together.
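+///
+/// # Examples
+///
+/// A minimal sketch, assuming `sq` is a [`SubmissionQueue`][crate::SubmissionQueue], `fd` is an
+/// open file descriptor and `buf` is a byte slice (or any other [`UringWriteBuf`]): queue a
+/// write followed by an fsync, where the fsync is skipped if the write fails.
+///
+/// ```ignore
+/// if let Some(mut sqes) = sq.prepare_sqes(2) {
+///     let mut linked = sqes.soft_linked();
+///     if let Some(mut sqe) = linked.next() {
+///         unsafe { sqe.prep_write(fd, buf, 0) };
+///     }
+///     if let Some(mut sqe) = linked.next() {
+///         unsafe { sqe.prep_fsync(fd, FsyncFlags::empty()) };
+///     }
+/// }
+/// ```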
+pub struct SoftLinked<'ring, 'a> { + sqes: &'a mut SQEs<'ring>, +} + +impl<'ring> SoftLinked<'ring, '_> { + pub fn terminate(self) -> Option<SQE<'ring>> { + self.sqes.consume() + } +} + +impl<'ring> Iterator for SoftLinked<'ring, '_> { + type Item = SoftLinkedSQE<'ring>; + + fn next(&mut self) -> Option<Self::Item> { + let is_final = self.sqes.remaining() == 1; + self.sqes + .consume() + .map(|sqe| SoftLinkedSQE { sqe, is_final }) + } +} + +pub struct SoftLinkedSQE<'ring> { + sqe: SQE<'ring>, + is_final: bool, +} + +impl<'ring> Deref for SoftLinkedSQE<'ring> { + type Target = SQE<'ring>; + + fn deref(&self) -> &SQE<'ring> { + &self.sqe + } +} + +impl<'ring> DerefMut for SoftLinkedSQE<'ring> { + fn deref_mut(&mut self) -> &mut SQE<'ring> { + &mut self.sqe + } +} + +impl<'ring> Drop for SoftLinkedSQE<'ring> { + fn drop(&mut self) { + if !self.is_final { + self.sqe.set_flags(SubmissionFlags::IO_LINK); + } + } +} diff --git a/glommio/src/iou/submission_queue.rs b/glommio/src/iou/submission_queue.rs new file mode 100644 index 000000000..514b0da83 --- /dev/null +++ b/glommio/src/iou/submission_queue.rs @@ -0,0 +1,133 @@ +use std::fmt; +use std::io; +use std::marker::PhantomData; +use std::ptr::NonNull; +use std::slice; +use std::sync::atomic::{self, Ordering}; +use std::time::Duration; + +use super::{resultify, IoUring, SQEs, SQE}; +use crate::uring_sys; + +/// The queue of pending IO events. +/// +/// Each element is a [`SQE`](crate::sqe::SQE). +/// By default, events are processed in parallel after being submitted. +/// You can modify this behavior for specific events using event [`SubmissionFlags`](crate::sqe::SubmissionFlags). +/// +/// # Examples +/// Consider a read event that depends on a successful write beforehand. +/// +/// We reify this relationship by using `IO_LINK` to link these events. +pub struct SubmissionQueue<'ring> { + ring: NonNull<uring_sys::io_uring>, + _marker: PhantomData<&'ring mut IoUring>, +} + +impl<'ring> SubmissionQueue<'ring> { + pub(crate) fn new(ring: &'ring IoUring) -> SubmissionQueue<'ring> { + SubmissionQueue { + ring: NonNull::from(&ring.ring), + _marker: PhantomData, + } + } + + /// Returns new [`SQE`s](crate::sqe::SQE) until the queue size is reached. After that, it will return `None`. + pub fn prepare_sqe(&mut self) -> Option<SQE<'_>> { + unsafe { prepare_sqe(self.ring.as_mut()) } + } + + pub fn prepare_sqes(&mut self, count: u32) -> Option<SQEs<'_>> { + unsafe { + let sq: &mut uring_sys::io_uring_sq = &mut (*self.ring.as_ptr()).sq; + prepare_sqes(sq, count) + } + } + + /// Submit all events in the queue. Returns the number of submitted events. + /// + /// If this function encounters any IO errors an [`io::Error`](std::io::Result) variant is returned.
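+ ///
+ /// # Examples
+ ///
+ /// A minimal sketch, assuming `sq` is this [`SubmissionQueue`]:
+ ///
+ /// ```ignore
+ /// if let Some(mut sqe) = sq.prepare_sqe() {
+ ///     unsafe { sqe.prep_nop() };
+ /// }
+ /// let submitted = sq.submit().expect("submit failed");
+ /// assert_eq!(submitted, 1);
+ /// ```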
+ pub fn submit(&mut self) -> io::Result<u32> { + resultify(unsafe { uring_sys::io_uring_submit(self.ring.as_ptr()) }) + } + + pub fn submit_and_wait(&mut self, wait_for: u32) -> io::Result<u32> { + resultify(unsafe { uring_sys::io_uring_submit_and_wait(self.ring.as_ptr(), wait_for as _) }) + } + + pub fn submit_and_wait_with_timeout( + &mut self, + wait_for: u32, + duration: Duration, + ) -> io::Result<u32> { + let ts = uring_sys::__kernel_timespec { + tv_sec: duration.as_secs() as _, + tv_nsec: duration.subsec_nanos() as _, + }; + + loop { + if let Some(mut sqe) = self.prepare_sqe() { + sqe.clear(); + unsafe { + sqe.prep_timeout(&ts, 0, crate::iou::sqe::TimeoutFlags::empty()); + sqe.set_user_data(uring_sys::LIBURING_UDATA_TIMEOUT); + return resultify(uring_sys::io_uring_submit_and_wait( + self.ring.as_ptr(), + wait_for as _, + )); + } + } + + self.submit()?; + } + } + + pub fn ready(&self) -> u32 { + unsafe { uring_sys::io_uring_sq_ready(self.ring.as_ptr()) as u32 } + } + + pub fn space_left(&self) -> u32 { + unsafe { uring_sys::io_uring_sq_space_left(self.ring.as_ptr()) as u32 } + } +} + +impl fmt::Debug for SubmissionQueue<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let fd = unsafe { self.ring.as_ref().ring_fd }; + f.debug_struct(std::any::type_name::<Self>()) + .field("fd", &fd) + .finish() + } +} + +unsafe impl<'ring> Send for SubmissionQueue<'ring> {} +unsafe impl<'ring> Sync for SubmissionQueue<'ring> {} + +pub(crate) unsafe fn prepare_sqe<'a>(ring: &mut uring_sys::io_uring) -> Option<SQE<'a>> { + let sqe = uring_sys::io_uring_get_sqe(ring); + if !sqe.is_null() { + let mut sqe = SQE::new(&mut *sqe); + sqe.clear(); + Some(sqe) + } else { + None + } +} + +pub(crate) unsafe fn prepare_sqes<'a>( + sq: &mut uring_sys::io_uring_sq, + count: u32, +) -> Option<SQEs<'a>> { + atomic::fence(Ordering::Acquire); + + let head: u32 = *sq.khead; + let next: u32 = sq.sqe_tail + count; + + if next - head <= *sq.kring_entries { + let sqe = sq.sqes.offset((sq.sqe_tail & *sq.kring_mask) as isize); + sq.sqe_tail = next; + Some(SQEs::new(slice::from_raw_parts_mut(sqe, count as usize))) + } else { + None + } +} diff --git a/glommio/src/lib.rs b/glommio/src/lib.rs index 26420fcee..e5d4681b8 100644 --- a/glommio/src/lib.rs +++ b/glommio/src/lib.rs @@ -266,9 +266,13 @@ use std::fmt::Debug; use std::time::Duration; mod free_list; +#[allow(dead_code)] +mod iou; mod parking; mod sys; pub mod task; +#[allow(dead_code)] +mod uring_sys; #[cfg(feature = "bench")] pub mod nop; diff --git a/glommio/src/parking.rs b/glommio/src/parking.rs index 5a47c0074..dcb3aa740 100644 --- a/glommio/src/parking.rs +++ b/glommio/src/parking.rs @@ -22,8 +22,8 @@ //! no thread context switch is necessary when going between task execution and I/O. //! +use crate::iou::sqe::SockAddrStorage; use ahash::AHashMap; -use iou::sqe::SockAddrStorage; use nix::sys::socket::{MsgFlags, SockAddr}; use std::cell::{Cell, RefCell}; use std::collections::{BTreeMap, VecDeque}; diff --git a/glommio/src/sys/mod.rs b/glommio/src/sys/mod.rs index ea84ea782..6e8408ec6 100644 --- a/glommio/src/sys/mod.rs +++ b/glommio/src/sys/mod.rs @@ -3,8 +3,9 @@ // // This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2020 Datadog, Inc.
// +use crate::iou::sqe::SockAddrStorage; +use crate::uring_sys; use ahash::AHashMap; -use iou::sqe::SockAddrStorage; use nix::sys::socket::SockAddr; use std::cell::{Cell, RefCell}; use std::convert::TryFrom; diff --git a/glommio/src/sys/uring.rs b/glommio/src/sys/uring.rs index 51f942031..f79d150b6 100644 --- a/glommio/src/sys/uring.rs +++ b/glommio/src/sys/uring.rs @@ -21,18 +21,20 @@ use std::task::Waker; use std::time::Duration; use crate::free_list::{FreeList, Idx}; +use crate::iou; +use crate::iou::sqe::{FsyncFlags, SockAddrStorage, StatxFlags, StatxMode, TimeoutFlags}; use crate::sys::dma_buffer::{BufferStorage, DmaBuffer}; use crate::sys::{ self, DirectIO, IOBuffer, InnerSource, LinkStatus, PollableStatus, Source, SourceType, }; +use crate::uring_sys; use crate::{IoRequirements, Latency}; use buddy_alloc::buddy_alloc::{BuddyAlloc, BuddyAllocParam}; -use iou::sqe::{FsyncFlags, SockAddrStorage, StatxFlags, StatxMode, TimeoutFlags}; use nix::sys::socket::{MsgFlags, SockAddr, SockFlag}; use nix::sys::stat::Mode as OpenMode; use std::sync::Arc; -use uring_sys::IoRingOp; +use crate::uring_sys::IoRingOp; use super::{EnqueuedSource, TimeSpec64}; @@ -339,7 +341,7 @@ where UringOpDescriptor::WriteFixed(ptr, len, pos, buf_index) => { let buf = std::slice::from_raw_parts(ptr, len); - sqe.prep_write_fixed(op.fd, buf, pos, buf_index); + sqe.prep_write_fixed(op.fd, buf, pos, buf_index as _); } UringOpDescriptor::SockSend(ptr, len, flags) => { diff --git a/glommio/src/uring_sys/mod.rs b/glommio/src/uring_sys/mod.rs new file mode 100644 index 000000000..f54caabd8 --- /dev/null +++ b/glommio/src/uring_sys/mod.rs @@ -0,0 +1,686 @@ +pub mod syscalls; + +pub const LIBURING_UDATA_TIMEOUT: libc::__u64 = libc::__u64::max_value(); + +// sqe opcode constants +#[repr(C)] +#[non_exhaustive] +#[allow(nonstandard_style)] +#[derive(Debug)] +pub enum IoRingOp { + IORING_OP_NOP, + IORING_OP_READV, + IORING_OP_WRITEV, + IORING_OP_FSYNC, + IORING_OP_READ_FIXED, + IORING_OP_WRITE_FIXED, + IORING_OP_POLL_ADD, + IORING_OP_POLL_REMOVE, + IORING_OP_SYNC_FILE_RANGE, + IORING_OP_SENDMSG, + IORING_OP_RECVMSG, + IORING_OP_TIMEOUT, + IORING_OP_TIMEOUT_REMOVE, + IORING_OP_ACCEPT, + IORING_OP_ASYNC_CANCEL, + IORING_OP_LINK_TIMEOUT, + IORING_OP_CONNECT, + IORING_OP_FALLOCATE, + IORING_OP_OPENAT, + IORING_OP_CLOSE, + IORING_OP_FILES_UPDATE, + IORING_OP_STATX, + IORING_OP_READ, + IORING_OP_WRITE, + IORING_OP_FADVISE, + IORING_OP_MADVISE, + IORING_OP_SEND, + IORING_OP_RECV, + IORING_OP_OPENAT2, + IORING_OP_EPOLL_CTL, + IORING_OP_SPLICE, + IORING_OP_PROVIDE_BUFFERS, + IORING_OP_REMOVE_BUFFERS, + IORING_OP_TEE, +} + +// sqe.flags +pub const IOSQE_FIXED_FILE: libc::__u8 = 1 << 0; /* use fixed fileset */ +pub const IOSQE_IO_DRAIN: libc::__u8 = 1 << 1; /* issue after inflight IO */ +pub const IOSQE_IO_LINK: libc::__u8 = 1 << 2; /* links next sqe */ +pub const IOSQE_IO_HARDLINK: libc::__u8 = 1 << 3; /* like LINK, but stronger */ +pub const IOSQE_ASYNC: libc::__u8 = 1 << 4; /* always go async */ +pub const IOSQE_BUFFER_SELECT: libc::__u8 = 1 << 5; /* select buf from sqe->buf_group */ + +// sqe.cmd_flags.fsync_flags +pub const IORING_FSYNC_DATASYNC: libc::__u32 = 1 << 0; + +// sqe.cmd_flags.timeout_flags +pub const IORING_TIMEOUT_ABS: libc::__u32 = 1 << 0; + +// sqe.cmd_flags.splice_flags +pub const SPLICE_F_FD_IN_FIXED: libc::__u32 = 1 << 31; + +// io_uring_setup flags +pub const IORING_SETUP_IOPOLL: libc::c_uint = 1 << 0; /* io_context is polled */ +pub const IORING_SETUP_SQPOLL: libc::c_uint = 1 << 1; /* SQ poll thread */ +pub const 
IORING_SETUP_SQ_AFF: libc::c_uint = 1 << 2; /* sq_thread_cpu is valid */ +pub const IORING_SETUP_CQSIZE: libc::c_uint = 1 << 3; /* app defines CQ size */ +pub const IORING_SETUP_CLAMP: libc::c_uint = 1 << 4; /* clamp SQ/CQ ring sizes */ +pub const IORING_SETUP_ATTACH_WQ: libc::c_uint = 1 << 5; /* attach to existing wq */ + +// cqe.flags +pub const IORING_CQE_BUFFER_SHIFT: libc::c_uint = 1 << 0; + +// Magic offsets for the application to mmap the data it needs +pub const IORING_OFF_SQ_RING: libc::__u64 = 0; +pub const IORING_OFF_CQ_RING: libc::__u64 = 0x8000000; +pub const IORING_OFF_SQES: libc::__u64 = 0x10000000; + +// sq_ring.kflags +pub const IORING_SQ_NEED_WAKEUP: libc::c_uint = 1 << 0; +pub const IORING_SQ_CQ_OVERFLOW: libc::c_uint = 1 << 1; + +// cq_ring.kflags +pub const IORING_CQ_EVENTFD_DISABLED: libc::c_uint = 1 << 0; + +// io_uring_enter flags +pub const IORING_ENTER_GETEVENTS: libc::c_uint = 1 << 0; +pub const IORING_ENTER_SQ_WAKEUP: libc::c_uint = 1 << 1; + +// io_uring_params.features flags +pub const IORING_FEAT_SINGLE_MMAP: libc::__u32 = 1 << 0; +pub const IORING_FEAT_NODROP: libc::__u32 = 1 << 1; +pub const IORING_FEAT_SUBMIT_STABLE: libc::__u32 = 1 << 2; +pub const IORING_FEAT_RW_CUR_POS: libc::__u32 = 1 << 3; +pub const IORING_FEAT_CUR_PERSONALITY: libc::__u32 = 1 << 4; +pub const IORING_FEAT_FAST_POLL: libc::__u32 = 1 << 5; +pub const IORING_FEAT_POLL_32BITS: libc::__u32 = 1 << 6; + +// io_uring_register opcodes and arguments +pub const IORING_REGISTER_BUFFERS: libc::c_uint = 0; +pub const IORING_UNREGISTER_BUFFERS: libc::c_uint = 1; +pub const IORING_REGISTER_FILES: libc::c_uint = 2; +pub const IORING_UNREGISTER_FILES: libc::c_uint = 3; +pub const IORING_REGISTER_EVENTFD: libc::c_uint = 4; +pub const IORING_UNREGISTER_EVENTFD: libc::c_uint = 5; +pub const IORING_REGISTER_FILES_UPDATE: libc::c_uint = 6; +pub const IORING_REGISTER_EVENTFD_ASYNC: libc::c_uint = 7; +pub const IORING_REGISTER_PROBE: libc::c_uint = 8; +pub const IORING_REGISTER_PERSONALITY: libc::c_uint = 9; +pub const IORING_UNREGISTER_PERSONALITY: libc::c_uint = 10; + +#[derive(Debug)] +#[repr(C)] +pub struct io_uring { + pub sq: io_uring_sq, + pub cq: io_uring_cq, + pub flags: libc::c_uint, + pub ring_fd: libc::c_int, + + pub features: libc::c_uint, + pub pad: [libc::c_uint; 3], +} + +#[derive(Debug)] +#[repr(C)] +pub struct io_uring_sq { + pub khead: *mut libc::c_uint, + pub ktail: *mut libc::c_uint, + pub kring_mask: *mut libc::c_uint, + pub kring_entries: *mut libc::c_uint, + pub kflags: *mut libc::c_uint, + pub kdropped: *mut libc::c_uint, + pub array: *mut libc::c_uint, + pub sqes: *mut io_uring_sqe, + + pub sqe_head: libc::c_uint, + pub sqe_tail: libc::c_uint, + + pub ring_sz: libc::size_t, + pub ring_ptr: *mut libc::c_void, + + pub pad: [libc::c_uint; 4], +} + +#[derive(Debug)] +#[repr(C)] +pub struct io_uring_cq { + pub khead: *mut libc::c_uint, + pub ktail: *mut libc::c_uint, + pub kring_mask: *mut libc::c_uint, + pub kring_entries: *mut libc::c_uint, + pub kflags: *mut libc::c_uint, + pub koverflow: *mut libc::c_uint, + pub cqes: *mut io_uring_cqe, + + pub ring_sz: libc::size_t, + pub ring_ptr: *mut libc::c_void, + + pub pad: [libc::c_uint; 4], +} + +#[repr(C)] +pub struct io_uring_sqe { + pub opcode: libc::__u8, /* type of operation for this sqe */ + pub flags: libc::__u8, /* IOSQE_ flags */ + pub ioprio: libc::__u16, /* ioprio for the request */ + pub fd: libc::__s32, /* file descriptor to do IO on */ + pub off_addr2: off_addr2, + pub addr: libc::__u64, /* pointer to buffer or iovecs */ + pub 
len: libc::__u32, /* buffer size or number of iovecs */ + pub cmd_flags: cmd_flags, + pub user_data: libc::__u64, /* data to be passed back at completion time */ + pub buf_index: buf_index_padding, /* index into fixed buffers, if used */ +} + +#[repr(C)] +pub union off_addr2 { + pub off: libc::__u64, + pub addr2: libc::__u64, +} + +#[repr(C)] +pub union cmd_flags { + pub rw_flags: __kernel_rwf_t, + pub fsync_flags: libc::__u32, + pub poll_events: libc::__u16, + pub sync_range_flags: libc::__u32, + pub msg_flags: libc::__u32, + pub timeout_flags: libc::__u32, + pub accept_flags: libc::__u32, + pub cancel_flags: libc::__u32, + pub open_flags: libc::__u32, + pub statx_flags: libc::__u32, + pub fadvise_advice: libc::__u32, + pub splice_flags: libc::__u32, +} + +#[allow(non_camel_case_types)] +type __kernel_rwf_t = libc::c_int; + +#[repr(C)] +pub union buf_index_padding { + pub buf_index: buf_index, + pub __pad2: [libc::__u64; 3], +} + +#[repr(C)] +#[derive(Copy, Clone)] +pub struct buf_index { + pub index_or_group: libc::__u16, + pub personality: libc::__u16, + pub splice_fd_in: libc::__s32, +} + +#[repr(C)] +pub struct io_uring_cqe { + pub user_data: libc::__u64, /* sqe->data submission passed back */ + pub res: libc::__s32, /* result code for this event */ + pub flags: libc::__u32, +} + +#[repr(C)] +pub struct io_uring_params { + pub sq_entries: libc::__u32, + pub cq_entries: libc::__u32, + pub flags: libc::__u32, + pub sq_thread_cpu: libc::__u32, + pub sq_thread_idle: libc::__u32, + pub features: libc::__u32, + pub wq_fd: libc::__u32, + pub resv: [libc::__u32; 3], + pub sq_off: io_sqring_offsets, + pub cq_off: io_cqring_offsets, +} + +#[repr(C)] +pub struct io_sqring_offsets { + pub head: libc::__u32, + pub tail: libc::__u32, + pub ring_mask: libc::__u32, + pub ring_entries: libc::__u32, + pub flags: libc::__u32, + pub dropped: libc::__u32, + pub array: libc::__u32, + pub resv1: libc::__u32, + pub resv2: libc::__u64, +} + +#[repr(C)] +pub struct io_cqring_offsets { + pub head: libc::__u32, + pub tail: libc::__u32, + pub ring_mask: libc::__u32, + pub ring_entries: libc::__u32, + pub overflow: libc::__u32, + pub cqes: libc::__u32, + pub resv: [libc::__u64; 2], +} + +#[repr(C)] +pub struct io_uring_probe { + last_op: libc::__u8, + ops_len: libc::__u8, + resv: libc::__u16, + resv2: [libc::__u32; 3], + ops: [io_uring_probe_op; 0], +} + +#[repr(C)] +pub struct io_uring_probe_op { + op: libc::__u8, + resv: libc::__u8, + flags: libc::__u16, + resv2: libc::__u32, +} + +#[repr(C)] +pub struct __kernel_timespec { + pub tv_sec: i64, + pub tv_nsec: libc::c_longlong, +} + +#[link(name = "uring")] +extern "C" { + pub fn io_uring_queue_init( + entries: libc::c_uint, + ring: *mut io_uring, + flags: libc::c_uint, + ) -> libc::c_int; + + pub fn io_uring_queue_init_params( + entries: libc::c_uint, + ring: *mut io_uring, + params: *mut io_uring_params, + ) -> libc::c_int; + + pub fn io_uring_queue_mmap( + fd: libc::c_int, + params: *mut io_uring_params, + ring: *mut io_uring, + ) -> libc::c_int; + + pub fn io_uring_get_probe_ring(ring: *mut io_uring) -> *mut io_uring_probe; + + pub fn io_uring_get_probe() -> *mut io_uring_probe; + + pub fn io_uring_free_probe(probe: *mut io_uring_probe); + + pub fn io_uring_dontfork(ring: *mut io_uring) -> libc::c_int; + + pub fn io_uring_queue_exit(ring: *mut io_uring); + + pub fn io_uring_peek_batch_cqe( + ring: *mut io_uring, + cqes: *mut *mut io_uring_cqe, + count: libc::c_uint, + ) -> libc::c_uint; + + pub fn io_uring_wait_cqes( + ring: *mut io_uring, + cqe_ptr: *mut *mut 
io_uring_cqe, + wait_nr: libc::c_uint, + ts: *const __kernel_timespec, + sigmask: *const libc::sigset_t, + ) -> libc::c_int; + + pub fn io_uring_wait_cqe_timeout( + ring: *mut io_uring, + cqe_ptr: *mut *mut io_uring_cqe, + ts: *mut __kernel_timespec, + ) -> libc::c_int; + + pub fn io_uring_submit(ring: *mut io_uring) -> libc::c_int; + + pub fn io_uring_submit_and_wait(ring: *mut io_uring, wait_nr: libc::c_uint) -> libc::c_int; + + pub fn io_uring_get_sqe(ring: *mut io_uring) -> *mut io_uring_sqe; + + pub fn io_uring_register_buffers( + ring: *mut io_uring, + iovecs: *const libc::iovec, + nr_iovecs: libc::c_uint, + ) -> libc::c_int; + + pub fn io_uring_unregister_buffers(ring: *mut io_uring) -> libc::c_int; + + pub fn io_uring_register_files( + ring: *mut io_uring, + files: *const libc::c_int, + nr_files: libc::c_uint, + ) -> libc::c_int; + + pub fn io_uring_unregister_files(ring: *mut io_uring) -> libc::c_int; + + pub fn io_uring_register_files_update( + ring: *mut io_uring, + off: libc::c_uint, + files: *const libc::c_int, + nr_files: libc::c_uint, + ) -> libc::c_int; + + pub fn io_uring_register_eventfd(ring: *mut io_uring, fd: libc::c_int) -> libc::c_int; + + pub fn io_uring_register_eventfd_async(ring: *mut io_uring, fd: libc::c_int) -> libc::c_int; + + pub fn io_uring_unregister_eventfd(ring: *mut io_uring) -> libc::c_int; + + pub fn io_uring_register_probe( + ring: *mut io_uring, + p: *mut io_uring_probe, + nr: libc::c_uint, + ) -> libc::c_int; + + pub fn io_uring_register_personality(ring: *mut io_uring) -> libc::c_int; + + pub fn io_uring_unregister_personality(ring: *mut io_uring, id: libc::c_int) -> libc::c_int; +} + +#[link(name = "rusturing")] +extern "C" { + #[link_name = "rust_io_uring_opcode_supported"] + pub fn io_uring_opcode_supported(p: *mut io_uring_probe, op: libc::c_int) -> libc::c_int; + + #[link_name = "rust_io_uring_cq_advance"] + pub fn io_uring_cq_advance(ring: *mut io_uring, nr: libc::c_uint); + + #[link_name = "rust_io_uring_cqe_seen"] + pub fn io_uring_cqe_seen(ring: *mut io_uring, cqe: *mut io_uring_cqe); + + #[link_name = "rust_io_uring_sqe_set_data"] + pub fn io_uring_sqe_set_data(sqe: *mut io_uring_sqe, data: *mut libc::c_void); + + #[link_name = "rust_io_uring_cqe_get_data"] + pub fn io_uring_cqe_get_data(cqe: *mut io_uring_cqe) -> *mut libc::c_void; + + #[link_name = "rust_io_uring_sqe_set_flags"] + pub fn io_uring_sqe_set_flags(sqe: *mut io_uring_sqe, flags: libc::c_uint); + + #[link_name = "rust_io_uring_prep_rw"] + pub fn io_uring_prep_rw( + op: libc::c_int, + sqe: *mut io_uring_sqe, + fd: libc::c_int, + addr: *const libc::c_void, + len: libc::c_uint, + offset: libc::__u64, + ); + + #[link_name = "rust_io_uring_prep_splice"] + pub fn io_uring_prep_splice( + sqe: *mut io_uring_sqe, + fd_in: libc::c_int, + off_in: libc::loff_t, + fd_out: libc::c_int, + off_out: libc::loff_t, + nbytes: libc::c_uint, + splice_flags: libc::c_uint, + ); + + #[link_name = "rust_io_uring_prep_readv"] + pub fn io_uring_prep_readv( + sqe: *mut io_uring_sqe, + fd: libc::c_int, + iovecs: *const libc::iovec, + nr_vecs: libc::c_uint, + offset: libc::off_t, + ); + + #[link_name = "rust_io_uring_prep_read_fixed"] + pub fn io_uring_prep_read_fixed( + sqe: *mut io_uring_sqe, + fd: libc::c_int, + buf: *mut libc::c_void, + nbytes: libc::c_uint, + offset: libc::off_t, + buf_index: libc::c_int, + ); + + #[link_name = "rust_io_uring_prep_writev"] + pub fn io_uring_prep_writev( + sqe: *mut io_uring_sqe, + fd: libc::c_int, + iovecs: *const libc::iovec, + nr_vecs: libc::c_uint, + offset: 
libc::off_t, + ); + + #[link_name = "rust_io_uring_prep_write_fixed"] + pub fn io_uring_prep_write_fixed( + sqe: *mut io_uring_sqe, + fd: libc::c_int, + buf: *const libc::c_void, + nbytes: libc::c_uint, + offset: libc::off_t, + buf_index: libc::c_int, + ); + + #[link_name = "rust_io_uring_prep_recvmsg"] + pub fn io_uring_prep_recvmsg( + sqe: *mut io_uring_sqe, + fd: libc::c_int, + msg: *mut libc::msghdr, + flags: libc::c_uint, + ); + + #[link_name = "rust_io_uring_prep_sendmsg"] + pub fn io_uring_prep_sendmsg( + sqe: *mut io_uring_sqe, + fd: libc::c_int, + msg: *const libc::msghdr, + flags: libc::c_uint, + ); + + #[link_name = "rust_io_uring_prep_poll_add"] + pub fn io_uring_prep_poll_add( + sqe: *mut io_uring_sqe, + fd: libc::c_int, + poll_mask: libc::c_short, + ); + + #[link_name = "rust_io_uring_prep_poll_remove"] + pub fn io_uring_prep_poll_remove(sqe: *mut io_uring_sqe, user_data: *mut libc::c_void); + + #[link_name = "rust_io_uring_prep_fsync"] + pub fn io_uring_prep_fsync(sqe: *mut io_uring_sqe, fd: libc::c_int, fsync_flags: libc::c_uint); + + #[link_name = "rust_io_uring_prep_nop"] + pub fn io_uring_prep_nop(sqe: *mut io_uring_sqe); + + #[link_name = "rust_io_uring_prep_timeout"] + pub fn io_uring_prep_timeout( + sqe: *mut io_uring_sqe, + ts: *mut __kernel_timespec, + count: libc::c_uint, + flags: libc::c_uint, + ); + + #[link_name = "rust_io_uring_prep_timeout_remove"] + pub fn io_uring_prep_timeout_remove( + sqe: *mut io_uring_sqe, + user_data: libc::__u64, + flags: libc::c_uint, + ); + + #[link_name = "rust_io_uring_prep_accept"] + pub fn io_uring_prep_accept( + sqe: *mut io_uring_sqe, + fd: libc::c_int, + addr: *mut libc::sockaddr, + addrlen: *mut libc::socklen_t, + flags: libc::c_int, + ); + + #[link_name = "rust_io_uring_prep_cancel"] + pub fn io_uring_prep_cancel( + sqe: *mut io_uring_sqe, + user_data: *mut libc::c_void, + flags: libc::c_int, + ); + + #[link_name = "rust_io_uring_prep_link_timeout"] + pub fn io_uring_prep_link_timeout( + sqe: *mut io_uring_sqe, + ts: *mut __kernel_timespec, + flags: libc::c_uint, + ); + + #[link_name = "rust_io_uring_prep_connect"] + pub fn io_uring_prep_connect( + sqe: *mut io_uring_sqe, + fd: libc::c_int, + addr: *mut libc::sockaddr, + addrlen: libc::socklen_t, + ); + + #[link_name = "rust_io_uring_prep_files_update"] + pub fn io_uring_prep_files_update( + sqe: *mut io_uring_sqe, + fds: *mut libc::c_int, + nr_fds: libc::c_uint, + offset: libc::c_int, + ); + + #[link_name = "rust_io_uring_prep_fallocate"] + pub fn io_uring_prep_fallocate( + sqe: *mut io_uring_sqe, + fd: libc::c_int, + mode: libc::c_int, + offset: libc::off_t, + len: libc::off_t, + ); + + #[link_name = "rust_io_uring_prep_openat"] + pub fn io_uring_prep_openat( + sqe: *mut io_uring_sqe, + dfd: libc::c_int, + path: *const libc::c_char, + flags: libc::c_int, + mode: libc::mode_t, + ); + + #[link_name = "rust_io_uring_prep_close"] + pub fn io_uring_prep_close(sqe: *mut io_uring_sqe, fd: libc::c_int); + + #[link_name = "rust_io_uring_prep_read"] + pub fn io_uring_prep_read( + sqe: *mut io_uring_sqe, + fd: libc::c_int, + buf: *mut libc::c_void, + nbytes: libc::c_uint, + offset: libc::off_t, + ); + + #[link_name = "rust_io_uring_prep_write"] + pub fn io_uring_prep_write( + sqe: *mut io_uring_sqe, + fd: libc::c_int, + buf: *const libc::c_void, + nbytes: libc::c_uint, + offset: libc::off_t, + ); + + #[link_name = "rust_io_uring_prep_statx"] + pub fn io_uring_prep_statx( + sqe: *mut io_uring_sqe, + dfd: libc::c_int, + path: *const libc::c_char, + flags: libc::c_int, + mask: 
libc::c_uint, + statx: *mut libc::statx, + ); + + #[link_name = "rust_io_uring_prep_fadvise"] + pub fn io_uring_prep_fadvise( + sqe: *mut io_uring_sqe, + fd: libc::c_int, + offset: libc::off_t, + len: libc::off_t, + advice: libc::c_int, + ); + + #[link_name = "rust_io_uring_prep_madvise"] + pub fn io_uring_prep_madvise( + sqe: *mut io_uring_sqe, + addr: *mut libc::c_void, + length: libc::off_t, + advice: libc::c_int, + ); + + #[link_name = "rust_io_uring_prep_send"] + pub fn io_uring_prep_send( + sqe: *mut io_uring_sqe, + sockfd: libc::c_int, + buf: *const libc::c_void, + len: libc::size_t, + flags: libc::c_int, + ); + + #[link_name = "rust_io_uring_prep_recv"] + pub fn io_uring_prep_recv( + sqe: *mut io_uring_sqe, + sockfd: libc::c_int, + buf: *mut libc::c_void, + len: libc::size_t, + flags: libc::c_int, + ); + + #[link_name = "rust_io_uring_prep_openat2"] + pub fn io_uring_prep_openat2( + sqe: *mut io_uring_sqe, + dfd: libc::c_int, + path: *const libc::c_char, + how: *mut libc::c_void, + ); + + #[link_name = "rust_io_uring_prep_epoll_ctl"] + pub fn io_uring_prep_epoll_ctl( + sqe: *mut io_uring_sqe, + epfd: libc::c_int, + fd: libc::c_int, + op: libc::c_int, + ev: *mut libc::epoll_event, + ); + + #[link_name = "rust_io_uring_prep_provide_buffers"] + pub fn io_uring_prep_provide_buffers( + sqe: *mut io_uring_sqe, + addr: *mut libc::c_void, + len: libc::c_int, + nr: libc::c_int, + bgid: libc::c_int, + bid: libc::c_int, + ); + + #[link_name = "rust_io_uring_prep_remove_buffers"] + pub fn io_uring_prep_remove_buffers(sqe: *mut io_uring_sqe, nr: libc::c_int, bgid: libc::c_int); + + #[link_name = "rust_io_uring_sq_ready"] + pub fn io_uring_sq_ready(ring: *mut io_uring) -> libc::c_uint; + + #[link_name = "rust_io_uring_sq_space_left"] + pub fn io_uring_sq_space_left(ring: *mut io_uring) -> libc::c_uint; + + #[link_name = "rust_io_uring_cq_ready"] + pub fn io_uring_cq_ready(ring: *mut io_uring) -> libc::c_uint; + + #[link_name = "rust_io_uring_wait_cqe_nr"] + pub fn io_uring_wait_cqe_nr( + ring: *mut io_uring, + cqe_ptr: *mut *mut io_uring_cqe, + wait_nr: libc::c_uint, + ) -> libc::c_int; + + #[link_name = "rust_io_uring_cq_eventfd_enabled"] + pub fn io_uring_cq_eventfd_enabled(ring: *mut io_uring) -> bool; + + #[link_name = "rust_io_uring_cq_eventfd_toggle"] + pub fn io_uring_cq_eventfd_toggle(ring: *mut io_uring, enabled: bool) -> libc::c_int; + + #[link_name = "rust_io_uring_peek_cqe"] + pub fn io_uring_peek_cqe(ring: *mut io_uring, cqe_ptr: *mut *mut io_uring_cqe) -> libc::c_int; + + #[link_name = "rust_io_uring_wait_cqe"] + pub fn io_uring_wait_cqe(ring: *mut io_uring, cqe_ptr: *mut *mut io_uring_cqe) -> libc::c_int; +} diff --git a/glommio/src/uring_sys/syscalls.rs b/glommio/src/uring_sys/syscalls.rs new file mode 100644 index 000000000..292e3c7b3 --- /dev/null +++ b/glommio/src/uring_sys/syscalls.rs @@ -0,0 +1,40 @@ +use super::io_uring_params; + +// syscall constants +#[allow(non_upper_case_globals)] +const __NR_io_uring_setup: libc::c_long = 425; +#[allow(non_upper_case_globals)] +const __NR_io_uring_enter: libc::c_long = 426; +#[allow(non_upper_case_globals)] +const __NR_io_uring_register: libc::c_long = 427; + +pub unsafe fn io_uring_register( + fd: libc::c_int, + opcode: libc::c_uint, + arg: *const libc::c_void, + nr_args: libc::c_uint, +) -> libc::c_int { + libc::syscall(__NR_io_uring_register, fd, opcode, arg, nr_args) as libc::c_int +} + +pub unsafe fn io_uring_setup(entries: libc::c_uint, p: *mut io_uring_params) -> libc::c_int { + libc::syscall(__NR_io_uring_setup,
entries, p) as libc::c_int +} + +pub unsafe fn io_uring_enter( + fd: libc::c_int, + to_submit: libc::c_uint, + min_complete: libc::c_uint, + flags: libc::c_uint, + sig: *const libc::sigset_t, +) -> libc::c_int { + libc::syscall( + __NR_io_uring_enter, + fd, + to_submit, + min_complete, + flags, + sig, + core::mem::size_of::<libc::sigset_t>(), + ) as libc::c_int +}
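The raw syscall wrappers above are enough to probe for io_uring support before building a full ring. A minimal sketch, illustrative only and not part of the patch: it assumes it lives inside this crate (the `uring_sys` module is private), and the `io_uring_supported` helper name is made up for the example.

```rust
use std::{io, mem};

// Probe whether the running kernel supports io_uring by issuing the raw setup
// syscall and immediately closing the returned ring fd.
fn io_uring_supported() -> io::Result<bool> {
    // All-zero params request default ring behaviour.
    let mut params: crate::uring_sys::io_uring_params = unsafe { mem::zeroed() };
    let fd = unsafe { crate::uring_sys::syscalls::io_uring_setup(4, &mut params) };
    if fd >= 0 {
        // The ring fd is an ordinary file descriptor; close it once probed.
        unsafe { libc::close(fd) };
        return Ok(true);
    }
    let err = io::Error::last_os_error();
    match err.raw_os_error() {
        Some(libc::ENOSYS) => Ok(false), // kernel predates io_uring
        _ => Err(err),
    }
}
```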