Skip to content

Commit

Permalink
adds .interrupt() internal package function for whether to interrupt …
Browse files Browse the repository at this point in the history
…upon async recv completions; simplifies .mark()
  • Loading branch information
shikokuchuo committed Nov 27, 2024
1 parent a069558 commit fa253f5
Show file tree
Hide file tree
Showing 13 changed files with 85 additions and 28 deletions.
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export("opt<-")
export(.advance)
export(.context)
export(.dispatcher)
export(.interrupt)
export(.keep)
export(.mark)
export(.online)
Expand Down
23 changes: 20 additions & 3 deletions R/sync.R
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,8 @@ unlock <- function(socket) invisible(.Call(rnng_socket_unlock, socket))
#' Dispatcher Socket
#'
#' Creates a Dispatcher socket, which is a special type of \sQuote{req} socket,
#' with FIFO scheduling using a threaded implementation (for internal use only).
#' with FIFO scheduling using a threaded implementation. Internal package
#' function.
#'
#' @param host \sQuote{inproc://} url connecting the host to the thread.
#' @param url the URLs at which to listen for rep nodes.
Expand All @@ -323,8 +324,8 @@ unlock <- function(socket) invisible(.Call(rnng_socket_unlock, socket))

#' Read Online Status
#'
#' Reads the online status of threaded dispatcher sockets (for internal use
#' only).
#' Reads the online status of threaded dispatcher sockets. Internal package
#' function.
#'
#' @param sock a dispatcher Socket.
#'
Expand All @@ -334,3 +335,19 @@ unlock <- function(socket) invisible(.Call(rnng_socket_unlock, socket))
#' @export
#'
.online <- function(sock) .Call(rnng_read_online, sock)

#' Interrupt Switch
#'
#' Toggles on or off whether async receive completions trigger an interrupt.
#' Internal package function.
#'
#' @return NULL.
#'
#' @examples
#' .interrupt()
#' .interrupt()
#'
#' @keywords internal
#' @export
#'
.interrupt <- function() .Call(rnng_interrupt_switch)
10 changes: 4 additions & 6 deletions R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -316,20 +316,18 @@ serial_config <- function(class, sfunc, ufunc, vec = FALSE)

#' Set Serialization Marker
#'
#' Internal package function.
#'
#' @param x logical value.
#' Toggles the serialization marker on or off. Internal package function.
#'
#' @return The logical value 'x' supplied.
#' @return NULL.
#'
#' @examples
#' .mark()
#' .mark(FALSE)
#' .mark()
#'
#' @keywords internal
#' @export
#'
.mark <- function(x = TRUE) .Call(rnng_set_marker, x)
.mark <- function() .Call(rnng_set_marker)

#' Advances the RNG State
#'
Expand Down
3 changes: 2 additions & 1 deletion man/dot-dispatcher.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions man/dot-interrupt.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 4 additions & 7 deletions man/dot-mark.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions man/dot-online.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions src/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

// nanonext - C level - Async Functions ----------------------------------------

#define NANONEXT_SIGNALS
#include "nanonext.h"

// internals -------------------------------------------------------------------
Expand Down Expand Up @@ -65,6 +66,14 @@ static void raio_complete(void *arg) {
if (raio->cb != NULL)
later2(raio_invoke_cb, raio->cb);

if (nano_interrupt) {
#ifdef _WIN32
raise(SIGINT);
#else
kill(getpid(), SIGINT);
#endif
}

}

static void iraio_complete(void *arg) {
Expand Down
4 changes: 3 additions & 1 deletion src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

void (*eln2)(void (*)(void *), void *, double, int) = NULL;

int nano_interrupt = 0;
uint8_t special_bit = 0;

nng_mtx *nano_wait_mtx;
Expand Down Expand Up @@ -153,6 +154,7 @@ static const R_CallMethodDef callMethods[] = {
{"rnng_eval_safe", (DL_FUNC) &rnng_eval_safe, 1},
{"rnng_fini", (DL_FUNC) &rnng_fini, 0},
{"rnng_get_opt", (DL_FUNC) &rnng_get_opt, 2},
{"rnng_interrupt_switch", (DL_FUNC) &rnng_interrupt_switch, 0},
{"rnng_is_error_value", (DL_FUNC) &rnng_is_error_value, 1},
{"rnng_is_nul_byte", (DL_FUNC) &rnng_is_nul_byte, 1},
{"rnng_listen", (DL_FUNC) &rnng_listen, 5},
Expand All @@ -176,7 +178,7 @@ static const R_CallMethodDef callMethods[] = {
{"rnng_send", (DL_FUNC) &rnng_send, 4},
{"rnng_send_aio", (DL_FUNC) &rnng_send_aio, 5},
{"rnng_serial_config", (DL_FUNC) &rnng_serial_config, 4},
{"rnng_set_marker", (DL_FUNC) &rnng_set_marker, 1},
{"rnng_set_marker", (DL_FUNC) &rnng_set_marker, 0},
{"rnng_set_opt", (DL_FUNC) &rnng_set_opt, 3},
{"rnng_set_promise_context", (DL_FUNC) &rnng_set_promise_context, 2},
{"rnng_signal_thread_create", (DL_FUNC) &rnng_signal_thread_create, 2},
Expand Down
4 changes: 3 additions & 1 deletion src/nanonext.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ typedef struct nano_buf_s {

extern void (*eln2)(void (*)(void *), void *, double, int);
extern uint8_t special_bit;
extern int nano_interrupt;

extern SEXP nano_AioSymbol;
extern SEXP nano_ContextSymbol;
Expand Down Expand Up @@ -326,6 +327,7 @@ SEXP rnng_dispatcher_socket(SEXP, SEXP, SEXP);
SEXP rnng_eval_safe(SEXP);
SEXP rnng_fini(void);
SEXP rnng_get_opt(SEXP, SEXP);
SEXP rnng_interrupt_switch(void);
SEXP rnng_is_error_value(SEXP);
SEXP rnng_is_nul_byte(SEXP);
SEXP rnng_listen(SEXP, SEXP, SEXP, SEXP, SEXP);
Expand All @@ -350,7 +352,7 @@ SEXP rnng_request(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP rnng_send(SEXP, SEXP, SEXP, SEXP);
SEXP rnng_send_aio(SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP rnng_serial_config(SEXP, SEXP, SEXP, SEXP);
SEXP rnng_set_marker(SEXP);
SEXP rnng_set_marker(void);
SEXP rnng_set_opt(SEXP, SEXP, SEXP);
SEXP rnng_set_promise_context(SEXP, SEXP);
SEXP rnng_signal_thread_create(SEXP, SEXP);
Expand Down
7 changes: 7 additions & 0 deletions src/sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -623,3 +623,10 @@ SEXP rnng_socket_unlock(SEXP socket) {
return nano_success;

}

SEXP rnng_interrupt_switch(void) {

nano_interrupt = nano_interrupt ? 0 : 1;
return R_NilValue;

}
6 changes: 3 additions & 3 deletions src/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -607,10 +607,10 @@ SEXP rnng_serial_config(SEXP klass, SEXP sfunc, SEXP ufunc, SEXP vec) {

}

SEXP rnng_set_marker(SEXP x) {
SEXP rnng_set_marker(void) {

special_bit = (uint8_t) NANO_INTEGER(x);
return x;
special_bit = special_bit ? (uint8_t) 0 : (uint8_t) 1;
return R_NilValue;

}

Expand Down
10 changes: 6 additions & 4 deletions tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,12 @@ test_print(p <- tryCatch(collect_pipe(r), error = function(e) NULL))
if (!is.null(p)) test_class("nano", p)
if (is_nano(p)) test_type("integer", p$id)
if (is_nano(p)) test_type("integer", opt(p, "recv-fd"))
test_true(.mark())
test_null(.mark())
test_class("sendAio", r <- send_aio(if (is_nano(p)) p else rep, "", timeout = 500))
if (later) test_null(.keep(r, new.env()))
test_error(collect_pipe(r), "valid")
test_equal(req$recv(mode = 8L, block = 500)[4L], 1L)
test_true(!.mark(FALSE))
test_null(.mark())

test_class("nanoContext", ctx <- context(rep))
test_print(ctx)
Expand Down Expand Up @@ -550,12 +550,14 @@ test_zero(dial(s, url = "inproc://disp/1"))
test_true(wait(cv))
test_zero(send(disp, TRUE, block = 500L))
test_true(recv(s, block = 500L))
test_true(.mark())
test_null(.mark())
test_zero(send(s, NULL, block = 500L))
test_true(!.mark(FALSE))
test_null(.mark())
test_null(recv(disp, block = 500L))
test_zero(reap(s))
rm(disp)
test_null(.interrupt())
test_null(.interrupt())

test_equal(nanonext:::.DollarNames.ncurlAio(NULL, "sta"), "status")
test_equal(nanonext:::.DollarNames.recvAio(NULL, "dat"), "data")
Expand Down

0 comments on commit fa253f5

Please sign in to comment.