Skip to content

Commit

Permalink
better msleep behaviour
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Jun 22, 2024
1 parent d757531 commit ac8e7b8
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 46 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* `request()` specifying argument 'cv' other than NULL or a 'conditionVariable' will cause the pipe connection to be dropped when the reply is (asynchronously) completed.
* Removes deprecated functions `strcat()`, `recv_aio_signal()` and `request_signal()`.
* Drops `base64enc()` and `base64dec()` in favour of those from the {secretbase} package.
* `msleep()` now ignores negative values rather than taking the absolute value.
* `later` is now relaxed to a soft 'suggests' dependency (only required if using promises).
* `promises` is added as a soft 'enhances' dependency.

Expand Down
6 changes: 3 additions & 3 deletions R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ mclock <- function() .Call(rnng_clock)
#'
#' @return Invisible NULL.
#'
#' @details Non-integer values for \sQuote{time} are coerced to integer, and the
#' absolute value is taken (the sign is ignored). Non-numeric values are
#' ignored, causing the function to return immediately.
#' @details Non-integer values for \sQuote{time} are coerced to integer.
#' Negative / NA / non-numeric values are ignored, causing the function to
#' return immediately.
#'
#' Note that unlike \code{\link{Sys.sleep}}, this function is not
#' user-interruptible by sending SIGINT e.g. with ctrl + c.
Expand Down
6 changes: 3 additions & 3 deletions man/msleep.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions src/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,7 @@ SEXP rnng_ncurl_aio(SEXP http, SEXP convert, SEXP method, SEXP headers, SEXP dat
SEXP aio;

haio->type = HTTP_AIO;
haio->mode = *NANO_INTEGER(convert);
haio->mode = NANO_INTEGER(convert);
haio->next = handle;
haio->data = NULL;
handle->cfg = NULL;
Expand Down Expand Up @@ -1132,7 +1132,7 @@ SEXP rnng_ncurl_session(SEXP http, SEXP convert, SEXP method, SEXP headers, SEXP
SEXP sess, aio;

haio->type = HTTP_AIO;
haio->mode = *NANO_INTEGER(convert);
haio->mode = NANO_INTEGER(convert);
haio->next = handle;
haio->data = NULL;
handle->cfg = NULL;
Expand Down Expand Up @@ -1662,10 +1662,10 @@ SEXP rnng_pipe_notify(SEXP socket, SEXP cv, SEXP cv2, SEXP add, SEXP remove, SEX
if (cv == R_NilValue) {

sock = (nng_socket *) R_ExternalPtrAddr(socket);
if (*NANO_INTEGER(add) && (xc = nng_pipe_notify(*sock, NNG_PIPE_EV_ADD_POST, NULL, NULL)))
if (NANO_INTEGER(add) && (xc = nng_pipe_notify(*sock, NNG_PIPE_EV_ADD_POST, NULL, NULL)))
ERROR_OUT(xc);

if (*NANO_INTEGER(remove) && (xc = nng_pipe_notify(*sock, NNG_PIPE_EV_REM_POST, NULL, NULL)))
if (NANO_INTEGER(remove) && (xc = nng_pipe_notify(*sock, NNG_PIPE_EV_REM_POST, NULL, NULL)))
ERROR_OUT(xc);

return nano_success;
Expand All @@ -1676,7 +1676,7 @@ SEXP rnng_pipe_notify(SEXP socket, SEXP cv, SEXP cv2, SEXP add, SEXP remove, SEX

sock = (nng_socket *) R_ExternalPtrAddr(socket);
nano_cv *cvp = (nano_cv *) R_ExternalPtrAddr(cv);
const int flg = *NANO_INTEGER(flag);
const int flg = NANO_INTEGER(flag);

if (cv2 != R_NilValue) {

Expand All @@ -1688,10 +1688,10 @@ SEXP rnng_pipe_notify(SEXP socket, SEXP cv, SEXP cv2, SEXP add, SEXP remove, SEX
duo->cv = cvp;
duo->cv2 = (nano_cv *) R_ExternalPtrAddr(cv2);

if (*NANO_INTEGER(add) && (xc = nng_pipe_notify(*sock, NNG_PIPE_EV_ADD_POST, pipe_cb_signal_duo, duo)))
if (NANO_INTEGER(add) && (xc = nng_pipe_notify(*sock, NNG_PIPE_EV_ADD_POST, pipe_cb_signal_duo, duo)))
ERROR_OUT(xc);

if (*NANO_INTEGER(remove) && (xc = nng_pipe_notify(*sock, NNG_PIPE_EV_REM_POST, pipe_cb_signal_duo, duo)))
if (NANO_INTEGER(remove) && (xc = nng_pipe_notify(*sock, NNG_PIPE_EV_REM_POST, pipe_cb_signal_duo, duo)))
ERROR_OUT(xc);

SEXP xptr = R_MakeExternalPtr(duo, R_NilValue, R_NilValue);
Expand All @@ -1702,10 +1702,10 @@ SEXP rnng_pipe_notify(SEXP socket, SEXP cv, SEXP cv2, SEXP add, SEXP remove, SEX

cvp->flag = flg < 0 ? 1 : flg;

if (*NANO_INTEGER(add) && (xc = nng_pipe_notify(*sock, NNG_PIPE_EV_ADD_POST, pipe_cb_signal, cvp)))
if (NANO_INTEGER(add) && (xc = nng_pipe_notify(*sock, NNG_PIPE_EV_ADD_POST, pipe_cb_signal, cvp)))
ERROR_OUT(xc);

if (*NANO_INTEGER(remove) && (xc = nng_pipe_notify(*sock, NNG_PIPE_EV_REM_POST, pipe_cb_signal, cvp)))
if (NANO_INTEGER(remove) && (xc = nng_pipe_notify(*sock, NNG_PIPE_EV_REM_POST, pipe_cb_signal, cvp)))
ERROR_OUT(xc);

}
Expand Down
38 changes: 19 additions & 19 deletions src/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ SEXP rnng_dial(SEXP socket, SEXP url, SEXP tls, SEXP autostart, SEXP error) {
Rf_error("'tls' is not a valid TLS Configuration");

nng_socket *sock = (nng_socket *) R_ExternalPtrAddr(socket);
const int start = *NANO_INTEGER(autostart);
const int start = NANO_INTEGER(autostart);
const char *ur = CHAR(STRING_ELT(url, 0));
nano_dialer *dp = R_Calloc(1, nano_dialer);
SEXP dialer, attr, newattr;
Expand Down Expand Up @@ -768,7 +768,7 @@ SEXP rnng_dial(SEXP socket, SEXP url, SEXP tls, SEXP autostart, SEXP error) {
nng_tls_config_free(dp->tls);
exitlevel1:
R_Free(dp);
if (*NANO_INTEGER(error)) ERROR_OUT(xc);
if (NANO_INTEGER(error)) ERROR_OUT(xc);
ERROR_RET(xc);

}
Expand All @@ -784,7 +784,7 @@ SEXP rnng_listen(SEXP socket, SEXP url, SEXP tls, SEXP autostart, SEXP error) {
Rf_error("'tls' is not a valid TLS Configuration");

nng_socket *sock = (nng_socket *) R_ExternalPtrAddr(socket);
const int start = *NANO_INTEGER(autostart);
const int start = NANO_INTEGER(autostart);
const char *ur = CHAR(STRING_ELT(url, 0));
nano_listener *lp = R_Calloc(1, nano_listener);
SEXP listener, attr, newattr;
Expand Down Expand Up @@ -839,7 +839,7 @@ SEXP rnng_listen(SEXP socket, SEXP url, SEXP tls, SEXP autostart, SEXP error) {
nng_tls_config_free(lp->tls);
exitlevel1:
R_Free(lp);
if (*NANO_INTEGER(error)) ERROR_OUT(xc);
if (NANO_INTEGER(error)) ERROR_OUT(xc);
ERROR_RET(xc);

}
Expand All @@ -849,7 +849,7 @@ SEXP rnng_dialer_start(SEXP dialer, SEXP async) {
if (TAG(dialer) != nano_DialerSymbol)
Rf_error("'dialer' is not a valid Dialer");
nng_dialer *dial = (nng_dialer *) R_ExternalPtrAddr(dialer);
const int flags = (*NANO_INTEGER(async) == 1) * NNG_FLAG_NONBLOCK;
const int flags = (NANO_INTEGER(async) == 1) * NNG_FLAG_NONBLOCK;
const int xc = nng_dialer_start(*dial, flags);
if (xc)
ERROR_RET(xc);
Expand Down Expand Up @@ -923,7 +923,7 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block) {

if (flags <= 0) {

xc = nng_send(*sock, buf.buf, buf.cur, flags ? NNG_FLAG_NONBLOCK : (*NANO_INTEGER(block) != 1) * NNG_FLAG_NONBLOCK);
xc = nng_send(*sock, buf.buf, buf.cur, flags ? NNG_FLAG_NONBLOCK : (NANO_INTEGER(block) != 1) * NNG_FLAG_NONBLOCK);
NANO_FREE(buf);

} else {
Expand Down Expand Up @@ -971,7 +971,7 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block) {
goto exitlevel1;

if ((xc = nng_msg_append(msgp, buf.buf, buf.cur)) ||
(xc = nng_ctx_sendmsg(*ctxp, msgp, flags ? NNG_FLAG_NONBLOCK : (*NANO_INTEGER(block) != 1) * NNG_FLAG_NONBLOCK))) {
(xc = nng_ctx_sendmsg(*ctxp, msgp, flags ? NNG_FLAG_NONBLOCK : (NANO_INTEGER(block) != 1) * NNG_FLAG_NONBLOCK))) {
nng_msg_free(msgp);
goto exitlevel1;
}
Expand Down Expand Up @@ -1022,7 +1022,7 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block) {
goto exitlevel1;
}

nng_aio_set_timeout(aiop, flags ? flags : (*NANO_INTEGER(block) != 0) * NNG_DURATION_DEFAULT);
nng_aio_set_timeout(aiop, flags ? flags : (NANO_INTEGER(block) != 0) * NNG_DURATION_DEFAULT);
nng_stream_send(sp, aiop);
NANO_FREE(buf);
nng_aio_wait(aiop);
Expand Down Expand Up @@ -1060,7 +1060,7 @@ SEXP rnng_recv(SEXP con, SEXP mode, SEXP block, SEXP bytes) {

if (flags <= 0) {

xc = nng_recv(*sock, &buf, &sz, NNG_FLAG_ALLOC + (flags < 0 || *NANO_INTEGER(block) != 1) * NNG_FLAG_NONBLOCK);
xc = nng_recv(*sock, &buf, &sz, NNG_FLAG_ALLOC + (flags < 0 || NANO_INTEGER(block) != 1) * NNG_FLAG_NONBLOCK);
if (xc)
goto exitlevel1;

Expand Down Expand Up @@ -1095,7 +1095,7 @@ SEXP rnng_recv(SEXP con, SEXP mode, SEXP block, SEXP bytes) {

if (flags <= 0) {

xc = nng_ctx_recvmsg(*ctxp, &msgp, (flags < 0 || *NANO_INTEGER(block) != 1) * NNG_FLAG_NONBLOCK);
xc = nng_ctx_recvmsg(*ctxp, &msgp, (flags < 0 || NANO_INTEGER(block) != 1) * NNG_FLAG_NONBLOCK);
if (xc)
goto exitlevel1;

Expand Down Expand Up @@ -1148,7 +1148,7 @@ SEXP rnng_recv(SEXP con, SEXP mode, SEXP block, SEXP bytes) {
goto exitlevel2;
}

nng_aio_set_timeout(aiop, flags ? flags : (*NANO_INTEGER(block) != 0) * NNG_DURATION_DEFAULT);
nng_aio_set_timeout(aiop, flags ? flags : (NANO_INTEGER(block) != 0) * NNG_DURATION_DEFAULT);
nng_stream_recv(*sp, aiop);

nng_aio_wait(aiop);
Expand Down Expand Up @@ -1206,7 +1206,7 @@ SEXP rnng_set_opt(SEXP object, SEXP opt, SEXP value) {
xc = nng_socket_set_uint64(*sock, op, (uint64_t) val);
break;
case LGLSXP:
xc = nng_socket_set_bool(*sock, op, (bool) *NANO_INTEGER(value));
xc = nng_socket_set_bool(*sock, op, (bool) NANO_INTEGER(value));
break;
default:
Rf_error("type of 'value' not supported");
Expand Down Expand Up @@ -1234,7 +1234,7 @@ SEXP rnng_set_opt(SEXP object, SEXP opt, SEXP value) {
xc = nng_ctx_set_uint64(*ctx, op, (uint64_t) val);
break;
case LGLSXP:
xc = nng_ctx_set_bool(*ctx, op, (bool) *NANO_INTEGER(value));
xc = nng_ctx_set_bool(*ctx, op, (bool) NANO_INTEGER(value));
break;
default:
Rf_error("type of 'value' not supported");
Expand Down Expand Up @@ -1262,7 +1262,7 @@ SEXP rnng_set_opt(SEXP object, SEXP opt, SEXP value) {
xc = nng_stream_set_uint64(*st, op, (uint64_t) val);
break;
case LGLSXP:
xc = nng_stream_set_bool(*st, op, (bool) *NANO_INTEGER(value));
xc = nng_stream_set_bool(*st, op, (bool) NANO_INTEGER(value));
break;
default:
Rf_error("type of 'value' not supported");
Expand Down Expand Up @@ -1290,7 +1290,7 @@ SEXP rnng_set_opt(SEXP object, SEXP opt, SEXP value) {
xc = nng_listener_set_uint64(*list, op, (uint64_t) val);
break;
case LGLSXP:
xc = nng_listener_set_bool(*list, op, (bool) *NANO_INTEGER(value));
xc = nng_listener_set_bool(*list, op, (bool) NANO_INTEGER(value));
break;
default:
Rf_error("type of 'value' not supported");
Expand Down Expand Up @@ -1318,7 +1318,7 @@ SEXP rnng_set_opt(SEXP object, SEXP opt, SEXP value) {
xc = nng_dialer_set_uint64(*dial, op, (uint64_t) val);
break;
case LGLSXP:
xc = nng_dialer_set_bool(*dial, op, (bool) *NANO_INTEGER(value));
xc = nng_dialer_set_bool(*dial, op, (bool) NANO_INTEGER(value));
break;
default:
Rf_error("type of 'value' not supported");
Expand All @@ -1337,7 +1337,7 @@ SEXP rnng_set_opt(SEXP object, SEXP opt, SEXP value) {

SEXP rnng_subscribe(SEXP object, SEXP value, SEXP sub) {

const char *op = *NANO_INTEGER(sub) ? "sub:subscribe" : "sub:unsubscribe";
const char *op = NANO_INTEGER(sub) ? "sub:subscribe" : "sub:unsubscribe";
nano_buf buf;
int xc;

Expand Down Expand Up @@ -1547,7 +1547,7 @@ SEXP rnng_next_config(SEXP refhook, SEXP klass, SEXP list, SEXP mark) {
if (TYPEOF(klass) != STRSXP)
Rf_error("'class' must be a character string");

special_bit = (uint8_t) *NANO_INTEGER(mark);
special_bit = (uint8_t) NANO_INTEGER(mark);
SEXPTYPE typ1, typ2;
int plist;

Expand Down Expand Up @@ -1581,7 +1581,7 @@ SEXP rnng_next_config(SEXP refhook, SEXP klass, SEXP list, SEXP mark) {
SETCADR(nano_refHook, plist ? CADR(refhook) : R_VECTOR(refhook)[1]);
SETCAR(nano_klassString, STRING_ELT(klass, 0));

registered = *NANO_INTEGER(list) ? 1 : 2;
registered = NANO_INTEGER(list) ? 1 : 2;

}

Expand Down
2 changes: 1 addition & 1 deletion src/keycert.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ SEXP rnng_write_cert(SEXP cn, SEXP valid, SEXP inter) {

const char *common = CHAR(STRING_ELT(cn, 0));
const char *not_after = CHAR(STRING_ELT(valid, 0)); /* validity period not after */
const int interactive = *NANO_INTEGER(inter);
const int interactive = NANO_INTEGER(inter);
mbedtls_entropy_context entropy;
mbedtls_ctr_drbg_context ctr_drbg;
mbedtls_pk_context key;
Expand Down
2 changes: 1 addition & 1 deletion src/nanonext.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ typedef struct nano_thread_duo_s {
(x)->len = 0; \
(x)->cur = sz
#define NANO_FREE(x) if (x.len) R_Free(x.buf)
#define NANO_INTEGER(x) (int *) DATAPTR_RO(x)
#define NANO_INTEGER(x) *(int *) DATAPTR_RO(x)
#define NANO_ERROR(x) { Rf_error(x); return R_NilValue; }
#define NANO_CLASS2(x, cls1, cls2) \
SEXP klass = Rf_allocVector(STRSXP, 2); \
Expand Down
2 changes: 1 addition & 1 deletion src/protocol.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void socket_finalizer(SEXP xptr) {
SEXP rnng_protocol_open(SEXP protocol, SEXP raw) {

const char *pro = CHAR(STRING_ELT(protocol, 0));
const int rw = *NANO_INTEGER(raw);
const int rw = NANO_INTEGER(raw);
size_t slen = strlen(pro);

const char *pname;
Expand Down
4 changes: 2 additions & 2 deletions src/tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ SEXP rnng_random(SEXP n, SEXP convert) {
switch (TYPEOF(n)) {
case INTSXP:
case LGLSXP:
sz = INTEGER(n)[0];
sz = NANO_INTEGER(n);
if (sz >= 0 && sz <= 1024) break;
case REALSXP:
sz = Rf_asInteger(n);
Expand Down Expand Up @@ -92,7 +92,7 @@ SEXP rnng_random(SEXP n, SEXP convert) {
if (xc)
Rf_error("error generating random bytes");

if (*NANO_INTEGER(convert)) {
if (NANO_INTEGER(convert)) {
out = nano_hash_char(buf, sz);
} else {
out = Rf_allocVector(RAWSXP, sz);
Expand Down
18 changes: 11 additions & 7 deletions src/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,16 @@ SEXP rnng_clock(void) {

SEXP rnng_sleep(SEXP msec) {

int time;
switch (TYPEOF(msec)) {
case INTSXP:
nng_msleep((nng_duration) abs(INTEGER(msec)[0]));
case LGLSXP:
time = NANO_INTEGER(msec);
if (time > 0) nng_msleep((nng_duration) time);
break;
case REALSXP:
nng_msleep((nng_duration) abs(Rf_asInteger(msec)));
time = Rf_asInteger(msec);
if (time > 0) nng_msleep((nng_duration) time);
break;
}

Expand Down Expand Up @@ -241,7 +245,7 @@ SEXP rnng_ncurl(SEXP http, SEXP convert, SEXP follow, SEXP method, SEXP headers,

code = nng_http_res_get_status(res), relo = code >= 300 && code < 400;

if (relo && *NANO_INTEGER(follow)) {
if (relo && NANO_INTEGER(follow)) {
const char *location = nng_http_res_get_header(res, "Location");
if (location == NULL) goto resume;
nng_url *oldurl = url;
Expand Down Expand Up @@ -294,7 +298,7 @@ SEXP rnng_ncurl(SEXP http, SEXP convert, SEXP follow, SEXP method, SEXP headers,

nng_http_res_get_data(res, &dat, &sz);

if (*NANO_INTEGER(convert)) {
if (NANO_INTEGER(convert)) {
vec = rawToChar(dat, sz);
} else {
vec = Rf_allocVector(RAWSXP, sz);
Expand Down Expand Up @@ -338,7 +342,7 @@ SEXP rnng_stream_dial(SEXP url, SEXP textframes, SEXP tls) {
Rf_error("'tls' is not a valid TLS Configuration");
nano_stream *nst = R_Calloc(1, nano_stream);
nst->mode = NANO_STREAM_DIALER;
nst->textframes = *NANO_INTEGER(textframes) != 0;
nst->textframes = NANO_INTEGER(textframes) != 0;
nst->tls = NULL;
nng_url *up;
nng_aio *aiop;
Expand Down Expand Up @@ -427,7 +431,7 @@ SEXP rnng_stream_listen(SEXP url, SEXP textframes, SEXP tls) {
Rf_error("'tls' is not a valid TLS Configuration");
nano_stream *nst = R_Calloc(1, nano_stream);
nst->mode = NANO_STREAM_LISTENER;
nst->textframes = *NANO_INTEGER(textframes) != 0;
nst->textframes = NANO_INTEGER(textframes) != 0;
nst->tls = NULL;
nng_url *up;
nng_aio *aiop;
Expand Down Expand Up @@ -608,7 +612,7 @@ SEXP rnng_status_code(SEXP x) {

SEXP rnng_tls_config(SEXP client, SEXP server, SEXP pass, SEXP auth) {

const nng_tls_auth_mode mod = *NANO_INTEGER(auth) ? NNG_TLS_AUTH_MODE_REQUIRED : NNG_TLS_AUTH_MODE_OPTIONAL;
const nng_tls_auth_mode mod = NANO_INTEGER(auth) ? NNG_TLS_AUTH_MODE_REQUIRED : NNG_TLS_AUTH_MODE_OPTIONAL;
R_xlen_t usefile;
nng_tls_config *cfg;
int xc;
Expand Down

0 comments on commit ac8e7b8

Please sign in to comment.