Skip to content

Commit

Permalink
Finalize buffer interface
Browse files Browse the repository at this point in the history
  • Loading branch information
DoumanAsh committed Jun 1, 2024
1 parent cb63beb commit 9af9432
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 15 deletions.
127 changes: 112 additions & 15 deletions src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,102 @@ use crate::options::{Options, Property};
use core::pin::Pin;
use core::ffi::c_int;
use core::future::Future;
use core::{mem, fmt, ops, ptr, task};
use core::{mem, fmt, ops, ptr, task, marker, slice};

use alloc::vec::Vec;

type InitFn = unsafe extern "C" fn(msg: *mut sys::nng_socket) -> core::ffi::c_int;

///Wrapper over slice of bytes.
///
///Can be converted into from any byte slice
pub struct Buf<'a> {
ptr: *const u8,
size: usize,
_lifetime: marker::PhantomData<&'a u8>,
}

impl<'a> Buf<'a> {
#[inline]
const fn new(ptr: *const u8, size: usize) -> Self {
Self {
ptr,
size,
_lifetime: marker::PhantomData,
}
}
}

impl<'a> From<&'a [u8]> for Buf<'a> {
#[inline(always)]
fn from(value: &'a [u8]) -> Self {
Self::new(value.as_ptr(), value.len())
}
}

impl<'a, const N: usize> From<&'a [u8; N]> for Buf<'a> {
#[inline(always)]
fn from(value: &'a [u8; N]) -> Self {
Self::new(value.as_ptr(), value.len())
}
}

impl<'a> From<&'a [mem::MaybeUninit<u8>]> for Buf<'a> {
#[inline(always)]
fn from(value: &'a [mem::MaybeUninit<u8>]) -> Self {
Self::new(value.as_ptr() as _, value.len())
}
}

///Wrapper over mutable slice of bytes.
///
///Can be converted into from any mutable byte slice or mutable Vec
pub struct BufMut<'a> {
ptr: *mut u8,
size: usize,
_lifetime: marker::PhantomData<&'a u8>,
}

impl<'a> BufMut<'a> {
#[inline]
const fn new(ptr: *mut u8, size: usize) -> Self {
Self {
ptr,
size,
_lifetime: marker::PhantomData,
}
}
}

impl<'a> From<&'a mut Vec<u8>> for BufMut<'a> {
#[inline(always)]
fn from(value: &'a mut Vec<u8>) -> Self {
let value = value.spare_capacity_mut();
From::from(value)
}
}

impl<'a> From<&'a mut [u8]> for BufMut<'a> {
#[inline(always)]
fn from(value: &'a mut [u8]) -> Self {
Self::new(value.as_mut_ptr(), value.len())
}
}

impl<'a, const N: usize> From<&'a mut [u8; N]> for BufMut<'a> {
#[inline(always)]
fn from(value: &'a mut [u8; N]) -> Self {
Self::new(value.as_mut_ptr(), value.len())
}
}

impl<'a> From<&'a mut [mem::MaybeUninit<u8>]> for BufMut<'a> {
#[inline(always)]
fn from(value: &'a mut [mem::MaybeUninit<u8>]) -> Self {
Self::new(value.as_mut_ptr() as _, value.len())
}
}

#[derive(Clone, Default)]
///Connect options
pub struct ConnectOptions<T> {
Expand Down Expand Up @@ -176,40 +268,45 @@ impl Socket {
T::get(self)
}

fn recv_inner<const FLAGS: c_int>(&self, out: &mut [mem::MaybeUninit<u8>]) -> Result<usize, ErrorCode> {
let mut size = out.len();
fn recv_inner<'a, const FLAGS: c_int>(&self, out: BufMut<'a>) -> Result<&'a [u8], ErrorCode> {
let mut size = out.size;
let result = unsafe {
sys::nng_recv(**self, out.as_mut_ptr() as _, &mut size, FLAGS)
sys::nng_recv(**self, out.ptr as _, &mut size, FLAGS)
};

match result {
0 => Ok(size),
0 => {
let out = unsafe {
slice::from_raw_parts(out.ptr, size)
};
Ok(out)
},
code => Err(error(code)),
}
}

#[inline]
#[inline(always)]
///Attempts to receive message, writing it in `out` buffer if it is of sufficient size,
///returning immediately if no message is available
///
///If underlying protocol doesn't support receiving messages, this shall return error always
///
///Returns number of bytes written on success
///Returns written bytes on success
///
///Returns [would block](https://docs.rs/error-code/3.2.0/error_code/struct.ErrorCode.html#method.is_would_block)
///error if no message is available.
pub fn try_recv(&self, out: &mut [mem::MaybeUninit<u8>]) -> Result<usize, ErrorCode> {
self.recv_inner::<{sys::NNG_FLAG_NONBLOCK}>(out)
pub fn try_recv<'a>(&self, out: impl Into<BufMut<'a>>) -> Result<&'a [u8], ErrorCode> {
self.recv_inner::<{sys::NNG_FLAG_NONBLOCK}>(out.into())
}

#[inline]
#[inline(always)]
///Receives message, writing it in `out` buffer if it is of sufficient size, waiting forever if none is available.
///
///If underlying protocol doesn't support receiving messages, this shall return error always
///
///Returns number of bytes written on success
pub fn recv(&self, out: &mut [mem::MaybeUninit<u8>]) -> Result<usize, ErrorCode> {
self.recv_inner::<0>(out)
///Returns written bytes on success
pub fn recv<'a>(&self, out: impl Into<BufMut<'a>>) -> Result<&'a [u8], ErrorCode> {
self.recv_inner::<0>(out.into())
}

///Receives pending message, waiting forever if none is available.
Expand Down Expand Up @@ -259,9 +356,9 @@ impl Socket {
///Encodes bytes into message and send it over the socket.
///
///Internally message shall be encoded and sent over
pub fn send(&self, msg: &[u8]) -> Result<(), ErrorCode> {
pub fn send(&self, msg: Buf<'_>) -> Result<(), ErrorCode> {
let result = unsafe {
sys::nng_send(**self, msg.as_ptr() as _, msg.len(), 0)
sys::nng_send(**self, msg.ptr as _, msg.size, 0)
};

match result {
Expand Down
24 changes: 24 additions & 0 deletions tests/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ fn should_do_req_resp_inproc() {
assert_eq!(second, SECOND);
assert_eq!(third, THIRD);
assert_eq!(resp.body(), BYTES);

let mut buffer = [0; BYTES.len()];
server.try_recv(&mut buffer).expect_err("Should not have any more message");
client.send(BYTES.into()).expect("send bytes");
let result = server.recv(&mut buffer).expect("to receive data");
assert_eq!(result, BYTES);
}

#[test]
Expand Down Expand Up @@ -136,6 +142,12 @@ fn should_do_req_resp_async_inproc() {
assert_eq!(second, SECOND);
assert_eq!(third, THIRD);
assert_eq!(resp.body(), BYTES);

let mut buffer = [0; BYTES.len()];
server.try_recv(&mut buffer).expect_err("Should not have any more message");
client.send(BYTES.into()).expect("send bytes");
let result = server.recv(&mut buffer).expect("to receive data");
assert_eq!(result, BYTES);
}

#[test]
Expand Down Expand Up @@ -202,6 +214,12 @@ fn should_do_req_resp_async_tcp() {
assert_eq!(second, SECOND);
assert_eq!(third, THIRD);
assert_eq!(resp.body(), BYTES);

let mut buffer = [0; BYTES.len()];
server.try_recv(&mut buffer).expect_err("Should not have any more message");
client.send(BYTES.into()).expect("send bytes");
let result = server.recv(&mut buffer).expect("to receive data");
assert_eq!(result, BYTES);
}

#[test]
Expand Down Expand Up @@ -263,4 +281,10 @@ fn should_do_req_resp_async_ipc() {
assert_eq!(second, SECOND);
assert_eq!(third, THIRD);
assert_eq!(resp.body(), BYTES);

let mut buffer = [0; BYTES.len()];
server.try_recv(&mut buffer).expect_err("Should not have any more message");
client.send(BYTES.into()).expect("send bytes");
let result = server.recv(&mut buffer).expect("to receive data");
assert_eq!(result, BYTES);
}

0 comments on commit 9af9432

Please sign in to comment.