From 96aabc9d9f8b7682683a8599bbe1fc28e3f00e92 Mon Sep 17 00:00:00 2001 From: dipterix Date: Sun, 24 Nov 2024 15:24:36 -0500 Subject: [PATCH] Added function to estimate globals size --- DESCRIPTION | 2 +- R/parallels-future.R | 33 +++++++++++++++++++++++++-------- R/shiny-progress.R | 4 ++-- 3 files changed, 28 insertions(+), 11 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 7ed2c54..328a118 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: dipsaus Type: Package Title: A Dipping Sauce for Data Analysis and Visualizations -Version: 0.2.9.9001 +Version: 0.2.9.9002 Authors@R: c( person("Zhengjia", "Wang", email = "dipterix.wang@gmail.com", role = c("aut", "cre")), person("John", "Magnotti", email = "John.Magnotti@pennmedicine.upenn.edu", role = c("ctb"), diff --git a/R/parallels-future.R b/R/parallels-future.R index 88600ff..e0d280f 100644 --- a/R/parallels-future.R +++ b/R/parallels-future.R @@ -1,4 +1,18 @@ - +get_globals_size <- function(fun, args = list()) { + # function_globals <- future::getGlobalsAndPackages(list(fun), envir = environment(fun), maxSize = 1e12)$globals + + # parse_env <- new.env(parent = baseenv()) + # parse_env$function_globals <- function_globals + # fun_internal <- new_function2(body = quote({ + # force(function_globals) + # }), env = parse_env, quote_type = "quote") + + call <- as_call(quote(fun), quote(el), .list = args) + fun_internal <- new_function2(alist(el = ), body = call, quote_type = 'quote', env = environment()) + globals_and_packages <- future::getGlobalsAndPackages(list(fun_internal), maxSize = 1e12) + re <- attr(globals_and_packages$globals, "total_size") + return(re) +} #' Apply, but in parallel #' @param x vector, list @@ -79,10 +93,11 @@ lapply_async2 <- function(x, FUN, FUN.args = list(), call <- as_call(quote(FUN), quote(el), .list = FUN.args) if(is.null(callback_call)){ - f <- dipsaus::new_function2(alist(el = ), body = call, quote_type = 'quote', env = environment()) + f <- new_function2(alist(el = ), body = call, quote_type = 'quote', env = environment()) globals_and_packages <- future::getGlobalsAndPackages(list(f), maxSize = 1e12) parallel <- TRUE + globals_size <- NA tryCatch( { max_globals_size <- getOption("future.globals.maxSize", default = 524288000) # 524288000 = 500MB @@ -138,7 +153,7 @@ lapply_async2 <- function(x, FUN, FUN.args = list(), future.packages = globals_and_packages$packages ) } else { - message("Using single thread due to large global objects") + message(sprintf("Using single thread due to large global objects (%s)", to_ram_size(globals_size))) fs <- lapply(x, f) } } else { @@ -156,8 +171,8 @@ lapply_async2 <- function(x, FUN, FUN.args = list(), # manually parse globals ...p <- progressr::progressor(steps = 2 * length(x) + 2) - ...FUN2 <- dipsaus::new_function2(args = formals(FUN), env = baseenv()) - ...callback2 <- dipsaus::new_function2(args = formals(callback), env = baseenv()) + ...FUN2 <- new_function2(args = formals(FUN), env = baseenv()) + ...callback2 <- new_function2(args = formals(callback), env = baseenv()) ...FUN.args <- FUN.args ...FUN_globals <- future::getGlobalsAndPackages(FUN, envir = environment(FUN), maxSize = 1e12)$globals ...callback_globals <- future::getGlobalsAndPackages(callback, envir = environment(callback), maxSize = 1e12)$globals @@ -181,7 +196,7 @@ lapply_async2 <- function(x, FUN, FUN.args = list(), globals_and_packages <- future::getGlobalsAndPackages(list(FUN, callback, ff), maxSize = 1e12) - f <- dipsaus::new_function2(alist(el = ), body = bquote({ + f <- new_function2(alist(el = ), body = bquote({ # callback runtime <- new.env(parent = globalenv()) @@ -211,10 +226,12 @@ lapply_async2 <- function(x, FUN, FUN.args = list(), }), quote_type = "quote", env = new.env(parent = store_env)) parallel <- TRUE + globals_size <- NA tryCatch( { max_globals_size <- getOption("future.globals.maxSize", default = 524288000) # 524288000 = 500MB - if( isTRUE(attr(globals_and_packages$globals, "total_size") >= max_globals_size) ) { + globals_size <- attr(globals_and_packages$globals, "total_size") + if( isTRUE(globals_size >= max_globals_size) ) { future::plan('sequential') parallel <- FALSE } @@ -267,7 +284,7 @@ lapply_async2 <- function(x, FUN, FUN.args = list(), future.packages = globals_and_packages$packages ) } else { - ...p(message = "Using single thread due to large global objects\n") + ...p(message = sprintf("Using single thread due to large global objects (%s)\n", to_ram_size(globals_size))) fs <- lapply(x, f) } diff --git a/R/shiny-progress.R b/R/shiny-progress.R index 4416f48..b72a6b2 100644 --- a/R/shiny-progress.R +++ b/R/shiny-progress.R @@ -288,11 +288,11 @@ handler_dipsaus_progress <- function ( detail <- paste(msg[-1], collapse = '|') }else{ message <- NULL - detail <- state$message + detail <- paste(state$message, collapse = "") } } else { message <- NULL - detail <- "" + detail <- "..." } pb$inc(message = message, detail = detail, amount = state$delta)