Skip to content

Commit

Permalink
new uring code.
Browse files Browse the repository at this point in the history
This prepares us for the release of a new version of liburing.
iou and uring-sys changed a bit, in particular iou. Most of the
changes stem from the fact that we now can (should) use the nix
versions of things like MsgFlags. The registrar and sqe apis
are also marginally different and are updated

Note that this patch, on itself, will not compile as it needs
matching iou and uring-sys. The next patch will fix that as it brings
those dependencies inside glommio.
  • Loading branch information
Glauber Costa committed Mar 2, 2021
1 parent 454a46c commit 7a1607f
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 79 deletions.
4 changes: 2 additions & 2 deletions glommio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ concurrent-queue = "1.1.2"
futures-lite = "1.11.1"
libc = "0.2.73"
socket2 = { version = "0.3.18", features = ["unix", "reuseport"] }
iou = { git = "https://github.com/glommer/iou", tag = "glommio-2020-11-30" }
uring-sys = { git = "https://github.com/glommer/uring-sys", tag = "scipio-2020-09-10" }
iou = { path = "../../iou" }
uring-sys = { path = "../../uring-sys" }
nix = "0.19.0"
bitmaps = "2.1.0"
typenum = "1.12"
Expand Down
15 changes: 8 additions & 7 deletions glommio/src/net/datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//
use crate::sys::{self, DmaBuffer, Source, SourceType};
use crate::{ByteSliceMutExt, Local, Reactor};
use nix::sys::socket::MsgFlags;
use std::cell::Cell;
use std::io;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
Expand Down Expand Up @@ -74,7 +75,7 @@ impl<S: AsRawFd + FromRawFd + From<socket2::Socket>> GlommioDatagram<S> {
let source = self.reactor.upgrade().unwrap().recv(
self.socket.as_raw_fd(),
buf.len(),
iou::MsgFlags::MSG_PEEK,
MsgFlags::MSG_PEEK,
);

self.consume_receive_buffer(&source, buf).await
Expand All @@ -84,9 +85,9 @@ impl<S: AsRawFd + FromRawFd + From<socket2::Socket>> GlommioDatagram<S> {
&self,
buf: &mut [u8],
) -> io::Result<(usize, nix::sys::socket::SockAddr)> {
match self.yolo_recvmsg(buf, iou::MsgFlags::MSG_PEEK) {
match self.yolo_recvmsg(buf, MsgFlags::MSG_PEEK) {
Some(res) => res,
None => self.recv_from_blocking(buf, iou::MsgFlags::MSG_PEEK).await,
None => self.recv_from_blocking(buf, MsgFlags::MSG_PEEK).await,
}
}

Expand All @@ -107,7 +108,7 @@ impl<S: AsRawFd + FromRawFd + From<socket2::Socket>> GlommioDatagram<S> {
pub(crate) async fn recv_from_blocking(
&self,
buf: &mut [u8],
flags: iou::MsgFlags,
flags: MsgFlags,
) -> io::Result<(usize, nix::sys::socket::SockAddr)> {
let source = self.reactor.upgrade().unwrap().rushed_recvmsg(
self.socket.as_raw_fd(),
Expand All @@ -132,9 +133,9 @@ impl<S: AsRawFd + FromRawFd + From<socket2::Socket>> GlommioDatagram<S> {
&self,
buf: &mut [u8],
) -> io::Result<(usize, nix::sys::socket::SockAddr)> {
match self.yolo_recvmsg(buf, iou::MsgFlags::empty()) {
match self.yolo_recvmsg(buf, MsgFlags::empty()) {
Some(res) => res,
None => self.recv_from_blocking(buf, iou::MsgFlags::empty()).await,
None => self.recv_from_blocking(buf, MsgFlags::empty()).await,
}
}

Expand Down Expand Up @@ -203,7 +204,7 @@ impl<S: AsRawFd + FromRawFd + From<socket2::Socket>> GlommioDatagram<S> {
fn yolo_recvmsg(
&self,
buf: &mut [u8],
flags: iou::MsgFlags,
flags: MsgFlags,
) -> Option<io::Result<(usize, nix::sys::socket::SockAddr)>> {
if self.rx_yolo.get() {
super::yolo_recvmsg(self.socket.as_raw_fd(), buf, flags)
Expand Down
16 changes: 6 additions & 10 deletions glommio/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//
//! This module provide glommio's networking support.
use crate::sys;
use nix::sys::socket::MsgFlags;
use std::io;
use std::os::unix::io::RawFd;

Expand All @@ -28,12 +29,7 @@ fn yolo_accept(fd: RawFd) -> Option<io::Result<RawFd>> {
}

fn yolo_send(fd: RawFd, buf: &[u8]) -> Option<io::Result<usize>> {
match sys::send_syscall(
fd,
buf.as_ptr(),
buf.len(),
iou::MsgFlags::MSG_DONTWAIT.bits(),
) {
match sys::send_syscall(fd, buf.as_ptr(), buf.len(), MsgFlags::MSG_DONTWAIT.bits()) {
Ok(x) => Some(Ok(x)),
Err(err) => match err.kind() {
io::ErrorKind::WouldBlock => None,
Expand All @@ -47,7 +43,7 @@ fn yolo_recv(fd: RawFd, buf: &mut [u8]) -> Option<io::Result<usize>> {
fd,
buf.as_mut_ptr(),
buf.len(),
iou::MsgFlags::MSG_DONTWAIT.bits(),
MsgFlags::MSG_DONTWAIT.bits(),
) {
Ok(x) => Some(Ok(x)),
Err(err) => match err.kind() {
Expand All @@ -60,13 +56,13 @@ fn yolo_recv(fd: RawFd, buf: &mut [u8]) -> Option<io::Result<usize>> {
fn yolo_recvmsg(
fd: RawFd,
buf: &mut [u8],
flags: iou::MsgFlags,
flags: MsgFlags,
) -> Option<io::Result<(usize, nix::sys::socket::SockAddr)>> {
match sys::recvmsg_syscall(
fd,
buf.as_mut_ptr(),
buf.len(),
(flags | iou::MsgFlags::MSG_DONTWAIT).bits(),
(flags | MsgFlags::MSG_DONTWAIT).bits(),
) {
Ok(x) => Some(Ok(x)),
Err(err) => match err.kind() {
Expand All @@ -86,7 +82,7 @@ fn yolo_sendmsg(
buf.as_ptr(),
buf.len(),
addr,
iou::MsgFlags::MSG_DONTWAIT.bits(),
MsgFlags::MSG_DONTWAIT.bits(),
) {
Ok(x) => Some(Ok(x)),
Err(err) => match err.kind() {
Expand Down
3 changes: 2 additions & 1 deletion glommio/src/net/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::parking::Reactor;
use crate::sys::{self, DmaBuffer, Source, SourceType};
use crate::{ByteSliceMutExt, Local};
use futures_lite::ready;
use nix::sys::socket::MsgFlags;
use std::convert::TryFrom;
use std::io;
use std::net::Shutdown;
Expand Down Expand Up @@ -97,7 +98,7 @@ impl<S: FromRawFd + AsRawFd + From<socket2::Socket>> GlommioStream<S> {
let source = self.reactor.upgrade().unwrap().recv(
self.stream.as_raw_fd(),
buf.len(),
iou::MsgFlags::MSG_PEEK,
MsgFlags::MSG_PEEK,
);

let sz = source.collect_rw().await?;
Expand Down
2 changes: 1 addition & 1 deletion glommio/src/net/tcp_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use futures_lite::future::poll_fn;
use futures_lite::io::{AsyncBufRead, AsyncRead, AsyncWrite};
use futures_lite::ready;
use futures_lite::stream::{self, Stream};
use iou::{InetAddr, SockAddr};
use nix::sys::socket::{InetAddr, SockAddr};
use pin_project_lite::pin_project;
use socket2::{Domain, Protocol, Socket, Type};
use std::io;
Expand Down
9 changes: 5 additions & 4 deletions glommio/src/net/udp_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2020 Datadog, Inc.
//
use super::datagram::GlommioDatagram;
use iou::{InetAddr, SockAddr};
use nix::sys::socket::{InetAddr, SockAddr};
use socket2::{Domain, Protocol, Socket, Type};
use std::io;
use std::net::{self, SocketAddr, ToSocketAddrs};
Expand Down Expand Up @@ -329,6 +329,7 @@ mod tests {
use crate::timer::Timer;
use crate::Local;
use crate::LocalExecutorBuilder;
use nix::sys::socket::MsgFlags;
use std::time::Duration;

macro_rules! connected_pair {
Expand Down Expand Up @@ -548,14 +549,14 @@ mod tests {
for _ in 0..10 {
let (sz, _) = receiver
.socket
.recv_from_blocking(&mut buf, iou::MsgFlags::MSG_PEEK)
.recv_from_blocking(&mut buf, MsgFlags::MSG_PEEK)
.await
.unwrap();
assert_eq!(sz, 1);
}
let (_, from) = receiver
.socket
.recv_from_blocking(&mut buf, iou::MsgFlags::MSG_PEEK)
.recv_from_blocking(&mut buf, MsgFlags::MSG_PEEK)
.await
.unwrap();
let addr = match from {
Expand Down Expand Up @@ -605,7 +606,7 @@ mod tests {
let mut buf = [0u8; 1];
let (sz, from) = receiver
.socket
.recv_from_blocking(&mut buf, iou::MsgFlags::empty())
.recv_from_blocking(&mut buf, MsgFlags::empty())
.await
.unwrap();
assert_eq!(sz, 1);
Expand Down
5 changes: 2 additions & 3 deletions glommio/src/net/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ use futures_lite::future::poll_fn;
use futures_lite::io::{AsyncBufRead, AsyncRead, AsyncWrite};
use futures_lite::ready;
use futures_lite::stream::{self, Stream};
use iou::SockAddr;
use nix::sys::socket::UnixAddr;
use nix::sys::socket::{SockAddr, UnixAddr};
use pin_project_lite::pin_project;
use socket2::{Domain, Socket, Type};
use std::io;
Expand Down Expand Up @@ -520,7 +519,7 @@ impl UnixDatagram {
/// [`send`]: UnixDatagram::send
/// [`recv`]: UnixDatagram::recv
pub async fn connect<A: AsRef<Path>>(&self, addr: A) -> Result<()> {
let addr = iou::SockAddr::new_unix(addr.as_ref())
let addr = SockAddr::new_unix(addr.as_ref())
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

let reactor = self.socket.reactor.upgrade().unwrap();
Expand Down
13 changes: 7 additions & 6 deletions glommio/src/parking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
//!

use ahash::AHashMap;
use iou::{SockAddr, SockAddrStorage};
use iou::sqe::SockAddrStorage;
use nix::sys::socket::{MsgFlags, SockAddr};
use std::cell::{Cell, RefCell};
use std::collections::{BTreeMap, VecDeque};
use std::ffi::CString;
Expand Down Expand Up @@ -363,7 +364,7 @@ impl Reactor {

pub(crate) fn rushed_send(&self, fd: RawFd, buf: DmaBuffer) -> io::Result<Source> {
let source = self.new_source(fd, SourceType::SockSend(buf));
self.sys.send(&source, iou::MsgFlags::empty());
self.sys.send(&source, MsgFlags::empty());
self.rush_dispatch(&source)?;
Ok(source)
}
Expand All @@ -390,7 +391,7 @@ impl Reactor {
};

let source = self.new_source(fd, SourceType::SockSendMsg(buf, iov, hdr, addr));
self.sys.sendmsg(&source, iou::MsgFlags::empty());
self.sys.sendmsg(&source, MsgFlags::empty());
self.rush_dispatch(&source)?;
Ok(source)
}
Expand All @@ -399,7 +400,7 @@ impl Reactor {
&self,
fd: RawFd,
size: usize,
flags: iou::MsgFlags,
flags: MsgFlags,
) -> io::Result<Source> {
let hdr = libc::msghdr {
msg_name: std::ptr::null_mut(),
Expand Down Expand Up @@ -430,12 +431,12 @@ impl Reactor {

pub(crate) fn rushed_recv(&self, fd: RawFd, size: usize) -> io::Result<Source> {
let source = self.new_source(fd, SourceType::SockRecv(None));
self.sys.recv(&source, size, iou::MsgFlags::empty());
self.sys.recv(&source, size, MsgFlags::empty());
self.rush_dispatch(&source)?;
Ok(source)
}

pub(crate) fn recv(&self, fd: RawFd, size: usize, flags: iou::MsgFlags) -> Source {
pub(crate) fn recv(&self, fd: RawFd, size: usize, flags: MsgFlags) -> Source {
let source = self.new_source(fd, SourceType::SockRecv(None));
self.sys.recv(&source, size, flags);
source
Expand Down
2 changes: 1 addition & 1 deletion glommio/src/sys/dma_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl DmaBuffer {
}
}

pub(crate) fn uring_buffer_id(&self) -> Option<usize> {
pub(crate) fn uring_buffer_id(&self) -> Option<u32> {
match &self.storage {
BufferStorage::Uring(x) => x.uring_buffer_id(),
_ => None,
Expand Down
3 changes: 2 additions & 1 deletion glommio/src/sys/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2020 Datadog, Inc.
//
use ahash::AHashMap;
use iou::{SockAddr, SockAddrStorage};
use iou::sqe::SockAddrStorage;
use nix::sys::socket::SockAddr;
use std::cell::{Cell, RefCell};
use std::convert::TryFrom;
use std::ffi::CString;
Expand Down
Loading

0 comments on commit 7a1607f

Please sign in to comment.