Skip to content

Commit

Permalink
Added function to estimate globals size
Browse files Browse the repository at this point in the history
  • Loading branch information
dipterix committed Nov 24, 2024
1 parent 9dbf108 commit 96aabc9
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 11 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: dipsaus
Type: Package
Title: A Dipping Sauce for Data Analysis and Visualizations
Version: 0.2.9.9001
Version: 0.2.9.9002
Authors@R: c(
person("Zhengjia", "Wang", email = "dipterix.wang@gmail.com", role = c("aut", "cre")),
person("John", "Magnotti", email = "John.Magnotti@pennmedicine.upenn.edu", role = c("ctb"),
Expand Down
33 changes: 25 additions & 8 deletions R/parallels-future.R
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@

get_globals_size <- function(fun, args = list()) {
# function_globals <- future::getGlobalsAndPackages(list(fun), envir = environment(fun), maxSize = 1e12)$globals

# parse_env <- new.env(parent = baseenv())
# parse_env$function_globals <- function_globals
# fun_internal <- new_function2(body = quote({
# force(function_globals)
# }), env = parse_env, quote_type = "quote")

call <- as_call(quote(fun), quote(el), .list = args)
fun_internal <- new_function2(alist(el = ), body = call, quote_type = 'quote', env = environment())
globals_and_packages <- future::getGlobalsAndPackages(list(fun_internal), maxSize = 1e12)
re <- attr(globals_and_packages$globals, "total_size")
return(re)
}

#' Apply, but in parallel
#' @param x vector, list
Expand Down Expand Up @@ -79,10 +93,11 @@ lapply_async2 <- function(x, FUN, FUN.args = list(),

call <- as_call(quote(FUN), quote(el), .list = FUN.args)
if(is.null(callback_call)){
f <- dipsaus::new_function2(alist(el = ), body = call, quote_type = 'quote', env = environment())
f <- new_function2(alist(el = ), body = call, quote_type = 'quote', env = environment())
globals_and_packages <- future::getGlobalsAndPackages(list(f), maxSize = 1e12)

parallel <- TRUE
globals_size <- NA
tryCatch(
{
max_globals_size <- getOption("future.globals.maxSize", default = 524288000) # 524288000 = 500MB
Expand Down Expand Up @@ -138,7 +153,7 @@ lapply_async2 <- function(x, FUN, FUN.args = list(),
future.packages = globals_and_packages$packages
)
} else {
message("Using single thread due to large global objects")
message(sprintf("Using single thread due to large global objects (%s)", to_ram_size(globals_size)))
fs <- lapply(x, f)
}
} else {
Expand All @@ -156,8 +171,8 @@ lapply_async2 <- function(x, FUN, FUN.args = list(),

# manually parse globals
...p <- progressr::progressor(steps = 2 * length(x) + 2)
...FUN2 <- dipsaus::new_function2(args = formals(FUN), env = baseenv())
...callback2 <- dipsaus::new_function2(args = formals(callback), env = baseenv())
...FUN2 <- new_function2(args = formals(FUN), env = baseenv())
...callback2 <- new_function2(args = formals(callback), env = baseenv())
...FUN.args <- FUN.args
...FUN_globals <- future::getGlobalsAndPackages(FUN, envir = environment(FUN), maxSize = 1e12)$globals
...callback_globals <- future::getGlobalsAndPackages(callback, envir = environment(callback), maxSize = 1e12)$globals
Expand All @@ -181,7 +196,7 @@ lapply_async2 <- function(x, FUN, FUN.args = list(),

globals_and_packages <- future::getGlobalsAndPackages(list(FUN, callback, ff), maxSize = 1e12)

f <- dipsaus::new_function2(alist(el = ), body = bquote({
f <- new_function2(alist(el = ), body = bquote({

# callback
runtime <- new.env(parent = globalenv())
Expand Down Expand Up @@ -211,10 +226,12 @@ lapply_async2 <- function(x, FUN, FUN.args = list(),
}), quote_type = "quote", env = new.env(parent = store_env))

parallel <- TRUE
globals_size <- NA
tryCatch(
{
max_globals_size <- getOption("future.globals.maxSize", default = 524288000) # 524288000 = 500MB
if( isTRUE(attr(globals_and_packages$globals, "total_size") >= max_globals_size) ) {
globals_size <- attr(globals_and_packages$globals, "total_size")
if( isTRUE(globals_size >= max_globals_size) ) {
future::plan('sequential')
parallel <- FALSE
}
Expand Down Expand Up @@ -267,7 +284,7 @@ lapply_async2 <- function(x, FUN, FUN.args = list(),
future.packages = globals_and_packages$packages
)
} else {
...p(message = "Using single thread due to large global objects\n")
...p(message = sprintf("Using single thread due to large global objects (%s)\n", to_ram_size(globals_size)))
fs <- lapply(x, f)
}

Expand Down
4 changes: 2 additions & 2 deletions R/shiny-progress.R
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,11 @@ handler_dipsaus_progress <- function (
detail <- paste(msg[-1], collapse = '|')
}else{
message <- NULL
detail <- state$message
detail <- paste(state$message, collapse = "")
}
} else {
message <- NULL
detail <- ""
detail <- "..."
}
pb$inc(message = message, detail = detail,
amount = state$delta)
Expand Down

0 comments on commit 96aabc9

Please sign in to comment.