Skip to content

Commit

Permalink
Merge pull request #17 from dipterix/lapply_async-fix
Browse files Browse the repository at this point in the history
Fixed `lapply_async2` using full memory
  • Loading branch information
dipterix authored Oct 3, 2024
2 parents 3742835 + d79a3a8 commit 65c0540
Show file tree
Hide file tree
Showing 7 changed files with 691 additions and 587 deletions.
6 changes: 3 additions & 3 deletions CRAN-SUBMISSION
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
Version: 0.2.8
Date: 2023-07-03 19:28:14 UTC
SHA: a8ccb85ba2409243c049bcbf6ba6e938ff538d46
Version: 0.2.9
Date: 2024-06-27 03:36:14 UTC
SHA: 3742835cde669474a2524cb1ec27972af8ce4ee5
4 changes: 2 additions & 2 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: dipsaus
Type: Package
Title: A Dipping Sauce for Data Analysis and Visualizations
Version: 0.2.9
Version: 0.2.9.9000
Authors@R: c(
person("Zhengjia", "Wang", email = "dipterix.wang@gmail.com", role = c("aut", "cre")),
person("John", "Magnotti", email = "John.Magnotti@pennmedicine.upenn.edu", role = c("ctb"),
Expand Down Expand Up @@ -61,7 +61,7 @@ Suggests:
remotes,
glue
Roxygen: list(r6 = FALSE)
RoxygenNote: 7.3.1
RoxygenNote: 7.3.2
LinkingTo:
Rcpp
VignetteBuilder: knitr
137 changes: 111 additions & 26 deletions R/parallels-future.R
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ lapply_async2 <- function(x, FUN, FUN.args = list(),
callback_call <- quote(callback())
}
}else{
callback_formals <- list()
callback_call <- NULL
}

Expand All @@ -112,7 +113,7 @@ lapply_async2 <- function(x, FUN, FUN.args = list(),
x, f, future.seed = future.seed,
future.scheduling = TRUE,
future.chunk.size = future.chunk.size)
}else{
} else {

old.handlers <- progressr::handlers(handler_dipsaus_progress())
on.exit({
Expand All @@ -124,38 +125,122 @@ lapply_async2 <- function(x, FUN, FUN.args = list(),
# if(is.null(shiny::getDefaultReactiveDomain())){

progressr::with_progress({
p <- progressr::progressor(steps = 2 * length(x) + 1)
...p <- progressr::progressor(steps = 2 * length(x) + 1)

# f <- function(el, ...) {
# msg <- ""
# if( is.function(callback) ){
# callback_formals <- formals(callback)
# if (length(callback_formals)){
# msg <- callback(el)
# }else{
# msg <- callback()
# }
# }
# if(is.character(msg)) {
# msg <- paste(msg, collapse = "")
# } else {
# msg <- deparse(el, width.cutoff = 30)
# if(length(msg) > 1){
# msg <- msg[[1]]
# }
# if(nchar(msg) >= 10){
# msg <- sprintf("%s...", substr(msg, stop = 7, start = 1))
# }
# }
#
# p(message = sprintf("%s (started)", msg))
# on.exit({
# p(message = sprintf("%s (end)", msg))
# }, add = TRUE, after = TRUE)
#
# return(FUN(el, ...))
# }

# f <- dipsaus::new_function2(alist(el = ), body = bquote({
# ...msg... <- .(callback_call)
# if(is.character(...msg...)) {
# ...msg... <- paste(...msg..., collapse = "")
# } else {
# ...msg... <- deparse(el, width.cutoff = 30)
# if(length(...msg...) > 1){
# ...msg... <- ...msg...[[1]]
# }
# if(nchar(...msg...) >= 10){
# ...msg... <- sprintf("%s...", substr(...msg..., stop = 7, start = 1))
# }
# }
# p(message = sprintf("%s (started)", ...msg...), )
# on.exit({
# p(message = sprintf("%s (end)", ...msg...))
# }, add = TRUE, after = TRUE)
#
# .(call)
#
# }), quote_type = "quote", env = environment())

# manually parse globals
...FUN2 <- dipsaus::new_function2(args = formals(FUN), env = baseenv())
...callback2 <- dipsaus::new_function2(args = formals(callback), env = baseenv())
...FUN.args <- FUN.args
...FUN_globals <- future::getGlobalsAndPackages(FUN, envir = environment(FUN))$globals
...callback_globals <- future::getGlobalsAndPackages(callback, envir = environment(callback))$globals

# print(...FUN_globals)

ff <- function() {
...p
...FUN.args
...FUN2
...FUN_globals
...callback_globals
...callback2
}
globals_and_packages <- future::getGlobalsAndPackages(list(FUN, callback, ff))

# print(names(globals_and_packages$globals))

f <- dipsaus::new_function2(alist(el = ), body = bquote({
...msg... <- .(callback_call)
if(is.character(...msg...)) {
...msg... <- paste(...msg..., collapse = "")
} else {
...msg... <- deparse(el, width.cutoff = 30)
if(length(...msg...) > 1){
...msg... <- ...msg...[[1]]
}
if(nchar(...msg...) >= 10){
...msg... <- sprintf("%s...", substr(...msg..., stop = 7, start = 1))
}

# callback
runtime <- new.env(parent = globalenv())
list2env(...callback_globals, envir = runtime)
environment(...callback2) <- runtime
body(...callback2) <- quote(.(body(callback)))
if (length(formals(...callback2))){
msg <- ...callback2(el)
}else{
msg <- ...callback2()
}
p(message = sprintf("%s (started)", ...msg...), )
...p(message = sprintf("%s (started)", msg))
on.exit({
p(message = sprintf("%s (end)", ...msg...))
...p(message = sprintf("%s (end)", msg))
}, add = TRUE, after = TRUE)

.(call)
# FUN
runtime <- new.env(parent = globalenv())
list2env(...FUN_globals, envir = runtime)

}), quote_type = "quote", env = environment())
# f <- dipsaus::new_function2(alist(el = ), body = rlang::quo({
# p(message = sprintf("%s (started)", ...msg...), )
# eval(!!call)
# p(message = sprintf("%s (end)", ...msg...))
# }), quote_type = 'quote', env = environment())
fs <- future.apply::future_lapply(x, f,
future.scheduling = TRUE,
future.chunk.size = future.chunk.size,
future.seed = future.seed)
environment(...FUN2) <- runtime
body(...FUN2) <- quote(.(body(FUN)))

re <- do.call(...FUN2, c(list(el), ...FUN.args))
return(re)

}), quote_type = "quote", env = new.env(parent = globalenv()))

fs <- future.apply::future_lapply(
x, f,
future.scheduling = TRUE,
future.chunk.size = future.chunk.size,
future.seed = future.seed,
future.globals = globals_and_packages$globals,
future.packages = globals_and_packages$packages
)
# fs <- future.apply::future_lapply(x, f,
# future.scheduling = TRUE,
# future.chunk.size = future.chunk.size,
# future.seed = future.seed)

p("Results collected\n")

Expand Down
19 changes: 12 additions & 7 deletions R/shiny-progress.R
Original file line number Diff line number Diff line change
Expand Up @@ -280,14 +280,19 @@ handler_dipsaus_progress <- function (
# make sure pb exists
make_pb(title = title, max = config$max_steps)
if( state$delta > 0 ){
# message|details
msg <- strsplit(state$message, '|', TRUE)[[1]]
if(length(msg) > 1){
message <- msg[[1]]
detail <- paste(msg[-1], collapse = '|')
}else{
if(length(state$message)) {
# message|details
msg <- strsplit(state$message, '|', TRUE)[[1]]
if(length(msg) > 1){
message <- msg[[1]]
detail <- paste(msg[-1], collapse = '|')
}else{
message <- NULL
detail <- state$message
}
} else {
message <- NULL
detail <- state$message
detail <- ""
}
pb$inc(message = message, detail = detail,
amount = state$delta)
Expand Down
Loading

0 comments on commit 65c0540

Please sign in to comment.