Skip to content

Commit

Permalink
Limit scope of interrupt monitoring for recv_aio() (#68)
Browse files Browse the repository at this point in the history
* limit activating interrupt monitoring for recv_aio()

* increment dev version
  • Loading branch information
shikokuchuo authored Dec 2, 2024
1 parent 1ccab16 commit 9af0651
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 6 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: nanonext
Type: Package
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
Version: 1.3.2.9012
Version: 1.3.2.9013
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
a socket library implementing 'Scalability Protocols', a reliable,
high-performance standard for common communications patterns including
Expand Down
2 changes: 1 addition & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# nanonext 1.3.2.9012 (development)
# nanonext 1.3.2.9013 (development)

#### New Features

Expand Down
29 changes: 27 additions & 2 deletions src/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,24 @@ static void isaio_complete(void *arg) {

}

static void raio_complete(void *arg) {

nano_aio *raio = (nano_aio *) arg;
int res = nng_aio_result(raio->aio);
if (res == 0) {
nng_msg *msg = nng_aio_get_msg(raio->aio);
raio->data = msg;
nng_pipe p = nng_msg_get_pipe(msg);
res = - (int) p.id;
}

raio->result = res;

if (raio->cb != NULL)
later2(raio_invoke_cb, raio->cb);

}

static void iraio_complete(void *arg) {

nano_aio *iaio = (nano_aio *) arg;
Expand Down Expand Up @@ -500,7 +518,14 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP pipe, SEXP
SEXP rnng_recv_aio(SEXP con, SEXP mode, SEXP timeout, SEXP cvar, SEXP bytes, SEXP clo) {

const nng_duration dur = timeout == R_NilValue ? NNG_DURATION_DEFAULT : (nng_duration) nano_integer(timeout);
const int signal = NANO_TAG(cvar) == nano_CvSymbol;
int signal, interrupt;
if (cvar == R_NilValue) {
signal = 0;
interrupt = 0;
} else {
signal = NANO_TAG(cvar) == nano_CvSymbol;
interrupt = 1 - signal;
}
nano_cv *ncv = signal ? (nano_cv *) NANO_PTR(cvar) : NULL;
nano_aio *raio;
SEXP aio, env, fun;
Expand All @@ -516,7 +541,7 @@ SEXP rnng_recv_aio(SEXP con, SEXP mode, SEXP timeout, SEXP cvar, SEXP bytes, SEX
raio->mode = mod;
raio->cb = NULL;

if ((xc = nng_aio_alloc(&raio->aio, signal ? raio_complete_signal : raio_complete, raio)))
if ((xc = nng_aio_alloc(&raio->aio, signal ? raio_complete_signal : interrupt ? raio_complete_interrupt : raio_complete, raio)))
goto exitlevel1;

nng_aio_set_timeout(raio->aio, dur);
Expand Down
2 changes: 1 addition & 1 deletion src/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ static SEXP nano_outHook(SEXP x, SEXP fun) {

// functions with forward definitions in nanonext.h ----------------------------

void raio_complete(void *arg) {
void raio_complete_interrupt(void *arg) {

nano_aio *raio = (nano_aio *) arg;
int res = nng_aio_result(raio->aio);
Expand Down
2 changes: 1 addition & 1 deletion src/nanonext.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ SEXP R_mkClosure(SEXP, SEXP, SEXP);
SEXP nano_findVarInFrame(const SEXP, const SEXP);
SEXP nano_PreserveObject(const SEXP);
void nano_ReleaseObject(SEXP);
void raio_complete(void *);
void raio_complete_interrupt(void *);
void raio_complete_signal(void *);
void sendaio_complete(void *);
void cv_finalizer(SEXP);
Expand Down

0 comments on commit 9af0651

Please sign in to comment.