Skip to content

Commit

Permalink
reverts ack
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Oct 31, 2023
1 parent 930e5ce commit 4711f91
Show file tree
Hide file tree
Showing 8 changed files with 11 additions and 72 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: nanonext
Type: Package
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
Version: 0.10.2.9022
Version: 0.10.2.9023
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
a socket library implementing 'Scalability Protocols', a reliable,
high-performance standard for common communications patterns including
Expand Down
4 changes: 2 additions & 2 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# nanonext 0.10.2.9022 (development)
# nanonext 0.10.2.9023 (development)

#### New Features

Expand All @@ -10,7 +10,7 @@
* `lock()` supplying 'cv' has improved behaviour which locks the socket whilst allowing for both initial connections and re-connections (when the 'cv' is registered for both add and remove pipe events).
* Improves listener / dialer logic for TLS connections, allowing *inter alia* synchronous dials.
* `pipe_notify()` arguments 'add', 'remove' and 'flag' now default to FALSE instead of TRUE for easier selective specification of the events to signal.
* `request()` argument 'ack' behaviour is more robust.
* `request()` argument 'ack' removed due to stability considerations.
* Fixes memory leaks detected with valgrind.
* Upgrades bundled 'libmbedtls' to v 3.5.0.

Expand Down
8 changes: 2 additions & 6 deletions R/context.R
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,6 @@ reply <- function(context,
#' @param data an object (if send_mode = 'raw', a vector).
#' @param timeout [default NULL] integer value in milliseconds or NULL, which
#' applies a socket-specific default, usually the same as no timeout.
#' @param ack [default FALSE] logical value whether to send an ack(nowledgement)
#' back to the rep node (consisting of an empty message) when the async
#' receive is complete.
#'
#' @return A 'recvAio' (object of class 'recvAio') (invisibly).
#'
Expand Down Expand Up @@ -230,9 +227,8 @@ request <- function(context,
send_mode = c("serial", "raw", "next"),
recv_mode = c("serial", "character", "complex", "double",
"integer", "logical", "numeric", "raw", "string"),
timeout = NULL,
ack = FALSE)
data <- .Call(rnng_request, context, data, send_mode, recv_mode, timeout, ack, environment())
timeout = NULL)
data <- .Call(rnng_request, context, data, send_mode, recv_mode, timeout, environment())

#' Request over Context and Signal a Condition Variable
#'
Expand Down
7 changes: 1 addition & 6 deletions man/request.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 2 additions & 53 deletions src/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -244,49 +244,6 @@ static void raio_complete_signal(void *arg) {

}

static void raio_complete_ack(void *arg) {

nano_aio *raio = (nano_aio *) arg;
nng_ctx *ctx = (nng_ctx *) raio->data;
const int res = nng_aio_result(raio->aio);
if (res == 0)
raio->data = nng_aio_get_msg(raio->aio);

#if NNG_MAJOR_VERSION == 1 && NNG_MINOR_VERSION < 6

nng_mtx_lock(shr_mtx);
raio->result = res - !res;
nng_mtx_unlock(shr_mtx);

nng_msg *msg;
if (nng_msg_alloc(&msg, 0) == 0) {
nng_aio *aio;
if (nng_aio_alloc(&aio, NULL, NULL) == 0) {
nng_aio_set_msg(aio, msg);
nng_aio_set_timeout(aio, (nng_duration) NANONEXT_ACK_MS);
nng_ctx_send(*ctx, aio);
nng_aio_wait(aio);
if (nng_aio_result(aio))
nng_msg_free(nng_aio_get_msg(aio));
nng_aio_free(aio);
} else {
nng_msg_free(msg);
}
}

#else

raio->result = res - !res;
nng_msg *msg;
if (nng_msg_alloc(&msg, 0) == 0) {
if (nng_ctx_sendmsg(*ctx, msg, NNG_FLAG_NONBLOCK))
nng_msg_free(msg);
}

#endif

}

static void iraio_complete(void *arg) {

nano_aio *iaio = (nano_aio *) arg;
Expand Down Expand Up @@ -1604,15 +1561,14 @@ SEXP rnng_ncurl_session_close(SEXP session) {

// request ---------------------------------------------------------------------

SEXP rnng_request(SEXP con, SEXP data, SEXP sendmode, SEXP recvmode, SEXP timeout, SEXP ack, SEXP clo) {
SEXP rnng_request(SEXP con, SEXP data, SEXP sendmode, SEXP recvmode, SEXP timeout, SEXP clo) {

if (R_ExternalPtrTag(con) != nano_ContextSymbol)
Rf_error("'context' is not a valid Context");

nng_ctx *ctx = (nng_ctx *) R_ExternalPtrAddr(con);

int xc;
const int ac = LOGICAL(ack)[0];
const nng_duration dur = timeout == R_NilValue ? NNG_DURATION_DEFAULT : (nng_duration) Rf_asInteger(timeout);

SEXP sendaio, aio, env, fun;
Expand Down Expand Up @@ -1653,14 +1609,7 @@ SEXP rnng_request(SEXP con, SEXP data, SEXP sendmode, SEXP recvmode, SEXP timeou
raio->type = RECVAIO;
raio->mode = nano_matcharg(recvmode);

if (ac) {
raio->data = ctx;
xc = nng_aio_alloc(&raio->aio, raio_complete_ack, raio);
} else {
xc = nng_aio_alloc(&raio->aio, raio_complete, raio);
}

if (xc) {
if ((xc = nng_aio_alloc(&raio->aio, raio_complete, raio))) {
R_Free(raio);
nng_aio_free(saio->aio);
R_Free(saio);
Expand Down
2 changes: 1 addition & 1 deletion src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ static const R_CallMethodDef callMethods[] = {
{"rnng_reap", (DL_FUNC) &rnng_reap, 1},
{"rnng_recv", (DL_FUNC) &rnng_recv, 4},
{"rnng_recv_aio", (DL_FUNC) &rnng_recv_aio, 5},
{"rnng_request", (DL_FUNC) &rnng_request, 7},
{"rnng_request", (DL_FUNC) &rnng_request, 6},
{"rnng_send", (DL_FUNC) &rnng_send, 4},
{"rnng_send_aio", (DL_FUNC) &rnng_send_aio, 5},
{"rnng_set_opt", (DL_FUNC) &rnng_set_opt, 3},
Expand Down
2 changes: 1 addition & 1 deletion src/nanonext.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ extern SEXP rnng_random(SEXP, SEXP);
extern SEXP rnng_reap(SEXP);
extern SEXP rnng_recv(SEXP, SEXP, SEXP, SEXP);
extern SEXP rnng_recv_aio(SEXP, SEXP, SEXP, SEXP, SEXP);
extern SEXP rnng_request(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
extern SEXP rnng_request(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
extern SEXP rnng_send(SEXP, SEXP, SEXP, SEXP);
extern SEXP rnng_send_aio(SEXP, SEXP, SEXP, SEXP, SEXP);
extern SEXP rnng_set_opt(SEXP, SEXP, SEXP);
Expand Down
3 changes: 1 addition & 2 deletions tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,9 @@ nanotest(call_aio(cr)$data == "test")
nanotest(is.integer(send(ctxn, TRUE, mode = 0L, block = FALSE)))
nanotest(is.integer(recv(ctxn, mode = 8L, block = FALSE)))
nanotest(typeof(ctxn <- .context(rep)) == "externalptr")
nanotestaio(cs <- request(.context(req$socket), data = TRUE, ack = TRUE))
nanotestaio(cs <- request(.context(req$socket), data = TRUE))
nanotest(recv(ctxn, block = 500))
nanotestz(send(ctxn, TRUE, mode = 3L, block = 500))
nanotest(is.raw(recv(ctxn, mode = 8L, block = 500)))
nanotestz(reap(ctxn))
nanotestz(pipe_notify(rep, cv, add = TRUE, remove = TRUE, flag = TRUE))
nanotestz(pipe_notify(req$socket, cv = cv, add = FALSE, remove = TRUE, flag = FALSE))
Expand Down

0 comments on commit 4711f91

Please sign in to comment.