Skip to content

Commit

Permalink
recvAios now record the pipe ID at $aio upon resolution
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Nov 28, 2024
1 parent 2b48cd0 commit 8eeb6e4
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 26 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: nanonext
Type: Package
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
Version: 1.3.2.9003
Version: 1.3.2.9004
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
a socket library implementing 'Scalability Protocols', a reliable,
high-performance standard for common communications patterns including
Expand Down
3 changes: 2 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# nanonext 1.3.2.9003 (development)
# nanonext 1.3.2.9004 (development)

#### New Features

* `opt()` gains the ability to retrieve options from a Pipe.
* A 'recvAio' now records the integer pipe ID, where successful, at `$aio` upon resolution.

# nanonext 1.3.2

Expand Down
20 changes: 12 additions & 8 deletions src/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,14 @@ static void isaio_complete(void *arg) {
static void raio_complete(void *arg) {

nano_aio *raio = (nano_aio *) arg;
const int res = nng_aio_result(raio->aio);
if (res == 0)
raio->data = nng_aio_get_msg(raio->aio);
int res = nng_aio_result(raio->aio);
if (res == 0) {
nng_msg *msg = nng_aio_get_msg(raio->aio);
raio->data = msg;
res = -nng_msg_get_pipe(msg).id;
}

raio->result = res - !res;
raio->result = res;

if (raio->cb != NULL)
later2(raio_invoke_cb, raio->cb);
Expand Down Expand Up @@ -197,7 +200,7 @@ SEXP rnng_aio_get_msg(SEXP env) {
break;
}

SEXP out;
SEXP out, result;
unsigned char *buf;
size_t sz;

Expand All @@ -211,10 +214,11 @@ SEXP rnng_aio_get_msg(SEXP env) {
}

PROTECT(out = nano_decode(buf, sz, raio->mode, NANO_PROT(aio)));
PROTECT(result = Rf_ScalarInteger(-res));
Rf_defineVar(nano_ValueSymbol, out, env);
Rf_defineVar(nano_AioSymbol, R_NilValue, env);
Rf_defineVar(nano_AioSymbol, result, env);

UNPROTECT(1);
UNPROTECT(2);
return out;

}
Expand Down Expand Up @@ -348,7 +352,7 @@ static int rnng_unresolved_impl(SEXP x) {
break;
default:
value = rnng_aio_get_msg(x);
break;
break;
}
xc = value == nano_unresolved;
break;
Expand Down
11 changes: 7 additions & 4 deletions src/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,15 @@ void raio_complete_signal(void *arg) {
nng_cv *cv = ncv->cv;
nng_mtx *mtx = ncv->mtx;

const int res = nng_aio_result(raio->aio);
if (res == 0)
raio->data = nng_aio_get_msg(raio->aio);
int res = nng_aio_result(raio->aio);
if (res == 0) {
nng_msg *msg = nng_aio_get_msg(raio->aio);
raio->data = msg;
res = -nng_msg_get_pipe(msg).id;
}

nng_mtx_lock(mtx);
raio->result = res - !res;
raio->result = res;
ncv->condition++;
nng_cv_wake(cv);
nng_mtx_unlock(mtx);
Expand Down
31 changes: 19 additions & 12 deletions src/sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
static void request_complete(void *arg) {

nano_aio *raio = (nano_aio *) arg;
const int res = nng_aio_result(raio->aio);
if (res == 0)
raio->data = nng_aio_get_msg(raio->aio);
raio->result = res - !res;
int res = nng_aio_result(raio->aio);
if (res == 0) {
nng_msg *msg = nng_aio_get_msg(raio->aio);
raio->data = msg;
res = -nng_msg_get_pipe(msg).id;
}
raio->result = res;

nano_saio *saio = (nano_saio *) raio->cb;
if (saio->cb != NULL)
Expand All @@ -38,14 +41,15 @@ static void request_complete(void *arg) {
static void request_complete_dropcon(void *arg) {

nano_aio *raio = (nano_aio *) arg;
const int res = nng_aio_result(raio->aio);
int res = nng_aio_result(raio->aio);
if (res == 0) {
nng_msg *msg = nng_aio_get_msg(raio->aio);
raio->data = msg;
nng_pipe_close(nng_msg_get_pipe(msg));
nng_pipe p = nng_msg_get_pipe(msg);
res = -p.id;
nng_pipe_close(p);
}

raio->result = res - !res;
raio->result = res;

nano_saio *saio = (nano_saio *) raio->cb;
if (saio->cb != NULL)
Expand All @@ -60,12 +64,15 @@ static void request_complete_signal(void *arg) {
nng_cv *cv = ncv->cv;
nng_mtx *mtx = ncv->mtx;

const int res = nng_aio_result(raio->aio);
if (res == 0)
raio->data = nng_aio_get_msg(raio->aio);
int res = nng_aio_result(raio->aio);
if (res == 0) {
nng_msg *msg = nng_aio_get_msg(raio->aio);
raio->data = msg;
res = -nng_msg_get_pipe(msg).id;
}

nng_mtx_lock(mtx);
raio->result = res - !res;
raio->result = res;
ncv->condition++;
nng_cv_wake(cv);
nng_mtx_unlock(mtx);
Expand Down
2 changes: 2 additions & 0 deletions tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ test_type("complex", call_aio(rek)[["data"]])
test_class("recvAio", rek <- request(req$context, c(1+3i, 4+2i), send_mode = "serial", recv_mode = "serial", timeout = 500))
test_zero(reply(ctx, execute = identity, recv_mode = 1L, send_mode = 1L, timeout = 500))
test_type("complex", call_aio(rek)[["data"]])
test_type("integer", rek[["aio"]])

test_type("list", cfg <- serial_config(class = "custom", sfunc = function(x) raw(1L), ufunc = as.integer, vec = FALSE))
test_equal(length(cfg), 4L)
Expand Down Expand Up @@ -289,6 +290,7 @@ test_notnull(cs$data)
test_type("externalptr", ctxn <- .context(rep))
test_class("recvAio", cr <- recv_aio(ctxn, cv = cv, timeout = 500))
test_equal(call_aio(cr)$data, "test")
test_type("integer", cr$aio)
test_type("integer", send(ctxn, TRUE, mode = 0L, block = FALSE))
test_type("externalptr", ctxn <- .context(rep))
test_class("recvAio", cs <- request(.context(req$socket), data = TRUE, cv = NA))
Expand Down

0 comments on commit 8eeb6e4

Please sign in to comment.