Skip to content

Commit

Permalink
Monitor a Socket for Connection Changes (#66)
Browse files Browse the repository at this point in the history
* monitor concept

* return NULL if no changes

* make nanoMonitor object; add tests

* docs and tests
  • Loading branch information
shikokuchuo authored Nov 29, 2024
1 parent a235c88 commit df45d62
Show file tree
Hide file tree
Showing 12 changed files with 231 additions and 9 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: nanonext
Type: Package
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
Version: 1.3.2.9006
Version: 1.3.2.9007
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
a socket library implementing 'Scalability Protocols', a reliable,
high-performance standard for common communications patterns including
Expand Down
3 changes: 3 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ S3method(print,errorValue)
S3method(print,nanoContext)
S3method(print,nanoDialer)
S3method(print,nanoListener)
S3method(print,nanoMonitor)
S3method(print,nanoObject)
S3method(print,nanoSocket)
S3method(print,nanoStream)
Expand Down Expand Up @@ -70,6 +71,7 @@ export(listen)
export(lock)
export(mclock)
export(messenger)
export(monitor)
export(msleep)
export(nano)
export(ncurl)
Expand All @@ -81,6 +83,7 @@ export(opt)
export(parse_url)
export(pipe_notify)
export(random)
export(read_monitor)
export(reap)
export(recv)
export(recv_aio)
Expand Down
3 changes: 2 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
# nanonext 1.3.2.9006 (development)
# nanonext 1.3.2.9007 (development)

#### New Features

* New interface to Pipes moves to using integer pipe IDs rather than Pipe (external pointer) objects:
+ `send_aio()` gains the argument 'pipe' which accepts an integer pipe ID for directed sends (currently only supported by Sockets using the 'poly' protocol).
+ A 'recvAio' now records the integer pipe ID, where successful, at `$aio` upon resolution.
+ Pipe objects (of class 'nanoPipe') are obsoleted.
* Adds `monitor()` and `read_monitor()` for easy monitoring of connection changes (pipe additons and removals) at a Socket.

#### Updates

Expand Down
9 changes: 9 additions & 0 deletions R/nano.R
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,15 @@ print.nanoStream <- function(x, ...) {

}

#' @export
#'
print.nanoMonitor <- function(x, ...) {

cat(sprintf("< nanoMonitor >\n - socket: %s\n", attr(x, "socket")), file = stdout())
invisible(x)

}

#' @export
#'
print.recvAio <- function(x, ...) {
Expand Down
42 changes: 42 additions & 0 deletions R/socket.R
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,45 @@ close.nanoSocket <- function(con, ...) invisible(.Call(rnng_close, con))
#' @export
#'
reap <- function(con) .Call(rnng_reap, con)

#' Monitor a Socket for Pipe Changes
#'
#' This function monitors pipe additions and removals from a socket.
#'
#' @param sock a Socket.
#' @param cv a conditionVariable.
#'
#' @return For \code{monitor}: a Monitor (object of class 'nanoMonitor'). \cr
#' For \code{read_monitor}: an integer vector of pipe IDs (positive if added,
#' negative if removed), or else NULL if there were no changes since the
#' previous read.
#'
#' @examples
#' cv <- cv()
#' s <- socket("poly")
#' s1 <- socket("poly")
#'
#' m <- monitor(s, cv)
#' m
#'
#' listen(s)
#' dial(s1)
#'
#' cv_value(cv)
#' read_monitor(m)
#'
#' close(s)
#' close(s1)
#'
#' read_monitor(m)
#'
#' @export
#'
monitor <- function(sock, cv) .Call(rnng_monitor_create, sock, cv)

#' @param x an external pointer to a monitor.
#'
#' @rdname monitor
#' @export
#'
read_monitor <- function(x) .Call(rnng_monitor_read, x)
3 changes: 2 additions & 1 deletion R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ parse_url <- function(url) .Call(rnng_url_parse, url)
#' \sQuote{recvAio}).
#'
#' Is the object an object inheriting from class \sQuote{nano} i.e. a
#' nanoSocket, nanoContext, nanoStream, nanoListener, nanoDialer or nano Object.
#' nanoSocket, nanoContext, nanoStream, nanoListener, nanoDialer, nanoMonitor or
#' nano Object.
#'
#' Is the object an ncurlSession (object of class \sQuote{ncurlSession}).
#'
Expand Down
3 changes: 2 additions & 1 deletion man/is_aio.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 47 additions & 0 deletions man/monitor.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ SEXP nano_DotcallSymbol;
SEXP nano_HeadersSymbol;
SEXP nano_IdSymbol;
SEXP nano_ListenerSymbol;
SEXP nano_PipeSymbol;
SEXP nano_MonitorSymbol;
SEXP nano_ProtocolSymbol;
SEXP nano_ResolveSymbol;
SEXP nano_ResponseSymbol;
Expand Down Expand Up @@ -72,7 +72,7 @@ static void RegisterSymbols(void) {
nano_HeadersSymbol = Rf_install("headers");
nano_IdSymbol = Rf_install("id");
nano_ListenerSymbol = Rf_install("listener");
nano_PipeSymbol = Rf_install("pipe");
nano_MonitorSymbol = Rf_install("monitor");
nano_ProtocolSymbol = Rf_install("protocol");
nano_ResolveSymbol = Rf_install("resolve");
nano_ResponseSymbol = Rf_install("response");
Expand Down Expand Up @@ -160,6 +160,8 @@ static const R_CallMethodDef callMethods[] = {
{"rnng_listener_close", (DL_FUNC) &rnng_listener_close, 1},
{"rnng_listener_start", (DL_FUNC) &rnng_listener_start, 1},
{"rnng_messenger", (DL_FUNC) &rnng_messenger, 1},
{"rnng_monitor_create", (DL_FUNC) &rnng_monitor_create, 2},
{"rnng_monitor_read", (DL_FUNC) &rnng_monitor_read, 1},
{"rnng_ncurl", (DL_FUNC) &rnng_ncurl, 9},
{"rnng_ncurl_aio", (DL_FUNC) &rnng_ncurl_aio, 9},
{"rnng_ncurl_session", (DL_FUNC) &rnng_ncurl_session, 8},
Expand Down
11 changes: 10 additions & 1 deletion src/nanonext.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,13 @@ typedef struct nano_cv_duo_s {
nano_cv *cv2;
} nano_cv_duo;

typedef struct nano_monitor_s {
nano_cv *cv;
int *ids;
int size;
int updates;
} nano_monitor;

typedef struct nano_thread_aio_s {
nng_thread *thr;
nano_cv *cv;
Expand Down Expand Up @@ -243,7 +250,7 @@ extern SEXP nano_DotcallSymbol;
extern SEXP nano_HeadersSymbol;
extern SEXP nano_IdSymbol;
extern SEXP nano_ListenerSymbol;
extern SEXP nano_PipeSymbol;
extern SEXP nano_MonitorSymbol;
extern SEXP nano_ProtocolSymbol;
extern SEXP nano_ResolveSymbol;
extern SEXP nano_ResponseSymbol;
Expand Down Expand Up @@ -337,6 +344,8 @@ SEXP rnng_listener_close(SEXP);
SEXP rnng_listener_start(SEXP);
SEXP rnng_messenger(SEXP);
SEXP rnng_messenger_thread_create(SEXP);
SEXP rnng_monitor_create(SEXP, SEXP);
SEXP rnng_monitor_read(SEXP);
SEXP rnng_ncurl(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP rnng_ncurl_aio(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP rnng_ncurl_session(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
Expand Down
95 changes: 95 additions & 0 deletions src/sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,31 @@ static void pipe_cb_dropcon(nng_pipe p, nng_pipe_ev ev, void *arg) {

}

static void pipe_cb_monitor(nng_pipe p, nng_pipe_ev ev, void *arg) {

nano_monitor *monitor = (nano_monitor *) arg;

nano_cv *ncv = monitor->cv;
nng_cv *cv = ncv->cv;
nng_mtx *mtx = ncv->mtx;

const int id = (int) p.id;
if (!id)
return;

nng_mtx_lock(mtx);
if (monitor->updates >= monitor->size) {
monitor->size += 8;
monitor->ids = R_Realloc(monitor->ids, monitor->size, int);
}
monitor->ids[monitor->updates] = ev == NNG_PIPE_EV_ADD_POST ? id : -id;
monitor->updates++;
ncv->condition++;
nng_cv_wake(cv);
nng_mtx_unlock(mtx);

}

// finalizers ------------------------------------------------------------------

static void cv_duo_finalizer(SEXP xptr) {
Expand All @@ -184,6 +209,15 @@ static void request_finalizer(SEXP xptr) {

}

static void monitor_finalizer(SEXP xptr) {

if (NANO_PTR(xptr) == NULL) return;
nano_monitor *xp = (nano_monitor *) NANO_PTR(xptr);
R_Free(xp->ids);
R_Free(xp);

}

// synchronization primitives --------------------------------------------------

SEXP rnng_cv_alloc(void) {
Expand Down Expand Up @@ -639,3 +673,64 @@ SEXP rnng_interrupt_switch(void) {
return R_NilValue;

}

// monitors --------------------------------------------------------------------

SEXP rnng_monitor_create(SEXP socket, SEXP cv) {

if (NANO_TAG(socket) != nano_SocketSymbol)
Rf_error("'socket' is not a valid Socket");

if (NANO_TAG(cv) != nano_CvSymbol)
Rf_error("'cv' is not a valid Condition Variable");

const int n = 8;
nano_monitor *monitor = R_Calloc(1, nano_monitor);
monitor->ids = R_Calloc(n, int);
monitor->size = n;
monitor->cv = (nano_cv *) NANO_PTR(cv);
nng_socket *sock = (nng_socket *) NANO_PTR(socket);

int xc;

if ((xc = nng_pipe_notify(*sock, NNG_PIPE_EV_ADD_POST, pipe_cb_monitor, monitor)))
ERROR_OUT(xc);

if ((xc = nng_pipe_notify(*sock, NNG_PIPE_EV_REM_POST, pipe_cb_monitor, monitor)))
ERROR_OUT(xc);

SEXP xptr = R_MakeExternalPtr(monitor, nano_MonitorSymbol, R_NilValue);
R_RegisterCFinalizerEx(xptr, monitor_finalizer, TRUE);
NANO_CLASS2(xptr, "nanoMonitor", "nano");
Rf_setAttrib(xptr, nano_SocketSymbol, Rf_ScalarInteger(nng_socket_id(*sock)));

return xptr;

}

SEXP rnng_monitor_read(SEXP x) {

if (NANO_TAG(x) != nano_MonitorSymbol)
Rf_error("'x' is not a valid Monitor");

nano_monitor *monitor = (nano_monitor *) NANO_PTR(x);

nano_cv *ncv = monitor->cv;
nng_mtx *mtx = ncv->mtx;

SEXP out;
nng_mtx_lock(mtx);
const int updates = monitor->updates;
if (updates) {
out = Rf_allocVector(INTSXP, updates);
memcpy(NANO_DATAPTR(out), monitor->ids, updates * sizeof(int));
monitor->updates = 0;
}
nng_mtx_unlock(mtx);

if (!updates)
out = R_NilValue;

return out;

}
16 changes: 14 additions & 2 deletions tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -364,11 +364,24 @@ test_true(!wait(cv))
test_true(!wait(cv2))
test_class("errorValue", resp$recv())

test_class("nanoSocket", poly <- socket(protocol = "poly"))
test_class("nanoSocket", poly1 <- socket(protocol = "poly"))
test_class("nanoSocket", poly2 <- socket(protocol = "poly"))
test_class("nanoMonitor", m <- monitor(poly, cv))
test_zero(listen(poly))
test_null(read_monitor(m))
test_zero(dial(poly1))
test_zero(dial(poly2))
test_zero(reap(poly2))
test_zero(reap(poly1))
test_true(!wait(cv))
test_type("integer", read_monitor(m))
test_zero(reap(poly))

test_class("nanoSocket", bus <- socket(protocol = "bus"))
test_class("nanoSocket", push <- socket(protocol = "push"))
test_class("nanoSocket", pull <- socket(protocol = "pull"))
test_class("nanoSocket", pair <- socket(protocol = "pair"))
test_class("nanoSocket", poly <- socket(protocol = "poly"))
test_class("nano", bus)
test_equal(suppressWarnings(listen(bus, url = "test")), 3L)
test_equal(suppressWarnings(dial(bus, url = "test")), 3L)
Expand All @@ -379,7 +392,6 @@ test_equal(suppressWarnings(close(bus)), 7L)
test_zero(close(push))
test_zero(close(pull))
test_zero(reap(pair))
test_zero(reap(poly))
test_error(socket(protocol = "newprotocol"), "protocol")
test_error(socket(dial = "test"), "argument")
test_error(socket(listen = "test"), "argument")
Expand Down

0 comments on commit df45d62

Please sign in to comment.