Skip to content

Commit

Permalink
Restore persistent wait thread (#59)
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo authored Nov 13, 2024
1 parent 69185fc commit c04c5d9
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 64 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: nanonext
Type: Package
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
Version: 1.3.0.9009
Version: 1.3.0.9010
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
a socket library implementing 'Scalability Protocols', a reliable,
high-performance standard for common communications patterns including
Expand Down
3 changes: 2 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# nanonext 1.3.0.9009 (development)
# nanonext 1.3.0.9010 (development)

#### Updates

* Performs interruptible 'aio' waits using a single dedicated thread, rather than launching new threads, for higher performance and efficiency.
* Performance enhancements for 'ncurlAio' and 'recvAio' promises methods.
* Updates bundled 'libnng' to v1.9.0 stable release.
* The package has a shiny new hex logo.
Expand Down
1 change: 1 addition & 0 deletions R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ serial_config <- function(class, sfunc, ufunc, vec = FALSE)
#' if (Sys.info()[["sysname"]] == "Linux") {
#' rm(list = ls())
#' gc()
#' .Call(nanonext:::rnng_thread_shutdown)
#' Sys.sleep(1L)
#' .Call(nanonext:::rnng_fini)
#' }
Expand Down
1 change: 1 addition & 0 deletions man/zzz.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ void (*eln2)(void (*)(void *), void *, double, int) = NULL;

uint8_t special_bit = 0;

extern nng_thread *nano_wait_thr;
extern nng_aio *nano_shared_aio;
extern nng_mtx *nano_wait_mtx;
extern nng_cv *nano_wait_cv;
extern int nano_wait_condition;

SEXP nano_AioSymbol;
SEXP nano_ContextSymbol;
SEXP nano_CvSymbol;
Expand Down Expand Up @@ -184,8 +190,9 @@ static const R_CallMethodDef callMethods[] = {
{"rnng_stream_listen", (DL_FUNC) &rnng_stream_listen, 3},
{"rnng_strerror", (DL_FUNC) &rnng_strerror, 1},
{"rnng_subscribe", (DL_FUNC) &rnng_subscribe, 3},
{"rnng_traverse_precious", (DL_FUNC) &rnng_traverse_precious, 0},
{"rnng_thread_shutdown", (DL_FUNC) &rnng_thread_shutdown, 0},
{"rnng_tls_config", (DL_FUNC) &rnng_tls_config, 4},
{"rnng_traverse_precious", (DL_FUNC) &rnng_traverse_precious, 0},
{"rnng_unresolved", (DL_FUNC) &rnng_unresolved, 1},
{"rnng_unresolved2", (DL_FUNC) &rnng_unresolved2, 1},
{"rnng_url_parse", (DL_FUNC) &rnng_url_parse, 1},
Expand All @@ -210,6 +217,7 @@ void attribute_visible R_init_nanonext(DllInfo* dll) {

// # nocov start
void attribute_visible R_unload_nanonext(DllInfo *info) {
rnng_thread_shutdown();
ReleaseObjects();
}
// # nocov end
3 changes: 2 additions & 1 deletion src/nanonext.h
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,9 @@ SEXP rnng_stream_dial(SEXP, SEXP, SEXP);
SEXP rnng_stream_listen(SEXP, SEXP, SEXP);
SEXP rnng_strerror(SEXP);
SEXP rnng_subscribe(SEXP, SEXP, SEXP);
SEXP rnng_traverse_precious(void);
SEXP rnng_thread_shutdown(void);
SEXP rnng_tls_config(SEXP, SEXP, SEXP, SEXP);
SEXP rnng_traverse_precious(void);
SEXP rnng_unresolved(SEXP);
SEXP rnng_unresolved2(SEXP);
SEXP rnng_url_parse(SEXP);
Expand Down
218 changes: 158 additions & 60 deletions src/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@

// threads callable and messenger ----------------------------------------------

nng_thread *nano_wait_thr = NULL;
nng_aio *nano_shared_aio = NULL;
nng_mtx *nano_wait_mtx = NULL;
nng_cv *nano_wait_cv = NULL;
int nano_wait_condition = 0;

// # nocov start
// tested interactively

Expand Down Expand Up @@ -136,18 +142,18 @@ SEXP rnng_messenger(SEXP url) {
SEXP socket, con;

if ((xc = nng_pair0_open(sock)))
goto exitlevel1;
goto fail;
lp = R_Calloc(1, nng_listener);
if ((xc = nng_listen(*sock, up, lp, 0))) {
if (xc != 10 && xc != 15) {
R_Free(lp);
goto exitlevel1;
goto fail;
}
R_Free(lp);
dp = R_Calloc(1, nng_dialer);
if ((xc = nng_dial(*sock, up, dp, 0))) {
R_Free(dp);
goto exitlevel1;
goto fail;
}
dialer = 1;
}
Expand All @@ -166,7 +172,7 @@ SEXP rnng_messenger(SEXP url) {
UNPROTECT(2);
return socket;

exitlevel1:
fail:
R_Free(sock);
ERROR_OUT(xc);

Expand Down Expand Up @@ -219,22 +225,6 @@ static void thread_duo_finalizer(SEXP xptr) {

}

static void rnng_wait_thread(void *args) {

nano_thread_aio *taio = (nano_thread_aio *) args;
nano_cv *ncv = taio->cv;
nng_mtx *mtx = ncv->mtx;
nng_cv *cv = ncv->cv;

nng_aio_wait(taio->aio);

nng_mtx_lock(mtx);
ncv->condition = 1;
nng_cv_wake(cv);
nng_mtx_unlock(mtx);

}

static void thread_disp_finalizer(SEXP xptr) {

if (NANO_PTR(xptr) == NULL) return;
Expand Down Expand Up @@ -271,6 +261,100 @@ static void thread_disp_finalizer(SEXP xptr) {

}

static void rnng_wait_thread(void *args) {

while (1) {
nng_mtx_lock(nano_wait_mtx);
while (nano_wait_condition == 0)
nng_cv_wait(nano_wait_cv);
if (nano_wait_condition == -1) {
nng_mtx_unlock(nano_wait_mtx);
break;
}
nng_mtx_unlock(nano_wait_mtx);

nng_aio_wait(nano_shared_aio);

nng_mtx_lock(nano_wait_mtx);
nano_shared_aio = NULL;
nano_wait_condition = 0;
nng_cv_wake(nano_wait_cv);
nng_mtx_unlock(nano_wait_mtx);
}

}

static void rnng_wait_thread_single(void *args) {

nano_thread_aio *taio = (nano_thread_aio *) args;
nano_cv *ncv = taio->cv;
nng_mtx *mtx = ncv->mtx;
nng_cv *cv = ncv->cv;

nng_aio_wait(taio->aio);

nng_mtx_lock(mtx);
ncv->condition = 1;
nng_cv_wake(cv);
nng_mtx_unlock(mtx);

}

void single_wait_thread_create(SEXP x) {

nano_aio *aiop = (nano_aio *) NANO_PTR(x);
nano_thread_aio *taio = R_Calloc(1, nano_thread_aio);
nano_cv *ncv = R_Calloc(1, nano_cv);
taio->aio = aiop->aio;
taio->cv = ncv;
nng_mtx *mtx = NULL;
nng_cv *cv = NULL;
int xc, signalled;

if ((xc = nng_mtx_alloc(&mtx)))
goto fail;

if ((xc = nng_cv_alloc(&cv, mtx)))
goto fail;

ncv->mtx = mtx;
ncv->cv = cv;

if ((xc = nng_thread_create(&taio->thr, rnng_wait_thread_single, taio)))
goto fail;

SEXP xptr;
PROTECT(xptr = R_MakeExternalPtr(taio, R_NilValue, R_NilValue));
R_RegisterCFinalizerEx(xptr, thread_aio_finalizer, TRUE);
R_MakeWeakRef(x, xptr, R_NilValue, TRUE);
UNPROTECT(1);

nng_time time = nng_clock();

while (1) {
time = time + 400;
signalled = 1;
nng_mtx_lock(mtx);
while (ncv->condition == 0) {
if (nng_cv_until(cv, time) == NNG_ETIMEDOUT) {
signalled = 0;
break;
}
}
nng_mtx_unlock(mtx);
if (signalled) break;
R_CheckUserInterrupt();
}

return;

fail:
if (cv) nng_cv_free(cv);
if (mtx) nng_mtx_free(mtx);
ERROR_OUT(xc);

}

SEXP rnng_wait_thread_create(SEXP x) {

const SEXPTYPE typ = TYPEOF(x);
Expand All @@ -280,51 +364,53 @@ SEXP rnng_wait_thread_create(SEXP x) {
if (NANO_TAG(coreaio) != nano_AioSymbol)
return x;

PROTECT(coreaio);
nano_aio *aiop = (nano_aio *) NANO_PTR(coreaio);

nano_thread_aio *taio = R_Calloc(1, nano_thread_aio);
nano_cv *ncv = R_Calloc(1, nano_cv);
taio->aio = aiop->aio;
taio->cv = ncv;
nng_mtx *mtx;
nng_cv *cv;

int xc, signalled;

if ((xc = nng_mtx_alloc(&mtx)))
goto exitlevel1;
if (!nano_wait_thr) {
if ((xc = nng_mtx_alloc(&nano_wait_mtx)) ||
(xc = nng_cv_alloc(&nano_wait_cv, nano_wait_mtx)) ||
(xc = nng_thread_create(&nano_wait_thr, rnng_wait_thread, NULL)))
goto fail;
}

if ((xc = nng_cv_alloc(&cv, mtx)))
goto exitlevel2;
int thread_required = 0;
nng_mtx_lock(nano_wait_mtx);
if (nano_wait_condition == 0) {
nano_shared_aio = aiop->aio;
nano_wait_condition = 1;
nng_cv_wake(nano_wait_cv);
} else {
thread_required = nano_shared_aio != aiop->aio;
}
nng_mtx_unlock(nano_wait_mtx);

ncv->mtx = mtx;
ncv->cv = cv;
if (thread_required) {

if ((xc = nng_thread_create(&taio->thr, rnng_wait_thread, taio)))
goto exitlevel3;
PROTECT(coreaio);
single_wait_thread_create(coreaio);
UNPROTECT(1);

SEXP xptr;
PROTECT(xptr = R_MakeExternalPtr(taio, R_NilValue, R_NilValue));
R_RegisterCFinalizerEx(xptr, thread_aio_finalizer, TRUE);
R_MakeWeakRef(coreaio, xptr, R_NilValue, TRUE);
UNPROTECT(2);
} else {

nng_time time = nng_clock();
nng_time time = nng_clock();

while (1) {
time = time + 400;
signalled = 1;
nng_mtx_lock(mtx);
while (ncv->condition == 0) {
if (nng_cv_until(cv, time) == NNG_ETIMEDOUT) {
signalled = 0;
break;
while (1) {
time = time + 400;
signalled = 1;
nng_mtx_lock(nano_wait_mtx);
while (nano_wait_condition == 1) {
if (nng_cv_until(nano_wait_cv, time) == NNG_ETIMEDOUT) {
signalled = 0;
break;
}
}
nng_mtx_unlock(nano_wait_mtx);
if (signalled) break;
R_CheckUserInterrupt();
}
nng_mtx_unlock(mtx);
if (signalled) break;
R_CheckUserInterrupt();

}

switch (aiop->type) {
Expand All @@ -347,13 +433,9 @@ SEXP rnng_wait_thread_create(SEXP x) {

return x;

exitlevel3:
nng_cv_free(cv);
exitlevel2:
nng_mtx_free(mtx);
exitlevel1:
R_Free(ncv);
R_Free(taio);
fail:
if (nano_wait_cv) nng_cv_free(nano_wait_cv);
if (nano_wait_mtx) nng_mtx_free(nano_wait_mtx);
ERROR_OUT(xc);

} else if (typ == VECSXP) {
Expand All @@ -369,6 +451,22 @@ SEXP rnng_wait_thread_create(SEXP x) {

}

SEXP rnng_thread_shutdown(void) {
if (nano_wait_thr) {
if (nano_shared_aio != NULL)
nng_aio_stop(nano_shared_aio);
nng_mtx_lock(nano_wait_mtx);
nano_wait_condition = -1;
nng_cv_wake(nano_wait_cv);
nng_mtx_unlock(nano_wait_mtx);
nng_thread_destroy(nano_wait_thr);
nng_cv_free(nano_wait_cv);
nng_mtx_free(nano_wait_mtx);
nano_wait_thr = NULL;
}
return R_NilValue;
}

static void nano_record_pipe(nng_pipe p, nng_pipe_ev ev, void *arg) {

nano_signal *signal = (nano_signal *) arg;
Expand Down
1 change: 1 addition & 0 deletions tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ test_type("integer", .Call(nanonext:::rnng_traverse_precious))
if (Sys.info()[["sysname"]] == "Linux") {
rm(list = ls())
gc()
.Call(nanonext:::rnng_thread_shutdown)
Sys.sleep(1L)
.Call(nanonext:::rnng_fini)
invisible()
Expand Down

0 comments on commit c04c5d9

Please sign in to comment.