diff --git a/NAMESPACE b/NAMESPACE index cfaba60ed..dcc5281ae 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -48,6 +48,7 @@ export("opt<-") export(.advance) export(.context) export(.dispatcher) +export(.interrupt) export(.keep) export(.mark) export(.online) diff --git a/R/sync.R b/R/sync.R index 5c4097d45..559a49016 100644 --- a/R/sync.R +++ b/R/sync.R @@ -307,7 +307,8 @@ unlock <- function(socket) invisible(.Call(rnng_socket_unlock, socket)) #' Dispatcher Socket #' #' Creates a Dispatcher socket, which is a special type of \sQuote{req} socket, -#' with FIFO scheduling using a threaded implementation (for internal use only). +#' with FIFO scheduling using a threaded implementation. Internal package +#' function. #' #' @param host \sQuote{inproc://} url connecting the host to the thread. #' @param url the URLs at which to listen for rep nodes. @@ -323,8 +324,8 @@ unlock <- function(socket) invisible(.Call(rnng_socket_unlock, socket)) #' Read Online Status #' -#' Reads the online status of threaded dispatcher sockets (for internal use -#' only). +#' Reads the online status of threaded dispatcher sockets. Internal package +#' function. #' #' @param sock a dispatcher Socket. #' @@ -334,3 +335,19 @@ unlock <- function(socket) invisible(.Call(rnng_socket_unlock, socket)) #' @export #' .online <- function(sock) .Call(rnng_read_online, sock) + +#' Interrupt Switch +#' +#' Toggles on or off whether async receive completions trigger an interrupt. +#' Internal package function. +#' +#' @return NULL. +#' +#' @examples +#' .interrupt() +#' .interrupt() +#' +#' @keywords internal +#' @export +#' +.interrupt <- function() .Call(rnng_interrupt_switch) diff --git a/R/utils.R b/R/utils.R index 04e432b91..bd53db976 100644 --- a/R/utils.R +++ b/R/utils.R @@ -316,20 +316,18 @@ serial_config <- function(class, sfunc, ufunc, vec = FALSE) #' Set Serialization Marker #' -#' Internal package function. -#' -#' @param x logical value. +#' Toggles the serialization marker on or off. Internal package function. #' -#' @return The logical value 'x' supplied. +#' @return NULL. #' #' @examples #' .mark() -#' .mark(FALSE) +#' .mark() #' #' @keywords internal #' @export #' -.mark <- function(x = TRUE) .Call(rnng_set_marker, x) +.mark <- function() .Call(rnng_set_marker) #' Advances the RNG State #' diff --git a/man/dot-dispatcher.Rd b/man/dot-dispatcher.Rd index d08578b29..1d1da73fe 100644 --- a/man/dot-dispatcher.Rd +++ b/man/dot-dispatcher.Rd @@ -19,6 +19,7 @@ A \sQuote{req} Socket. The thread is attached as an attribute. } \description{ Creates a Dispatcher socket, which is a special type of \sQuote{req} socket, -with FIFO scheduling using a threaded implementation (for internal use only). +with FIFO scheduling using a threaded implementation. Internal package +function. } \keyword{internal} diff --git a/man/dot-interrupt.Rd b/man/dot-interrupt.Rd new file mode 100644 index 000000000..8b8dc120c --- /dev/null +++ b/man/dot-interrupt.Rd @@ -0,0 +1,21 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/sync.R +\name{.interrupt} +\alias{.interrupt} +\title{Interrupt Switch} +\usage{ +.interrupt() +} +\value{ +NULL. +} +\description{ +Toggles on or off whether async receive completions trigger an interrupt. +Internal package function. +} +\examples{ +.interrupt() +.interrupt() + +} +\keyword{internal} diff --git a/man/dot-mark.Rd b/man/dot-mark.Rd index 28392a614..f02e94b45 100644 --- a/man/dot-mark.Rd +++ b/man/dot-mark.Rd @@ -4,20 +4,17 @@ \alias{.mark} \title{Set Serialization Marker} \usage{ -.mark(x = TRUE) -} -\arguments{ -\item{x}{logical value.} +.mark() } \value{ -The logical value 'x' supplied. +NULL. } \description{ -Internal package function. +Toggles the serialization marker on or off. Internal package function. } \examples{ .mark() -.mark(FALSE) +.mark() } \keyword{internal} diff --git a/man/dot-online.Rd b/man/dot-online.Rd index 0b1239516..17970ca38 100644 --- a/man/dot-online.Rd +++ b/man/dot-online.Rd @@ -13,7 +13,7 @@ An vector of integer values. } \description{ -Reads the online status of threaded dispatcher sockets (for internal use -only). +Reads the online status of threaded dispatcher sockets. Internal package +function. } \keyword{internal} diff --git a/src/aio.c b/src/aio.c index 48f260f11..7a6fe1f1e 100644 --- a/src/aio.c +++ b/src/aio.c @@ -16,6 +16,7 @@ // nanonext - C level - Async Functions ---------------------------------------- +#define NANONEXT_SIGNALS #include "nanonext.h" // internals ------------------------------------------------------------------- @@ -65,6 +66,14 @@ static void raio_complete(void *arg) { if (raio->cb != NULL) later2(raio_invoke_cb, raio->cb); + if (nano_interrupt) { +#ifdef _WIN32 + raise(SIGINT); +#else + kill(getpid(), SIGINT); +#endif + } + } static void iraio_complete(void *arg) { diff --git a/src/init.c b/src/init.c index f8818d12d..d9d25acbe 100644 --- a/src/init.c +++ b/src/init.c @@ -20,6 +20,7 @@ void (*eln2)(void (*)(void *), void *, double, int) = NULL; +int nano_interrupt = 0; uint8_t special_bit = 0; nng_mtx *nano_wait_mtx; @@ -153,6 +154,7 @@ static const R_CallMethodDef callMethods[] = { {"rnng_eval_safe", (DL_FUNC) &rnng_eval_safe, 1}, {"rnng_fini", (DL_FUNC) &rnng_fini, 0}, {"rnng_get_opt", (DL_FUNC) &rnng_get_opt, 2}, + {"rnng_interrupt_switch", (DL_FUNC) &rnng_interrupt_switch, 0}, {"rnng_is_error_value", (DL_FUNC) &rnng_is_error_value, 1}, {"rnng_is_nul_byte", (DL_FUNC) &rnng_is_nul_byte, 1}, {"rnng_listen", (DL_FUNC) &rnng_listen, 5}, @@ -176,7 +178,7 @@ static const R_CallMethodDef callMethods[] = { {"rnng_send", (DL_FUNC) &rnng_send, 4}, {"rnng_send_aio", (DL_FUNC) &rnng_send_aio, 5}, {"rnng_serial_config", (DL_FUNC) &rnng_serial_config, 4}, - {"rnng_set_marker", (DL_FUNC) &rnng_set_marker, 1}, + {"rnng_set_marker", (DL_FUNC) &rnng_set_marker, 0}, {"rnng_set_opt", (DL_FUNC) &rnng_set_opt, 3}, {"rnng_set_promise_context", (DL_FUNC) &rnng_set_promise_context, 2}, {"rnng_signal_thread_create", (DL_FUNC) &rnng_signal_thread_create, 2}, diff --git a/src/nanonext.h b/src/nanonext.h index 949341e65..8e3cf9183 100644 --- a/src/nanonext.h +++ b/src/nanonext.h @@ -229,6 +229,7 @@ typedef struct nano_buf_s { extern void (*eln2)(void (*)(void *), void *, double, int); extern uint8_t special_bit; +extern int nano_interrupt; extern SEXP nano_AioSymbol; extern SEXP nano_ContextSymbol; @@ -326,6 +327,7 @@ SEXP rnng_dispatcher_socket(SEXP, SEXP, SEXP); SEXP rnng_eval_safe(SEXP); SEXP rnng_fini(void); SEXP rnng_get_opt(SEXP, SEXP); +SEXP rnng_interrupt_switch(void); SEXP rnng_is_error_value(SEXP); SEXP rnng_is_nul_byte(SEXP); SEXP rnng_listen(SEXP, SEXP, SEXP, SEXP, SEXP); @@ -350,7 +352,7 @@ SEXP rnng_request(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rnng_send(SEXP, SEXP, SEXP, SEXP); SEXP rnng_send_aio(SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rnng_serial_config(SEXP, SEXP, SEXP, SEXP); -SEXP rnng_set_marker(SEXP); +SEXP rnng_set_marker(void); SEXP rnng_set_opt(SEXP, SEXP, SEXP); SEXP rnng_set_promise_context(SEXP, SEXP); SEXP rnng_signal_thread_create(SEXP, SEXP); diff --git a/src/sync.c b/src/sync.c index 629896499..c314f22de 100644 --- a/src/sync.c +++ b/src/sync.c @@ -623,3 +623,10 @@ SEXP rnng_socket_unlock(SEXP socket) { return nano_success; } + +SEXP rnng_interrupt_switch(void) { + + nano_interrupt = nano_interrupt ? 0 : 1; + return R_NilValue; + +} diff --git a/src/utils.c b/src/utils.c index 648d110c0..276a74d65 100644 --- a/src/utils.c +++ b/src/utils.c @@ -607,10 +607,10 @@ SEXP rnng_serial_config(SEXP klass, SEXP sfunc, SEXP ufunc, SEXP vec) { } -SEXP rnng_set_marker(SEXP x) { +SEXP rnng_set_marker(void) { - special_bit = (uint8_t) NANO_INTEGER(x); - return x; + special_bit = special_bit ? (uint8_t) 0 : (uint8_t) 1; + return R_NilValue; } diff --git a/tests/tests.R b/tests/tests.R index 5d3e5469a..4a482dd58 100644 --- a/tests/tests.R +++ b/tests/tests.R @@ -212,12 +212,12 @@ test_print(p <- tryCatch(collect_pipe(r), error = function(e) NULL)) if (!is.null(p)) test_class("nano", p) if (is_nano(p)) test_type("integer", p$id) if (is_nano(p)) test_type("integer", opt(p, "recv-fd")) -test_true(.mark()) +test_null(.mark()) test_class("sendAio", r <- send_aio(if (is_nano(p)) p else rep, "", timeout = 500)) if (later) test_null(.keep(r, new.env())) test_error(collect_pipe(r), "valid") test_equal(req$recv(mode = 8L, block = 500)[4L], 1L) -test_true(!.mark(FALSE)) +test_null(.mark()) test_class("nanoContext", ctx <- context(rep)) test_print(ctx) @@ -550,12 +550,14 @@ test_zero(dial(s, url = "inproc://disp/1")) test_true(wait(cv)) test_zero(send(disp, TRUE, block = 500L)) test_true(recv(s, block = 500L)) -test_true(.mark()) +test_null(.mark()) test_zero(send(s, NULL, block = 500L)) -test_true(!.mark(FALSE)) +test_null(.mark()) test_null(recv(disp, block = 500L)) test_zero(reap(s)) rm(disp) +test_null(.interrupt()) +test_null(.interrupt()) test_equal(nanonext:::.DollarNames.ncurlAio(NULL, "sta"), "status") test_equal(nanonext:::.DollarNames.recvAio(NULL, "dat"), "data")