Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Pipes in nanonext #65

Merged
merged 4 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: nanonext
Type: Package
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
Version: 1.3.2.9005
Version: 1.3.2.9006
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
a socket library implementing 'Scalability Protocols', a reliable,
high-performance standard for common communications patterns including
Expand Down
3 changes: 0 additions & 3 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ S3method("[[<-",sendAio)
S3method(close,nanoContext)
S3method(close,nanoDialer)
S3method(close,nanoListener)
S3method(close,nanoPipe)
S3method(close,nanoSocket)
S3method(close,nanoStream)
S3method(close,ncurlSession)
Expand All @@ -24,7 +23,6 @@ S3method(print,nanoContext)
S3method(print,nanoDialer)
S3method(print,nanoListener)
S3method(print,nanoObject)
S3method(print,nanoPipe)
S3method(print,nanoSocket)
S3method(print,nanoStream)
S3method(print,ncurlAio)
Expand Down Expand Up @@ -92,7 +90,6 @@ export(send)
export(send_aio)
export(serial_config)
export(socket)
export(socket_pipe)
export(stat)
export(status_code)
export(stop_aio)
Expand Down
11 changes: 6 additions & 5 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
# nanonext 1.3.2.9005 (development)
# nanonext 1.3.2.9006 (development)

#### New Features

* `opt()` gains the ability to retrieve options from a Pipe.
* A 'recvAio' now records the integer pipe ID, where successful, at `$aio` upon resolution.
* `socket_pipe()` creates a 'nanoPipe' object from an integer pipe ID.
* New interface to Pipes moves to using integer pipe IDs rather than Pipe (external pointer) objects:
+ `send_aio()` gains the argument 'pipe' which accepts an integer pipe ID for directed sends (currently only supported by Sockets using the 'poly' protocol).
+ A 'recvAio' now records the integer pipe ID, where successful, at `$aio` upon resolution.
+ Pipe objects (of class 'nanoPipe') are obsoleted.

#### Updates

* `collect_aio()` is superseded by `socket_pipe()` and is removed.
* `collect_aio()` is removed given the pipe interface changes.

# nanonext 1.3.2

Expand Down
8 changes: 5 additions & 3 deletions R/aio.R
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@
#' Alternatively, to stop the async operation, use \code{\link{stop_aio}}.
#'
#' @inheritParams send
#' @param con a Socket, Context, Stream or Pipe.
#' @param con a Socket, Context or Stream.
#' @param timeout [default NULL] integer value in milliseconds or NULL, which
#' applies a socket-specific default, usually the same as no timeout.
#' @param pipe [default 0L] only applicable to Sockets using the 'poly' protocol,
#' an integer pipe ID if directing the send via a specific pipe.
#'
#' @return A \sQuote{sendAio} (object of class \sQuote{sendAio}) (invisibly).
#'
Expand All @@ -57,8 +59,8 @@
#'
#' @export
#'
send_aio <- function(con, data, mode = c("serial", "raw"), timeout = NULL)
data <- .Call(rnng_send_aio, con, data, mode, timeout, environment())
send_aio <- function(con, data, mode = c("serial", "raw"), timeout = NULL, pipe = 0L)
data <- .Call(rnng_send_aio, con, data, mode, timeout, pipe, environment())

#' Receive Async
#'
Expand Down
9 changes: 0 additions & 9 deletions R/nano.R
Original file line number Diff line number Diff line change
Expand Up @@ -326,15 +326,6 @@ print.nanoStream <- function(x, ...) {

}

#' @export
#'
print.nanoPipe <- function(x, ...) {

cat(sprintf("< nanoPipe >\n - id: %s\n", attr(x, "id")), file = stdout())
invisible(x)

}

#' @export
#'
print.recvAio <- function(x, ...) {
Expand Down
47 changes: 1 addition & 46 deletions R/socket.R
Original file line number Diff line number Diff line change
Expand Up @@ -115,41 +115,6 @@ socket <- function(protocol = c("bus", "pair", "poly", "push", "pull", "pub",
raw = FALSE)
.Call(rnng_protocol_open, protocol, dial, listen, tls, autostart, raw)

#' Create a Pipe from a Pipe ID
#'
#' This function creates a Pipe from the pipe identifier recorded at $aio in a
#' resolved Aio, for use with compatible functions such as \code{\link{send_aio}}.
#' A Pipe is a low-level object and it is not normally necessary to deal with
#' them directly.
#'
#' As Pipes are always owned by a Socket, removing (and garbage collecting) a
#' Pipe does not close it or free its resources. A Pipe may, however, be
#' explicitly closed.
#'
#' @param x integer pipe ID.
#'
#' @return A Pipe (object of class \sQuote{nanoPipe}).
#'
#' @examples
#' s <- socket("rep", listen = "inproc://nanonext")
#' s1 <- socket("req", dial = "inproc://nanonext")
#'
#' r <- recv_aio(s, timeout = 500)
#'
#' if (!send(s1, "")) {
#' call_aio(r)
#' p <- socket_pipe(r$aio)
#' print(p)
#' reap(p)
#' }
#'
#' close(s)
#' close(s1)
#'
#' @export
#'
socket_pipe <- function(x) .Call(rnng_socket_pipe, x)

#' Close Connection
#'
#' Close Connection on a Socket, Context, Dialer, Listener, Stream, Pipe, or
Expand All @@ -175,11 +140,7 @@ socket_pipe <- function(x) .Call(rnng_socket_pipe, x)
#'
#' Closing an \sQuote{ncurlSession} closes the http(s) connection.
#'
#' As Pipes are owned by the corresponding Socket, removing (and garbage
#' collecting) a Pipe does not close it or free its resources. A Pipe may,
#' however, be explicitly closed.
#'
#' @param con a Socket, Context, Dialer, Listener, Stream, Pipe, or
#' @param con a Socket, Context, Dialer, Listener, Stream, or
#' \sQuote{ncurlSession}.
#' @param ... not used.
#'
Expand All @@ -198,12 +159,6 @@ NULL
#'
close.nanoSocket <- function(con, ...) invisible(.Call(rnng_close, con))

#' @rdname close
#' @method close nanoPipe
#' @export
#'
close.nanoPipe <- function(con, ...) invisible(.Call(rnng_pipe_close, con))

#' Reap
#'
#' An alternative to \code{close} for Sockets, Contexts, Listeners, Dialers and
Expand Down
3 changes: 1 addition & 2 deletions R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,7 @@ parse_url <- function(url) .Call(rnng_url_parse, url)
#' \sQuote{recvAio}).
#'
#' Is the object an object inheriting from class \sQuote{nano} i.e. a
#' nanoSocket, nanoContext, nanoStream, nanoListener, nanoDialer, nanoPipe or
#' nano Object.
#' nanoSocket, nanoContext, nanoStream, nanoListener, nanoDialer or nano Object.
#'
#' Is the object an ncurlSession (object of class \sQuote{ncurlSession}).
#'
Expand Down
9 changes: 1 addition & 8 deletions man/close.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions man/is_aio.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions man/send_aio.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 0 additions & 42 deletions man/socket_pipe.Rd

This file was deleted.

43 changes: 11 additions & 32 deletions src/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ static void raio_complete(void *arg) {
if (res == 0) {
nng_msg *msg = nng_aio_get_msg(raio->aio);
raio->data = msg;
res = -nng_msg_get_pipe(msg).id;
nng_pipe p = nng_msg_get_pipe(msg);
res = - (int) p.id;
}

raio->result = res;
Expand Down Expand Up @@ -419,9 +420,10 @@ SEXP rnng_unresolved2(SEXP x) {

// send recv aio functions -----------------------------------------------------

SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP clo) {
SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP pipe, SEXP clo) {

const nng_duration dur = timeout == R_NilValue ? NNG_DURATION_DEFAULT : (nng_duration) nano_integer(timeout);
const int pipeid = nano_integer(pipe);
nano_aio *saio;
SEXP aio, env, fun;
nano_buf buf;
Expand All @@ -444,6 +446,11 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP clo) {
goto exitlevel1;
}

if (pipeid) {
nng_pipe p;
p.id = (uint32_t) pipeid;
nng_msg_set_pipe(msg, p);
}
nng_aio_set_msg(saio->aio, msg);
nng_aio_set_timeout(saio->aio, dur);
sock ? nng_send_aio(*(nng_socket *) NANO_PTR(con), saio->aio) :
Expand Down Expand Up @@ -481,36 +488,8 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP clo) {
PROTECT(aio = R_MakeExternalPtr(saio, nano_AioSymbol, R_NilValue));
R_RegisterCFinalizerEx(aio, iaio_finalizer, TRUE);

} else if (ptrtag == nano_PipeSymbol) {

nng_pipe *p = (nng_pipe *) NANO_PTR(con);
nng_socket sock = nng_pipe_socket(*p);

nano_encodes(mode) == 2 ? nano_encode(&buf, data) : nano_serialize(&buf, data, NANO_PROT(con));
nng_msg *msg;
saio = R_Calloc(1, nano_aio);
saio->type = SENDAIO;

if ((xc = nng_msg_alloc(&msg, 0)))
goto exitlevel1;

if ((xc = nng_msg_append(msg, buf.buf, buf.cur)) ||
(xc = nng_aio_alloc(&saio->aio, saio_complete, saio))) {
nng_msg_free(msg);
goto exitlevel1;
}

nng_msg_set_pipe(msg, *p);
nng_aio_set_msg(saio->aio, msg);
nng_aio_set_timeout(saio->aio, dur);
nng_send_aio(sock, saio->aio);
NANO_FREE(buf);

PROTECT(aio = R_MakeExternalPtr(saio, nano_AioSymbol, R_NilValue));
R_RegisterCFinalizerEx(aio, saio_finalizer, TRUE);

} else {
NANO_ERROR("'con' is not a valid Socket, Context, Stream or Pipe");
} else {
NANO_ERROR("'con' is not a valid Socket, Context, or Stream");
}

PROTECT(env = R_NewEnv(R_NilValue, 0, 0));
Expand Down
3 changes: 2 additions & 1 deletion src/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ void raio_complete_signal(void *arg) {
if (res == 0) {
nng_msg *msg = nng_aio_get_msg(raio->aio);
raio->data = msg;
res = -nng_msg_get_pipe(msg).id;
nng_pipe p = nng_msg_get_pipe(msg);
res = - (int) p.id;
}

nng_mtx_lock(mtx);
Expand Down
4 changes: 1 addition & 3 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ static const R_CallMethodDef callMethods[] = {
{"rnng_ncurl_session", (DL_FUNC) &rnng_ncurl_session, 8},
{"rnng_ncurl_session_close", (DL_FUNC) &rnng_ncurl_session_close, 1},
{"rnng_ncurl_transact", (DL_FUNC) &rnng_ncurl_transact, 1},
{"rnng_pipe_close", (DL_FUNC) &rnng_pipe_close, 1},
{"rnng_pipe_notify", (DL_FUNC) &rnng_pipe_notify, 6},
{"rnng_protocol_open", (DL_FUNC) &rnng_protocol_open, 6},
{"rnng_random", (DL_FUNC) &rnng_random, 2},
Expand All @@ -175,15 +174,14 @@ static const R_CallMethodDef callMethods[] = {
{"rnng_recv_aio", (DL_FUNC) &rnng_recv_aio, 6},
{"rnng_request", (DL_FUNC) &rnng_request, 7},
{"rnng_send", (DL_FUNC) &rnng_send, 4},
{"rnng_send_aio", (DL_FUNC) &rnng_send_aio, 5},
{"rnng_send_aio", (DL_FUNC) &rnng_send_aio, 6},
{"rnng_serial_config", (DL_FUNC) &rnng_serial_config, 4},
{"rnng_set_marker", (DL_FUNC) &rnng_set_marker, 0},
{"rnng_set_opt", (DL_FUNC) &rnng_set_opt, 3},
{"rnng_set_promise_context", (DL_FUNC) &rnng_set_promise_context, 2},
{"rnng_signal_thread_create", (DL_FUNC) &rnng_signal_thread_create, 2},
{"rnng_sleep", (DL_FUNC) &rnng_sleep, 1},
{"rnng_socket_lock", (DL_FUNC) &rnng_socket_lock, 2},
{"rnng_socket_pipe", (DL_FUNC) &rnng_socket_pipe, 1},
{"rnng_socket_unlock", (DL_FUNC) &rnng_socket_unlock, 1},
{"rnng_stats_get", (DL_FUNC) &rnng_stats_get, 2},
{"rnng_status_code", (DL_FUNC) &rnng_status_code, 1},
Expand Down
Loading
Loading