Skip to content

Commit

Permalink
implements until_ and fixes other variants
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Nov 22, 2023
1 parent ef17e4e commit f590ced
Show file tree
Hide file tree
Showing 12 changed files with 85 additions and 15 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: nanonext
Type: Package
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
Version: 0.10.4.9008
Version: 0.10.4.9009
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
a socket library implementing 'Scalability Protocols', a reliable,
high-performance standard for common communications patterns including
Expand Down
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ export(unlock)
export(unresolved)
export(unsubscribe)
export(until)
export(until_)
export(wait)
export(wait_)
export(write_cert)
Expand Down
6 changes: 3 additions & 3 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# nanonext 0.10.4.9008 (development)
# nanonext 0.10.4.9009 (development)

#### New Features

* Introduces `call_aio_()`, a safe user-interruptible version of `call_aio()` suitable for interactive use.
* Introduces `wait_()`, a safe user-interruptible version of `wait()` suitable for interactive use.
* Introduces `call_aio_()`, a user-interruptible version of `call_aio()` suitable for interactive use.
* Introduces `wait_()` and `until_()` user-interruptible versions of `wait()` and `until()` suitable for interactive use.
* Implements `%~>%` signal forwarder from one 'conditionVariable' to another.

#### Updates
Expand Down
4 changes: 2 additions & 2 deletions R/aio.R
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ call_aio <- function(aio) invisible(.Call(rnng_aio_call, aio))

#' Call the Value of an Asynchronous Aio Operation
#'
#' \code{call_aio_} is identical to \code{call_aio} but allows user
#' interrupts, thus being safe for interactive use.
#' \code{call_aio_} is a variant that allows user interrupts, suitable for
#' interactive use.
#'
#' @rdname call_aio
#' @export
Expand Down
9 changes: 8 additions & 1 deletion R/sync.R
Original file line number Diff line number Diff line change
Expand Up @@ -101,19 +101,26 @@ wait_ <- function(cv) invisible(.Call(rnng_cv_wait_safe, cv))
#' Condition Variables - Until
#'
#' \code{until} waits until a future time on a condition being signalled by
#' completion of an asynchronous receive or pipe event.
#' completion of an asynchronous receive or pipe event. \cr \code{until_} is
#' a variant that allows user interrupts, suitable for interactive use.
#'
#' @param msec maximum time in milliseconds to wait for the condition variable
#' to be signalled.
#'
#' @examples
#' until(cv, 10L)
#' until_(cv, 10L)
#'
#' @rdname cv
#' @export
#'
until <- function(cv, msec) invisible(.Call(rnng_cv_until, cv, msec))

#' @rdname cv
#' @export
#'
until_ <- function(cv, msec) invisible(.Call(rnng_cv_until_safe, cv, msec))

#' Condition Variables - Value
#'
#' \code{cv_value} inspects the internal value of a condition variable.
Expand Down
4 changes: 2 additions & 2 deletions man/call_aio.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion man/cv.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 53 additions & 4 deletions src/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -1698,6 +1698,7 @@ SEXP rnng_cv_until(SEXP cvar, SEXP msec) {
nng_cv *cv = ncv->cv;
nng_mtx *mtx = ncv->mtx;

int signalled = 1;
nng_time time = nng_clock();
switch (TYPEOF(msec)) {
case INTSXP:
Expand All @@ -1708,7 +1709,6 @@ SEXP rnng_cv_until(SEXP cvar, SEXP msec) {
break;
}

int signalled = 1;
nng_mtx_lock(mtx);
while (ncv->condition == 0) {
if (nng_cv_until(cv, time) == NNG_ETIMEDOUT) {
Expand Down Expand Up @@ -1738,20 +1738,22 @@ SEXP rnng_cv_wait_safe(SEXP cvar) {
nng_mtx *mtx = ncv->mtx;
int signalled;
uint8_t flag;
nng_time time = nng_clock();

do {
while (1) {
time = time + 400;
signalled = 1;
nng_mtx_lock(mtx);
while (ncv->condition == 0) {
if (nng_cv_until(cv, 2000) == NNG_ETIMEDOUT) {
if (nng_cv_until(cv, time) == NNG_ETIMEDOUT) {
signalled = 0;
break;
}
}
if (signalled) break;
nng_mtx_unlock(mtx);
R_CheckUserInterrupt();
} while (1);
}

ncv->condition--;
flag = ncv->flag;
Expand All @@ -1761,6 +1763,53 @@ SEXP rnng_cv_wait_safe(SEXP cvar) {

}

SEXP rnng_cv_until_safe(SEXP cvar, SEXP msec) {

if (R_ExternalPtrTag(cvar) != nano_CvSymbol)
Rf_error("'cv' is not a valid Condition Variable");

nano_cv *ncv = (nano_cv *) R_ExternalPtrAddr(cvar);
nng_cv *cv = ncv->cv;
nng_mtx *mtx = ncv->mtx;
int signalled;

nng_time time, period, now;
switch (TYPEOF(msec)) {
case INTSXP:
period = (nng_time) INTEGER(msec)[0];
break;
case REALSXP:
period = (nng_time) Rf_asInteger(msec);
break;
}

now = nng_clock();

do {
time = period > 400 ? now + 400 : now + period;
period = period > 400 ? period - 400 : 0;
signalled = 1;
nng_mtx_lock(mtx);
while (ncv->condition == 0) {
if (nng_cv_until(cv, time) == NNG_ETIMEDOUT) {
signalled = 0;
break;
}
}
if (signalled) {
ncv->condition--;
nng_mtx_unlock(mtx);
break;
}
nng_mtx_unlock(mtx);
R_CheckUserInterrupt();
now += 400;
} while (period > 0);

return Rf_ScalarLogical(signalled);

}

SEXP rnng_cv_reset(SEXP cvar) {

if (R_ExternalPtrTag(cvar) != nano_CvSymbol)
Expand Down
1 change: 1 addition & 0 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ static const R_CallMethodDef callMethods[] = {
{"rnng_cv_reset", (DL_FUNC) &rnng_cv_reset, 1},
{"rnng_cv_signal", (DL_FUNC) &rnng_cv_signal, 1},
{"rnng_cv_until", (DL_FUNC) &rnng_cv_until, 2},
{"rnng_cv_until_safe", (DL_FUNC) &rnng_cv_until_safe, 2},
{"rnng_cv_value", (DL_FUNC) &rnng_cv_value, 1},
{"rnng_cv_wait", (DL_FUNC) &rnng_cv_wait, 1},
{"rnng_cv_wait_safe", (DL_FUNC) &rnng_cv_wait_safe, 1},
Expand Down
1 change: 1 addition & 0 deletions src/nanonext.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ extern SEXP rnng_cv_request(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
extern SEXP rnng_cv_reset(SEXP);
extern SEXP rnng_cv_signal(SEXP);
extern SEXP rnng_cv_until(SEXP, SEXP);
extern SEXP rnng_cv_until_safe(SEXP, SEXP);
extern SEXP rnng_cv_value(SEXP);
extern SEXP rnng_cv_wait(SEXP);
extern SEXP rnng_cv_wait_safe(SEXP);
Expand Down
5 changes: 4 additions & 1 deletion src/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,14 @@ SEXP rnng_wait_thread_create(SEXP aio) {
R_MakeWeakRef(coreaio, xptr, R_NilValue, FALSE);
UNPROTECT(2);

nng_time time = nng_clock();

while (1) {
time = time + 400;
signalled = 1;
nng_mtx_lock(mtx);
while (ncv->condition == 0) {
if (nng_cv_until(cv, 2000) == NNG_ETIMEDOUT) {
if (nng_cv_until(cv, time) == NNG_ETIMEDOUT) {
signalled = 0;
break;
}
Expand Down
3 changes: 3 additions & 0 deletions tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ nanotest(inherits(cv <- cv(), "conditionVariable"))
nanotestp(cv)
nanotest(!until(cv, 10L))
nanotest(!until(cv, 10))
nanotest(!until_(cv, 10L))
nanotest(!until_(cv, 10))
nanotestz(cv_reset(cv))
nanotestz(cv_value(cv))
nanotestaio(cs <- request_signal(req$context, "test", send_mode = "next", cv = cv, timeout = 500))
Expand All @@ -249,6 +251,7 @@ nanotesterr(recv_aio_signal(rep, "test", cv = err), "valid")
nanotesterr(wait(err), "valid")
nanotesterr(wait_(err), "valid")
nanotesterr(until(err, 10), "valid")
nanotesterr(until_(err, 10), "valid")
nanotesterr(cv_value(err), "valid")
nanotesterr(cv_reset(err), "valid")
nanotesterr(cv_signal(err), "valid")
Expand Down

0 comments on commit f590ced

Please sign in to comment.