diff --git a/NEWS.md b/NEWS.md index d64a3d22..e24e6b55 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,6 +1,9 @@ # callr development version +* `callr::r_session` now handles large messages from the subprocess + well (#168). + # callr 3.5.0 * callr can now pass the environment of the function to the subprocess, diff --git a/R/r-session.R b/R/r-session.R index 58a0c2eb..f517af23 100644 --- a/R/r-session.R +++ b/R/r-session.R @@ -227,6 +227,12 @@ r_session <- R6::R6Class( func_file = NULL, res_file = NULL, + buffer = NULL, + read_buffer = function() + rs__read_buffer(self, private), + read_message = function() + rs__read_message(self, private), + get_result_and_output = function(std = FALSE) rs__get_result_and_output(self, private, std), report_back = function(code, text = "") @@ -305,22 +311,98 @@ rs_init <- function(self, private, super, options, wait, wait_timeout) { } rs_read <- function(self, private) { - out <- processx::processx_conn_read_lines(private$pipe, 1) + if (!is.null(private$buffer)) { + # There is a partial message in the buffer, try to finish it. + out <- private$read_buffer() + } else { + # A new message. + out <- private$read_message() + } if (!length(out)) { if (processx::processx_conn_is_incomplete(private$pipe)) return() if (self$is_alive()) { # We do this in on.exit(), because parse_msg still reads the streams on.exit(self$kill(), add = TRUE) - out <- "502 R session closed the process connection, killed" + out <- list(header = list( + code = 502, length = 0, + rest = "R session closed the process connection, killed" + )) } else if (identical(es <- self$get_exit_status(), 0L)) { - out <- "500 R session finished cleanly" + out <- list(header = list( + code = 500, length = 0, + rest = "R session finished cleanly" + )) } else { - out <- paste0("501 R session crashed with exit code ", es) + out <- list(header = list( + code = 501, length = 0, + rest = paste0("R session crashed with exit code ", es) + )) } } if (length(out)) private$parse_msg(out) } +rs__read_buffer <- function(self, private) { + # There is a partial message in the buffer already, we need to + # read some more + need <- private$buffer$header$length - private$buffer$got + chunk <- processx::processx_conn_read_chars(private$pipe, need) + got <- nchar(chunk) + if (got == 0) { + # make this special case fast + NULL + } else if (got == need) { + msg <- list( + header = private$buffer$header, + body = paste(c(private$buffer$chunks, list(chunk)), collapse = "") + ) + private$buffer <- NULL + msg + } else { + private$buffer$got <- private$buffer$got + got + private$buffer$chunks <- c(private$buffer$chunks, list(chunk)) + NULL + } +} + +rs__read_message <- function(self, private) { + # A new message, we can surely read the first line + out <- processx::processx_conn_read_lines(private$pipe, 1) + if (length(out) == 0) return(NULL) + + header <- rs__parse_header(out) + body <- "" + if (header$length > 0) { + body <- processx::processx_conn_read_chars( + private$pipe, + header$length + ) + } + got <- nchar(body) + if (got < header$length) { + # Partial message + private$buffer <- list( + header = header, + got = got, + chunks = list(body) + ) + NULL + } else { + list(header = header, body = body) + } +} + +rs__parse_header <- function(line) { + parts <- strsplit(line, " ", fixed = TRUE)[[1]] + parts2 <- suppressWarnings(as.integer(parts[1:2])) + rest <- paste(parts[-(1:2)], collapse = " ") + header <- list(code = parts2[1], length = parts2[2], rest = rest) + if (is.na(header$code) || is.na(header$length)) { + stop("Internal callr error, invalid message header") + } + header +} + rs_close <- function(self, private, grace) { processx::processx_conn_close(self$get_input_connection()) self$poll_process(grace) @@ -571,7 +653,10 @@ rs__attach_wait <- function(self, private) { } rs__report_back <- function(self, private, code, text) { - cmd <- paste0(deparse(rs__status_expr(code, text, fd = 3)), "\n") + cmd <- paste0( + deparse(rs__status_expr(code, text, fd = 3)), + "\n" + ) private$write_for_sure(cmd) } @@ -584,19 +669,20 @@ rs__write_for_sure <- function(self, private, text) { } rs__parse_msg <- function(self, private, msg) { - s <- strsplit(msg, " ", fixed = TRUE)[[1]] - code <- as.integer(s[1]) - message <- paste(s[-1], collapse = " ") - if (substr(message, 1, 8) == "base64::") { + code <- as.character(msg$header$code) + message <- msg$body + if (length(message) && substr(message, 1, 8) == "base64::") { message <- substr(message, 9, nchar(message)) message <- unserialize(processx::base64_decode(message)) + } else { + message <- msg$header$rest } - if (! s[1] %in% names(rs__parse_msg_funcs)) { - throw(new_error("Unknown message code: `", s[1], "`")) + if (! code %in% names(rs__parse_msg_funcs)) { + throw(new_error("Unknown message code: `", code, "`")) } structure( - rs__parse_msg_funcs[[ s[1] ]](self, private, code, message), + rs__parse_msg_funcs[[code]](self, private, msg$header$code, message), class = "callr_session_result") } @@ -652,7 +738,7 @@ rs__status_expr <- function(code, text = "", fd = 3L) { local({ pxlib <- as.environment("tools:callr")$`__callr_data__`$pxlib code_ <- code; fd_ <- fd; text_ <- text - data <- paste0(code_, " ", text_, "\n") + data <- paste0(code_, " 0 ", text_, "\n") pxlib$write_fd(as.integer(fd), data) }), list(code = code, fd = fd, text = text) diff --git a/R/script.R b/R/script.R index d3eddaec..e9cccd65 100644 --- a/R/script.R +++ b/R/script.R @@ -68,7 +68,7 @@ make_vanilla_script_expr <- function(expr_file, res, error, pxlib <- as.environment("tools:callr")$`__callr_data__`$pxlib if (is.null(e$code)) e$code <- "301" msg <- paste0("base64::", pxlib$base64_encode(serialize(e, NULL))) - data <- paste(e$code, msg, "\n") + data <- paste0(e$code, " ", nchar(msg), "\n", msg) pxlib$write_fd(3L, data) if (inherits(e, "cli_message") && diff --git a/tests/testthat/test-r-session-messages.R b/tests/testthat/test-r-session-messages.R index d802da32..0f2fb9fa 100644 --- a/tests/testthat/test-r-session-messages.R +++ b/tests/testthat/test-r-session-messages.R @@ -58,3 +58,36 @@ test_that("message handlers", { rs$close() }) + +test_that("large messages", { + rs <- r_session$new() + on.exit(rs$close(), add = TRUE) + + do <- function() { + msg <- structure(list(message = paste(1:150000, sep = " ")), + class = c("myclass", "callr_message", "condition")) + signalCondition(msg) + for (i in 1:5) { + msg <- structure(list(message = paste("message", i)), + class = c("myclass", "callr_message", "condition")) + signalCondition(msg) + } + } + + cond <- list() + withr::with_options( + list(callr.condition_handler_myclass = function(x) { + cond <<- c(cond, list(x)) + }), + rs$run(do) + ) + + expect_equal(length(cond), 6) + expect_s3_class(cond[[1]], "myclass") + expect_equal(cond[[1]]$message, paste(1:150000, sep = " ")) + for (i in 1:5) { + expect_equal(cond[[i + 1]]$message, paste("message", i)) + } + + rs$close() +})