Skip to content

Commit

Permalink
Merge pull request #168 from r-lib/fix/large-session-msgs
Browse files Browse the repository at this point in the history
  • Loading branch information
gaborcsardi authored Oct 13, 2020
2 parents 14ddf5f + 0aaa42e commit ced20a6
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 14 deletions.
3 changes: 3 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@

# callr development version

* `callr::r_session` now handles large messages from the subprocess
well (#168).

# callr 3.5.0

* callr can now pass the environment of the function to the subprocess,
Expand Down
112 changes: 99 additions & 13 deletions R/r-session.R
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,12 @@ r_session <- R6::R6Class(
func_file = NULL,
res_file = NULL,

buffer = NULL,
read_buffer = function()
rs__read_buffer(self, private),
read_message = function()
rs__read_message(self, private),

get_result_and_output = function(std = FALSE)
rs__get_result_and_output(self, private, std),
report_back = function(code, text = "")
Expand Down Expand Up @@ -305,22 +311,98 @@ rs_init <- function(self, private, super, options, wait, wait_timeout) {
}

rs_read <- function(self, private) {
out <- processx::processx_conn_read_lines(private$pipe, 1)
if (!is.null(private$buffer)) {
# There is a partial message in the buffer, try to finish it.
out <- private$read_buffer()
} else {
# A new message.
out <- private$read_message()
}
if (!length(out)) {
if (processx::processx_conn_is_incomplete(private$pipe)) return()
if (self$is_alive()) {
# We do this in on.exit(), because parse_msg still reads the streams
on.exit(self$kill(), add = TRUE)
out <- "502 R session closed the process connection, killed"
out <- list(header = list(
code = 502, length = 0,
rest = "R session closed the process connection, killed"
))
} else if (identical(es <- self$get_exit_status(), 0L)) {
out <- "500 R session finished cleanly"
out <- list(header = list(
code = 500, length = 0,
rest = "R session finished cleanly"
))
} else {
out <- paste0("501 R session crashed with exit code ", es)
out <- list(header = list(
code = 501, length = 0,
rest = paste0("R session crashed with exit code ", es)
))
}
}
if (length(out)) private$parse_msg(out)
}

rs__read_buffer <- function(self, private) {
# There is a partial message in the buffer already, we need to
# read some more
need <- private$buffer$header$length - private$buffer$got
chunk <- processx::processx_conn_read_chars(private$pipe, need)
got <- nchar(chunk)
if (got == 0) {
# make this special case fast
NULL
} else if (got == need) {
msg <- list(
header = private$buffer$header,
body = paste(c(private$buffer$chunks, list(chunk)), collapse = "")
)
private$buffer <- NULL
msg
} else {
private$buffer$got <- private$buffer$got + got
private$buffer$chunks <- c(private$buffer$chunks, list(chunk))
NULL
}
}

rs__read_message <- function(self, private) {
# A new message, we can surely read the first line
out <- processx::processx_conn_read_lines(private$pipe, 1)
if (length(out) == 0) return(NULL)

header <- rs__parse_header(out)
body <- ""
if (header$length > 0) {
body <- processx::processx_conn_read_chars(
private$pipe,
header$length
)
}
got <- nchar(body)
if (got < header$length) {
# Partial message
private$buffer <- list(
header = header,
got = got,
chunks = list(body)
)
NULL
} else {
list(header = header, body = body)
}
}

rs__parse_header <- function(line) {
parts <- strsplit(line, " ", fixed = TRUE)[[1]]
parts2 <- suppressWarnings(as.integer(parts[1:2]))
rest <- paste(parts[-(1:2)], collapse = " ")
header <- list(code = parts2[1], length = parts2[2], rest = rest)
if (is.na(header$code) || is.na(header$length)) {
stop("Internal callr error, invalid message header")
}
header
}

rs_close <- function(self, private, grace) {
processx::processx_conn_close(self$get_input_connection())
self$poll_process(grace)
Expand Down Expand Up @@ -571,7 +653,10 @@ rs__attach_wait <- function(self, private) {
}

rs__report_back <- function(self, private, code, text) {
cmd <- paste0(deparse(rs__status_expr(code, text, fd = 3)), "\n")
cmd <- paste0(
deparse(rs__status_expr(code, text, fd = 3)),
"\n"
)
private$write_for_sure(cmd)
}

Expand All @@ -584,19 +669,20 @@ rs__write_for_sure <- function(self, private, text) {
}

rs__parse_msg <- function(self, private, msg) {
s <- strsplit(msg, " ", fixed = TRUE)[[1]]
code <- as.integer(s[1])
message <- paste(s[-1], collapse = " ")
if (substr(message, 1, 8) == "base64::") {
code <- as.character(msg$header$code)
message <- msg$body
if (length(message) && substr(message, 1, 8) == "base64::") {
message <- substr(message, 9, nchar(message))
message <- unserialize(processx::base64_decode(message))
} else {
message <- msg$header$rest
}

if (! s[1] %in% names(rs__parse_msg_funcs)) {
throw(new_error("Unknown message code: `", s[1], "`"))
if (! code %in% names(rs__parse_msg_funcs)) {
throw(new_error("Unknown message code: `", code, "`"))
}
structure(
rs__parse_msg_funcs[[ s[1] ]](self, private, code, message),
rs__parse_msg_funcs[[code]](self, private, msg$header$code, message),
class = "callr_session_result")
}

Expand Down Expand Up @@ -652,7 +738,7 @@ rs__status_expr <- function(code, text = "", fd = 3L) {
local({
pxlib <- as.environment("tools:callr")$`__callr_data__`$pxlib
code_ <- code; fd_ <- fd; text_ <- text
data <- paste0(code_, " ", text_, "\n")
data <- paste0(code_, " 0 ", text_, "\n")
pxlib$write_fd(as.integer(fd), data)
}),
list(code = code, fd = fd, text = text)
Expand Down
2 changes: 1 addition & 1 deletion R/script.R
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ make_vanilla_script_expr <- function(expr_file, res, error,
pxlib <- as.environment("tools:callr")$`__callr_data__`$pxlib
if (is.null(e$code)) e$code <- "301"
msg <- paste0("base64::", pxlib$base64_encode(serialize(e, NULL)))
data <- paste(e$code, msg, "\n")
data <- paste0(e$code, " ", nchar(msg), "\n", msg)
pxlib$write_fd(3L, data)

if (inherits(e, "cli_message") &&
Expand Down
33 changes: 33 additions & 0 deletions tests/testthat/test-r-session-messages.R
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,36 @@ test_that("message handlers", {

rs$close()
})

test_that("large messages", {
rs <- r_session$new()
on.exit(rs$close(), add = TRUE)

do <- function() {
msg <- structure(list(message = paste(1:150000, sep = " ")),
class = c("myclass", "callr_message", "condition"))
signalCondition(msg)
for (i in 1:5) {
msg <- structure(list(message = paste("message", i)),
class = c("myclass", "callr_message", "condition"))
signalCondition(msg)
}
}

cond <- list()
withr::with_options(
list(callr.condition_handler_myclass = function(x) {
cond <<- c(cond, list(x))
}),
rs$run(do)
)

expect_equal(length(cond), 6)
expect_s3_class(cond[[1]], "myclass")
expect_equal(cond[[1]]$message, paste(1:150000, sep = " "))
for (i in 1:5) {
expect_equal(cond[[i + 1]]$message, paste("message", i))
}

rs$close()
})

0 comments on commit ced20a6

Please sign in to comment.