diff --git a/DESCRIPTION b/DESCRIPTION index b3397ddab..2abe37531 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: nanonext Type: Package Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library -Version: 1.3.2.9006 +Version: 1.3.2.9007 Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is a socket library implementing 'Scalability Protocols', a reliable, high-performance standard for common communications patterns including diff --git a/NAMESPACE b/NAMESPACE index da4a86e15..595ba039e 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -22,6 +22,7 @@ S3method(print,errorValue) S3method(print,nanoContext) S3method(print,nanoDialer) S3method(print,nanoListener) +S3method(print,nanoMonitor) S3method(print,nanoObject) S3method(print,nanoSocket) S3method(print,nanoStream) @@ -70,6 +71,7 @@ export(listen) export(lock) export(mclock) export(messenger) +export(monitor) export(msleep) export(nano) export(ncurl) @@ -81,6 +83,7 @@ export(opt) export(parse_url) export(pipe_notify) export(random) +export(read_monitor) export(reap) export(recv) export(recv_aio) diff --git a/NEWS.md b/NEWS.md index 8a33d3261..9b183612d 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,4 +1,4 @@ -# nanonext 1.3.2.9006 (development) +# nanonext 1.3.2.9007 (development) #### New Features @@ -6,6 +6,7 @@ + `send_aio()` gains the argument 'pipe' which accepts an integer pipe ID for directed sends (currently only supported by Sockets using the 'poly' protocol). + A 'recvAio' now records the integer pipe ID, where successful, at `$aio` upon resolution. + Pipe objects (of class 'nanoPipe') are obsoleted. +* Adds `monitor()` and `read_monitor()` for easy monitoring of connection changes (pipe additons and removals) at a Socket. #### Updates diff --git a/R/nano.R b/R/nano.R index 5375bd97b..406acbf8f 100644 --- a/R/nano.R +++ b/R/nano.R @@ -326,6 +326,15 @@ print.nanoStream <- function(x, ...) { } +#' @export +#' +print.nanoMonitor <- function(x, ...) { + + cat(sprintf("< nanoMonitor >\n - socket: %s\n", attr(x, "socket")), file = stdout()) + invisible(x) + +} + #' @export #' print.recvAio <- function(x, ...) { diff --git a/R/socket.R b/R/socket.R index a3d5bcf80..c035bd58e 100644 --- a/R/socket.R +++ b/R/socket.R @@ -189,3 +189,45 @@ close.nanoSocket <- function(con, ...) invisible(.Call(rnng_close, con)) #' @export #' reap <- function(con) .Call(rnng_reap, con) + +#' Monitor a Socket for Pipe Changes +#' +#' This function monitors pipe additions and removals from a socket. +#' +#' @param sock a Socket. +#' @param cv a conditionVariable. +#' +#' @return For \code{monitor}: a Monitor (object of class 'nanoMonitor'). \cr +#' For \code{read_monitor}: an integer vector of pipe IDs (positive if added, +#' negative if removed), or else NULL if there were no changes since the +#' previous read. +#' +#' @examples +#' cv <- cv() +#' s <- socket("poly") +#' s1 <- socket("poly") +#' +#' m <- monitor(s, cv) +#' m +#' +#' listen(s) +#' dial(s1) +#' +#' cv_value(cv) +#' read_monitor(m) +#' +#' close(s) +#' close(s1) +#' +#' read_monitor(m) +#' +#' @export +#' +monitor <- function(sock, cv) .Call(rnng_monitor_create, sock, cv) + +#' @param x an external pointer to a monitor. +#' +#' @rdname monitor +#' @export +#' +read_monitor <- function(x) .Call(rnng_monitor_read, x) diff --git a/R/utils.R b/R/utils.R index 2336ed61b..6882e1081 100644 --- a/R/utils.R +++ b/R/utils.R @@ -167,7 +167,8 @@ parse_url <- function(url) .Call(rnng_url_parse, url) #' \sQuote{recvAio}). #' #' Is the object an object inheriting from class \sQuote{nano} i.e. a -#' nanoSocket, nanoContext, nanoStream, nanoListener, nanoDialer or nano Object. +#' nanoSocket, nanoContext, nanoStream, nanoListener, nanoDialer, nanoMonitor or +#' nano Object. #' #' Is the object an ncurlSession (object of class \sQuote{ncurlSession}). #' diff --git a/man/is_aio.Rd b/man/is_aio.Rd index 43e10d025..9ffde8f3d 100644 --- a/man/is_aio.Rd +++ b/man/is_aio.Rd @@ -26,7 +26,8 @@ Is the object an Aio (inheriting from class \sQuote{sendAio} or \sQuote{recvAio}). Is the object an object inheriting from class \sQuote{nano} i.e. a -nanoSocket, nanoContext, nanoStream, nanoListener, nanoDialer or nano Object. +nanoSocket, nanoContext, nanoStream, nanoListener, nanoDialer, nanoMonitor or +nano Object. Is the object an ncurlSession (object of class \sQuote{ncurlSession}). diff --git a/man/monitor.Rd b/man/monitor.Rd new file mode 100644 index 000000000..487d96b7c --- /dev/null +++ b/man/monitor.Rd @@ -0,0 +1,47 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/socket.R +\name{monitor} +\alias{monitor} +\alias{read_monitor} +\title{Monitor a Socket for Pipe Changes} +\usage{ +monitor(sock, cv) + +read_monitor(x) +} +\arguments{ +\item{sock}{a Socket.} + +\item{cv}{a conditionVariable.} + +\item{x}{an external pointer to a monitor.} +} +\value{ +For \code{monitor}: a Monitor (object of class 'nanoMonitor'). \cr + For \code{read_monitor}: an integer vector of pipe IDs (positive if added, + negative if removed), or else NULL if there were no changes since the + previous read. +} +\description{ +This function monitors pipe additions and removals from a socket. +} +\examples{ +cv <- cv() +s <- socket("poly") +s1 <- socket("poly") + +m <- monitor(s, cv) +m + +listen(s) +dial(s1) + +cv_value(cv) +read_monitor(m) + +close(s) +close(s1) + +read_monitor(m) + +} diff --git a/src/init.c b/src/init.c index cb844446f..ff49a6be2 100644 --- a/src/init.c +++ b/src/init.c @@ -38,7 +38,7 @@ SEXP nano_DotcallSymbol; SEXP nano_HeadersSymbol; SEXP nano_IdSymbol; SEXP nano_ListenerSymbol; -SEXP nano_PipeSymbol; +SEXP nano_MonitorSymbol; SEXP nano_ProtocolSymbol; SEXP nano_ResolveSymbol; SEXP nano_ResponseSymbol; @@ -72,7 +72,7 @@ static void RegisterSymbols(void) { nano_HeadersSymbol = Rf_install("headers"); nano_IdSymbol = Rf_install("id"); nano_ListenerSymbol = Rf_install("listener"); - nano_PipeSymbol = Rf_install("pipe"); + nano_MonitorSymbol = Rf_install("monitor"); nano_ProtocolSymbol = Rf_install("protocol"); nano_ResolveSymbol = Rf_install("resolve"); nano_ResponseSymbol = Rf_install("response"); @@ -160,6 +160,8 @@ static const R_CallMethodDef callMethods[] = { {"rnng_listener_close", (DL_FUNC) &rnng_listener_close, 1}, {"rnng_listener_start", (DL_FUNC) &rnng_listener_start, 1}, {"rnng_messenger", (DL_FUNC) &rnng_messenger, 1}, + {"rnng_monitor_create", (DL_FUNC) &rnng_monitor_create, 2}, + {"rnng_monitor_read", (DL_FUNC) &rnng_monitor_read, 1}, {"rnng_ncurl", (DL_FUNC) &rnng_ncurl, 9}, {"rnng_ncurl_aio", (DL_FUNC) &rnng_ncurl_aio, 9}, {"rnng_ncurl_session", (DL_FUNC) &rnng_ncurl_session, 8}, diff --git a/src/nanonext.h b/src/nanonext.h index 136ccec15..0fe2b8691 100644 --- a/src/nanonext.h +++ b/src/nanonext.h @@ -193,6 +193,13 @@ typedef struct nano_cv_duo_s { nano_cv *cv2; } nano_cv_duo; +typedef struct nano_monitor_s { + nano_cv *cv; + int *ids; + int size; + int updates; +} nano_monitor; + typedef struct nano_thread_aio_s { nng_thread *thr; nano_cv *cv; @@ -243,7 +250,7 @@ extern SEXP nano_DotcallSymbol; extern SEXP nano_HeadersSymbol; extern SEXP nano_IdSymbol; extern SEXP nano_ListenerSymbol; -extern SEXP nano_PipeSymbol; +extern SEXP nano_MonitorSymbol; extern SEXP nano_ProtocolSymbol; extern SEXP nano_ResolveSymbol; extern SEXP nano_ResponseSymbol; @@ -337,6 +344,8 @@ SEXP rnng_listener_close(SEXP); SEXP rnng_listener_start(SEXP); SEXP rnng_messenger(SEXP); SEXP rnng_messenger_thread_create(SEXP); +SEXP rnng_monitor_create(SEXP, SEXP); +SEXP rnng_monitor_read(SEXP); SEXP rnng_ncurl(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rnng_ncurl_aio(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rnng_ncurl_session(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); diff --git a/src/sync.c b/src/sync.c index e4e842561..24fa3e02f 100644 --- a/src/sync.c +++ b/src/sync.c @@ -160,6 +160,31 @@ static void pipe_cb_dropcon(nng_pipe p, nng_pipe_ev ev, void *arg) { } +static void pipe_cb_monitor(nng_pipe p, nng_pipe_ev ev, void *arg) { + + nano_monitor *monitor = (nano_monitor *) arg; + + nano_cv *ncv = monitor->cv; + nng_cv *cv = ncv->cv; + nng_mtx *mtx = ncv->mtx; + + const int id = (int) p.id; + if (!id) + return; + + nng_mtx_lock(mtx); + if (monitor->updates >= monitor->size) { + monitor->size += 8; + monitor->ids = R_Realloc(monitor->ids, monitor->size, int); + } + monitor->ids[monitor->updates] = ev == NNG_PIPE_EV_ADD_POST ? id : -id; + monitor->updates++; + ncv->condition++; + nng_cv_wake(cv); + nng_mtx_unlock(mtx); + +} + // finalizers ------------------------------------------------------------------ static void cv_duo_finalizer(SEXP xptr) { @@ -184,6 +209,15 @@ static void request_finalizer(SEXP xptr) { } +static void monitor_finalizer(SEXP xptr) { + + if (NANO_PTR(xptr) == NULL) return; + nano_monitor *xp = (nano_monitor *) NANO_PTR(xptr); + R_Free(xp->ids); + R_Free(xp); + +} + // synchronization primitives -------------------------------------------------- SEXP rnng_cv_alloc(void) { @@ -639,3 +673,64 @@ SEXP rnng_interrupt_switch(void) { return R_NilValue; } + +// monitors -------------------------------------------------------------------- + +SEXP rnng_monitor_create(SEXP socket, SEXP cv) { + + if (NANO_TAG(socket) != nano_SocketSymbol) + Rf_error("'socket' is not a valid Socket"); + + if (NANO_TAG(cv) != nano_CvSymbol) + Rf_error("'cv' is not a valid Condition Variable"); + + const int n = 8; + nano_monitor *monitor = R_Calloc(1, nano_monitor); + monitor->ids = R_Calloc(n, int); + monitor->size = n; + monitor->cv = (nano_cv *) NANO_PTR(cv); + nng_socket *sock = (nng_socket *) NANO_PTR(socket); + + int xc; + + if ((xc = nng_pipe_notify(*sock, NNG_PIPE_EV_ADD_POST, pipe_cb_monitor, monitor))) + ERROR_OUT(xc); + + if ((xc = nng_pipe_notify(*sock, NNG_PIPE_EV_REM_POST, pipe_cb_monitor, monitor))) + ERROR_OUT(xc); + + SEXP xptr = R_MakeExternalPtr(monitor, nano_MonitorSymbol, R_NilValue); + R_RegisterCFinalizerEx(xptr, monitor_finalizer, TRUE); + NANO_CLASS2(xptr, "nanoMonitor", "nano"); + Rf_setAttrib(xptr, nano_SocketSymbol, Rf_ScalarInteger(nng_socket_id(*sock))); + + return xptr; + +} + +SEXP rnng_monitor_read(SEXP x) { + + if (NANO_TAG(x) != nano_MonitorSymbol) + Rf_error("'x' is not a valid Monitor"); + + nano_monitor *monitor = (nano_monitor *) NANO_PTR(x); + + nano_cv *ncv = monitor->cv; + nng_mtx *mtx = ncv->mtx; + + SEXP out; + nng_mtx_lock(mtx); + const int updates = monitor->updates; + if (updates) { + out = Rf_allocVector(INTSXP, updates); + memcpy(NANO_DATAPTR(out), monitor->ids, updates * sizeof(int)); + monitor->updates = 0; + } + nng_mtx_unlock(mtx); + + if (!updates) + out = R_NilValue; + + return out; + +} diff --git a/tests/tests.R b/tests/tests.R index f551f715f..cf5153131 100644 --- a/tests/tests.R +++ b/tests/tests.R @@ -364,11 +364,24 @@ test_true(!wait(cv)) test_true(!wait(cv2)) test_class("errorValue", resp$recv()) +test_class("nanoSocket", poly <- socket(protocol = "poly")) +test_class("nanoSocket", poly1 <- socket(protocol = "poly")) +test_class("nanoSocket", poly2 <- socket(protocol = "poly")) +test_class("nanoMonitor", m <- monitor(poly, cv)) +test_zero(listen(poly)) +test_null(read_monitor(m)) +test_zero(dial(poly1)) +test_zero(dial(poly2)) +test_zero(reap(poly2)) +test_zero(reap(poly1)) +test_true(!wait(cv)) +test_type("integer", read_monitor(m)) +test_zero(reap(poly)) + test_class("nanoSocket", bus <- socket(protocol = "bus")) test_class("nanoSocket", push <- socket(protocol = "push")) test_class("nanoSocket", pull <- socket(protocol = "pull")) test_class("nanoSocket", pair <- socket(protocol = "pair")) -test_class("nanoSocket", poly <- socket(protocol = "poly")) test_class("nano", bus) test_equal(suppressWarnings(listen(bus, url = "test")), 3L) test_equal(suppressWarnings(dial(bus, url = "test")), 3L) @@ -379,7 +392,6 @@ test_equal(suppressWarnings(close(bus)), 7L) test_zero(close(push)) test_zero(close(pull)) test_zero(reap(pair)) -test_zero(reap(poly)) test_error(socket(protocol = "newprotocol"), "protocol") test_error(socket(dial = "test"), "argument") test_error(socket(listen = "test"), "argument")