From 8eeb6e46d1b0aeb4036fc5094fb6b71f00acd264 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Thu, 28 Nov 2024 22:35:07 +0000 Subject: [PATCH] recvAios now record the pipe ID at $aio upon resolution --- DESCRIPTION | 2 +- NEWS.md | 3 ++- src/aio.c | 20 ++++++++++++-------- src/core.c | 11 +++++++---- src/sync.c | 31 +++++++++++++++++++------------ tests/tests.R | 2 ++ 6 files changed, 43 insertions(+), 26 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index f827e56b2..8fafb9f76 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: nanonext Type: Package Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library -Version: 1.3.2.9003 +Version: 1.3.2.9004 Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is a socket library implementing 'Scalability Protocols', a reliable, high-performance standard for common communications patterns including diff --git a/NEWS.md b/NEWS.md index 333923251..44af3f8e6 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,8 +1,9 @@ -# nanonext 1.3.2.9003 (development) +# nanonext 1.3.2.9004 (development) #### New Features * `opt()` gains the ability to retrieve options from a Pipe. +* A 'recvAio' now records the integer pipe ID, where successful, at `$aio` upon resolution. # nanonext 1.3.2 diff --git a/src/aio.c b/src/aio.c index 48f260f11..57546bc4a 100644 --- a/src/aio.c +++ b/src/aio.c @@ -56,11 +56,14 @@ static void isaio_complete(void *arg) { static void raio_complete(void *arg) { nano_aio *raio = (nano_aio *) arg; - const int res = nng_aio_result(raio->aio); - if (res == 0) - raio->data = nng_aio_get_msg(raio->aio); + int res = nng_aio_result(raio->aio); + if (res == 0) { + nng_msg *msg = nng_aio_get_msg(raio->aio); + raio->data = msg; + res = -nng_msg_get_pipe(msg).id; + } - raio->result = res - !res; + raio->result = res; if (raio->cb != NULL) later2(raio_invoke_cb, raio->cb); @@ -197,7 +200,7 @@ SEXP rnng_aio_get_msg(SEXP env) { break; } - SEXP out; + SEXP out, result; unsigned char *buf; size_t sz; @@ -211,10 +214,11 @@ SEXP rnng_aio_get_msg(SEXP env) { } PROTECT(out = nano_decode(buf, sz, raio->mode, NANO_PROT(aio))); + PROTECT(result = Rf_ScalarInteger(-res)); Rf_defineVar(nano_ValueSymbol, out, env); - Rf_defineVar(nano_AioSymbol, R_NilValue, env); + Rf_defineVar(nano_AioSymbol, result, env); - UNPROTECT(1); + UNPROTECT(2); return out; } @@ -348,7 +352,7 @@ static int rnng_unresolved_impl(SEXP x) { break; default: value = rnng_aio_get_msg(x); - break; + break; } xc = value == nano_unresolved; break; diff --git a/src/core.c b/src/core.c index da9a43b89..df3e4500e 100644 --- a/src/core.c +++ b/src/core.c @@ -130,12 +130,15 @@ void raio_complete_signal(void *arg) { nng_cv *cv = ncv->cv; nng_mtx *mtx = ncv->mtx; - const int res = nng_aio_result(raio->aio); - if (res == 0) - raio->data = nng_aio_get_msg(raio->aio); + int res = nng_aio_result(raio->aio); + if (res == 0) { + nng_msg *msg = nng_aio_get_msg(raio->aio); + raio->data = msg; + res = -nng_msg_get_pipe(msg).id; + } nng_mtx_lock(mtx); - raio->result = res - !res; + raio->result = res; ncv->condition++; nng_cv_wake(cv); nng_mtx_unlock(mtx); diff --git a/src/sync.c b/src/sync.c index c314f22de..608958c4c 100644 --- a/src/sync.c +++ b/src/sync.c @@ -24,10 +24,13 @@ static void request_complete(void *arg) { nano_aio *raio = (nano_aio *) arg; - const int res = nng_aio_result(raio->aio); - if (res == 0) - raio->data = nng_aio_get_msg(raio->aio); - raio->result = res - !res; + int res = nng_aio_result(raio->aio); + if (res == 0) { + nng_msg *msg = nng_aio_get_msg(raio->aio); + raio->data = msg; + res = -nng_msg_get_pipe(msg).id; + } + raio->result = res; nano_saio *saio = (nano_saio *) raio->cb; if (saio->cb != NULL) @@ -38,14 +41,15 @@ static void request_complete(void *arg) { static void request_complete_dropcon(void *arg) { nano_aio *raio = (nano_aio *) arg; - const int res = nng_aio_result(raio->aio); + int res = nng_aio_result(raio->aio); if (res == 0) { nng_msg *msg = nng_aio_get_msg(raio->aio); raio->data = msg; - nng_pipe_close(nng_msg_get_pipe(msg)); + nng_pipe p = nng_msg_get_pipe(msg); + res = -p.id; + nng_pipe_close(p); } - - raio->result = res - !res; + raio->result = res; nano_saio *saio = (nano_saio *) raio->cb; if (saio->cb != NULL) @@ -60,12 +64,15 @@ static void request_complete_signal(void *arg) { nng_cv *cv = ncv->cv; nng_mtx *mtx = ncv->mtx; - const int res = nng_aio_result(raio->aio); - if (res == 0) - raio->data = nng_aio_get_msg(raio->aio); + int res = nng_aio_result(raio->aio); + if (res == 0) { + nng_msg *msg = nng_aio_get_msg(raio->aio); + raio->data = msg; + res = -nng_msg_get_pipe(msg).id; + } nng_mtx_lock(mtx); - raio->result = res - !res; + raio->result = res; ncv->condition++; nng_cv_wake(cv); nng_mtx_unlock(mtx); diff --git a/tests/tests.R b/tests/tests.R index 4a482dd58..c6067a752 100644 --- a/tests/tests.R +++ b/tests/tests.R @@ -257,6 +257,7 @@ test_type("complex", call_aio(rek)[["data"]]) test_class("recvAio", rek <- request(req$context, c(1+3i, 4+2i), send_mode = "serial", recv_mode = "serial", timeout = 500)) test_zero(reply(ctx, execute = identity, recv_mode = 1L, send_mode = 1L, timeout = 500)) test_type("complex", call_aio(rek)[["data"]]) +test_type("integer", rek[["aio"]]) test_type("list", cfg <- serial_config(class = "custom", sfunc = function(x) raw(1L), ufunc = as.integer, vec = FALSE)) test_equal(length(cfg), 4L) @@ -289,6 +290,7 @@ test_notnull(cs$data) test_type("externalptr", ctxn <- .context(rep)) test_class("recvAio", cr <- recv_aio(ctxn, cv = cv, timeout = 500)) test_equal(call_aio(cr)$data, "test") +test_type("integer", cr$aio) test_type("integer", send(ctxn, TRUE, mode = 0L, block = FALSE)) test_type("externalptr", ctxn <- .context(rep)) test_class("recvAio", cs <- request(.context(req$socket), data = TRUE, cv = NA))