Overhead when exporting to PSOCK cluster #98

Open
odelmarcelle opened this issue Mar 10, 2022 · 5 comments

Comments

@odelmarcelle
Describe the bug
I believe that a significant (and avoidable) overhead is present when using future_lapply() inside another function. I think this is related to the export of ...future.FUN, which is serialized together with its enclosing environment.
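To illustrate the mechanism I suspect (a minimal sketch, separate from the benchmarks below): serializing a closure serializes its enclosing environment as well, so a function created inside another function drags that function's locals along.

f_outer <- function(x) {
  force(x)                     # x is now bound in this call's frame
  g <- function(y) y           # g's enclosing environment is that frame
  length(serialize(g, NULL))   # bytes needed to ship g to another process
}
length(serialize(identity, NULL))  # small: identity lives in a namespace
f_outer(rnorm(1e6))                # ~8 MB: x is serialized along with g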

I experience this issue on Windows and have not had the chance to try it on macOS. I suspect this is only relevant to Windows.

Expected behavior
The overhead in deploying a function on multiple cores should remain limited.

Reproduce example
Consider the following (lengthy) example, where I apply identity() to a large list of character strings.

long_characters <- setNames(replicate(
  10000,
  paste0(c(letters, rep(" ", 10))[
    sample.int(length(letters) + 1, 5000, replace = TRUE)],
    collapse = ""),
  simplify = FALSE
), paste0("element", 1:10000))
object.size(long_characters) |> format("Mb")
## [1] "49.5 Mb"

some_function1 <- function(x) {
  identity(x)
}
some_function2 <- function(x) {
  future_lapply(x, function(y) {
    identity(y)
  })
}

The second function nests future_lapply(), which turns out to have important implications for the parallel-processing overhead. I quickly benchmarked the functions on a sequential backend.

library(future)
library(future.apply)
library(foreach); library(doFuture); registerDoFuture()
plan(sequential)
benchmark <- bench::mark(
  `1` = future_lapply(long_characters, identity),
  `2` = future_lapply(long_characters, some_function1),
  `3` = some_function2(long_characters),
  `4` = foreach(x = long_characters, .final = function(x) setNames(x, names(long_characters))
  ) %dopar% {some_function1(x)},
  filter_gc = FALSE,
  min_iterations = 5
)
benchmark[, c(1:5, 7)]
# # A tibble: 4 x 6
# expression        min   median `itr/sec` mem_alloc n_itr
# <bch:expr>   <bch:tm> <bch:tm>    <dbl>  <bch:byt> <int>
# 1 1            22.1ms  23.08ms    41.7      4.29MB    21
# 2 2           25.89ms  26.69ms    36.0   1006.98KB    18
# 3 3            26.5ms  27.39ms    34.2   1006.48KB    18
# 4 4             1.21s    1.24s     0.809   53.67MB     5

All the apply variants perform more or less similarly; foreach is slower, apparently due to memory allocation. The results change drastically once a multisession plan is registered:

plan(multisession, workers = 2)
benchmark <- bench::mark(
  `1` = future_lapply(long_characters, identity),
  `2` = future_lapply(long_characters, some_function1),
  `3` = some_function2(long_characters),
  `4` = foreach(x = long_characters, .final = function(x) setNames(x, names(long_characters))
  ) %dopar% {some_function1(x)},
  filter_gc = FALSE,
  min_iterations = 5
)
benchmark[, c(1:5, 7)]
# # A tibble: 4 x 6
# expression        min   median `itr/sec` mem_alloc n_itr
# <bch:expr>   <bch:tm> <bch:tm>     <dbl> <bch:byt> <int>
# 1 1           574.9ms 579.89ms     1.57     1.91MB     5
# 2 2          568.84ms 570.91ms     1.68     1.83MB     5
# 3 3          861.42ms    1.16s     0.910    1.84MB     5
# 4 4             1.37s    1.44s     0.702   53.66MB     5

Here the nested call to future_lapply() inside some_function2() appears to suffer compared to the first two alternatives. The foreach option is relatively unaffected by the changed plan.

The gap between the nested and non-nested future_lapply() increases as the number of cores rises:

plan(multisession, workers = 10)
benchmark <- bench::mark(
  `1` = future_lapply(long_characters, identity),
  `2` = future_lapply(long_characters, some_function1),
  `3` = some_function2(long_characters),
  `4` = foreach(x = long_characters, .final = function(x) setNames(x, names(long_characters))
  ) %dopar% {some_function1(x)},
  filter_gc = FALSE,
  min_iterations = 5
)
benchmark[, c(1:5, 7)]
# # A tibble: 4 x 6
# expression        min   median `itr/sec` mem_alloc n_itr
# <bch:expr>   <bch:tm> <bch:tm>     <dbl> <bch:byt> <int>
# 1 1          880.32ms     1.5s     0.724    4.17MB     5
# 2 2          580.24ms    1.21s     0.977    4.09MB     5
# 3 3             4.45s    4.75s     0.208    4.17MB     5
# 4 4              1.4s    1.42s     0.568   55.48MB     5

Now the nested future_lapply() is even slower than foreach! foreach, in contrast with the other options, is barely affected by the increase in the number of processes.

To investigate the matter further, I implemented a very dirty replacement of the internal function sendData.SOCK0node() of the parallel package, adding a few lines that log the time elapsed for each export to a cluster node.
I know that future already provides monitoring tools via options(future.debug = TRUE), but something felt off when I looked at the log of exported values. Instead, I implemented this logging using pryr::object_size(), which is supposed to be more reliable than utils::object.size(). The results suggest that the enclosing environment of future_lapply()'s function is unnecessarily exported to each node.
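As an aside, here is why pryr::object_size() matters here (an illustrative sketch): utils::object.size() does not follow environments captured by closures, whereas pryr::object_size() does.

f <- local({ big <- rnorm(1e6); function() big })
utils::object.size(f)  # a few KB: the captured environment is not counted
pryr::object_size(f)   # ~8 MB: includes `big` from f's environment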

Here is the dirty replacement for the parallel function.

sendData.SOCK0node <- function(node, data) {
  start <- Sys.time()
  on.exit({
    elapsed <- Sys.time() - start
    res <- data.frame(
      Callnr = tracker,
      Elapsed = elapsed,
      Size = format(pryr::object_size(data)),
      Object = NA
    )
    if (length(data$data$args) > 0) res$Object <- as.character(data$data$args[[1]][1])
    write.table(
      res, file = "log.txt", append = TRUE, col.names = FALSE, sep = ",",
      row.names = FALSE
    )
    tracker <<- tracker + 1
  })
  serialize(data, node$con, xdr = FALSE)
}
unlockBinding("sendData.SOCK0node", env = getNamespace("parallel"))
assignInNamespace("sendData.SOCK0node", sendData.SOCK0node, "parallel")
# Error in get(S3[i, 1L], mode = "function", envir = parent.frame()) : 
#   object 'sendData' of mode 'function' was not found
parallel:::sendData.SOCK0node
# function(node, data) {
#   start <- Sys.time()
#   on.exit({
#     elapsed <- Sys.time() - start
#     res <- data.frame(
#       Callnr = tracker,
#       Elapsed = elapsed,
#       Size = format(pryr::object_size(data)),
#       Object = NA
#     )
#     if (length(data$data$args) > 0) res$Object <- as.character(data$data$args[[1]][1])
#     write.table(
#       res, file = "log.txt", append = TRUE, col.names = FALSE, sep = ",",
#       row.names = FALSE
#     )
#     tracker <<- tracker + 1
#   })
#   serialize(data, node$con, xdr = FALSE)
# }

Although this throws an error, it was enough (for me at least) to replace the existing function in the parallel package. I know this seems barbaric, but my limited knowledge of future's internals led me here.
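A less invasive alternative (a sketch of an approach I did not use here) would be to inject the logging with trace() instead of replacing the function:

trace(parallel:::sendData.SOCK0node, print = FALSE, tracer = quote(
  message("exporting ", format(pryr::object_size(data)))
))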

Hence, I was able to observe the data transferred to each node during the nested future_lapply() call, monitoring its size with pryr::object_size().

tracker <- 1
cat(r"("Callnr", "Elapsed", "Size", "Object")", "\n", file = "log.txt", sep = "")
plan(multisession, workers = 2)
some_function2(long_characters)
res <- read.csv("log.txt")
res
#      Callnr      Elapsed     Size                    Object
#   1       1 0.0004999638      552                      <NA>
#   2       2 0.0005021095      552                      <NA>
#   3       3 0.0000000000      552                      <NA>
#   4       4 0.0000000000      552                      <NA>
#   5       5 0.0000000000      552                      <NA>
#   6       6 0.0000000000      552                      <NA>
#   7       7 0.0000000000      552                      <NA>
#   8       8 0.0000000000      552                      <NA>
#   9       9 0.0000000000      552                      <NA>
#   10     10 0.0000000000      552                      <NA>
#   11     11 0.0000000000     3936                Sys.getpid
#   12     12 0.0000000000     3936                Sys.getpid
#   13     13 0.0000000000     8328                     FALSE
#   14     14 0.0000000000     3936                Sys.getpid
#   15     15 0.0000000000     8328                     FALSE
#   16     16 0.0000000000     3936                Sys.getpid
#   17     17 0.0000000000     5512                      <NA>
#   18     18 0.0000000000   115456                         {
#   19     19 0.0000000000     5512                      <NA>
#   20     20 0.1896619797 51929376             ...future.FUN
#   21     21 0.0000000000     5232     future.call.arguments
#   22     22 0.0695588589 25965096     ...future.elements_ii
#   23     23 0.0000000000     4944        ...future.seeds_ii
#   24     24 0.0000000000     4944 ...future.globals.maxSize
#   25     25 0.0000000000   119960                         {
#   26     26 0.0004999638     5512                      <NA>
#   27     27 0.1976690292 51929376             ...future.FUN
#   28     28 0.0000000000     5232     future.call.arguments
#   29     29 0.0705609322 25965096     ...future.elements_ii
#   30     30 0.0000000000     4944        ...future.seeds_ii
#   31     31 0.0004999638     4944 ...future.globals.maxSize
#   32     32 0.0005011559   119960                         {

The interesting rows here are 20 and 27 (Callnr), corresponding to the export of future_lapply()'s function to each of the two nodes. According to pryr::object_size(), a large amount of data (roughly 52 MB) is sent to each worker at that point, and the elapsed time for the function's export is correspondingly significant.

In contrast, the same analysis of the non-nested call to future_lapply() does not show these large exports.

tracker <- 1
cat(r"("Callnr", "Elapsed", "Size", "Object")", "\n", file = "log.txt", sep = "")
plan(multisession, workers = 2)
future_lapply(long_characters, some_function1)
res <- read.csv("log.txt")
res
#      Callnr      Elapsed     Size                    Object
#   1       1 0.0004999638      552                      <NA>
#   2       2 0.0000000000      552                      <NA>
#   3       3 0.0000000000     3936                Sys.getpid
#   4       4 0.0000000000     3936                Sys.getpid
#   5       5 0.0000000000     8328                     FALSE
#   6       6 0.0000000000     3936                Sys.getpid
#   7       7 0.0000000000     8328                     FALSE
#   8       8 0.0000000000     3936                Sys.getpid
#   9       9 0.0000000000     5512                      <NA>
#   10     10 0.0004999638   115456                         {
#   11     11 0.0004999638     5512                      <NA>
#   12     12 0.0000000000     7680             ...future.FUN
#   13     13 0.0000000000     5232     future.call.arguments
#   14     14 0.6837708950 25965096     ...future.elements_ii
#   15     15 0.0000000000     4944        ...future.seeds_ii
#   16     16 0.0000000000     4944 ...future.globals.maxSize
#   17     17 0.0004999638   119960                         {
#   18     18 0.0000000000     5512                      <NA>
#   19     19 0.0000000000     7680             ...future.FUN
#   20     20 0.0000000000     5232     future.call.arguments
#   21     21 0.0700600147 25965096     ...future.elements_ii
#   22     22 0.0000000000     4944        ...future.seeds_ii
#   23     23 0.0005002022     4944 ...future.globals.maxSize
#   24     24 0.0005009174   119960                         {

In the non-nested case, the exported function is much smaller.

Investigating the ...future.FUN object in more detail, the problem seems related to the function's environment. When the function is serialized, its enclosing environment is serialized as well, which effectively forces the export of future_lapply()'s entire input to each node.
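A minimal sketch of this mechanism (make_fun() is a hypothetical stand-in for some_function2()): a function created inside another function closes over that call's frame, which binds the full input x.

make_fun <- function(x) {
  force(x)                  # x is bound in the closure's enclosing frame
  function(y) identity(y)   # same anonymous function as in some_function2()
}
g <- make_fun(long_characters)
pryr::object_size(g)        # ~50 MB: g drags the entire input list along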

To prove this point, I attempted to replace the environment of the exported function with the global environment:

sendData.SOCK0node <- function(node, data) {
  start <- Sys.time()
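  # Strip the enclosing environment of any exported function (the key tweak)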
  if (length(data$data$args) > 1) {
    if (is.function(data$data$args[[2]])) environment(data$data$args[[2]]) <- .GlobalEnv
  }
  on.exit({
    elapsed <- Sys.time() - start
    res <- data.frame(
      Callnr = tracker,
      Elapsed = elapsed,
      Size = format(pryr::object_size(data)),
      Object = NA
    )
    if (length(data$data$args) > 0) res$Object <- as.character(data$data$args[[1]][1])
    write.table(
      res, file = "log.txt", append = TRUE, col.names = FALSE, sep = ",",
      row.names = FALSE
    )
    tracker <<- tracker + 1
  })
  serialize(data, node$con, xdr = FALSE)
}
unlockBinding("sendData.SOCK0node", env = getNamespace("parallel"))
assignInNamespace("sendData.SOCK0node", sendData.SOCK0node, "parallel")
tracker <- 1
cat(r"("Callnr", "Elapsed", "Size", "Object")", "\n", file = "log.txt", sep = "")
plan(multisession, workers = 2)
system.time(some_function2(long_characters))
res <- read.csv("log.txt")
res
#      Callnr      Elapsed     Size                    Object
#   1       1 0.0000000000      552                      <NA>
#   2       2 0.0004999638      552                      <NA>
#   3       3 0.0000000000     3936                Sys.getpid
#   4       4 0.0000000000     3936                Sys.getpid
#   5       5 0.0000000000     8328                     FALSE
#   6       6 0.0000000000     3936                Sys.getpid
#   7       7 0.0000000000     8328                     FALSE
#   8       8 0.0000000000     3936                Sys.getpid
#   9       9 0.0000000000     5512                      <NA>
#   10     10 0.0000000000   115456                         {
#   11     11 0.0000000000     5512                      <NA>
#   12     12 0.0000000000     7744             ...future.FUN
#   13     13 0.0000000000     5232     future.call.arguments
#   14     14 0.0690588951 25965096     ...future.elements_ii
#   15     15 0.0000000000     4944        ...future.seeds_ii
#   16     16 0.0000000000     4944 ...future.globals.maxSize
#   17     17 0.0000000000   119960                         {
#   18     18 0.0000000000     5512                      <NA>
#   19     19 0.0005009174     7744             ...future.FUN
#   20     20 0.0000000000     5232     future.call.arguments
#   21     21 0.3738429546 25965096     ...future.elements_ii
#   22     22 0.0004999638     4944        ...future.seeds_ii
#   23     23 0.0000000000     4944 ...future.globals.maxSize
#   24     24 0.0000000000   119960                         {

Despite the nested call to future_lapply(), the enclosing environment of the function is no longer exported. I'm not sure about the implications for other functions (so far this only covers identity()), but the improvement brought by this small change suggests that changes in this direction could significantly speed up the use of future_lapply() in other packages.

Finally, I re-benchmarked the functions with 10 cores and the adjusted ...future.FUN environment:

sendData.SOCK0node <- function(node, data) {
  start <- Sys.time()
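  # Strip the enclosing environment of any exported function (the key tweak)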
  if (length(data$data$args) > 1) {
    if (is.function(data$data$args[[2]])) environment(data$data$args[[2]]) <- .GlobalEnv
  }
  serialize(data, node$con, xdr = FALSE)
}
unlockBinding("sendData.SOCK0node", env = getNamespace("parallel"))
assignInNamespace("sendData.SOCK0node", sendData.SOCK0node, "parallel")
plan(multisession, workers = 10)
benchmark <- bench::mark(
  `1` = future_lapply(long_characters, identity),
  `2` = future_lapply(long_characters, some_function1),
  `3` = some_function2(long_characters),
  `4` = foreach(x = long_characters, .final = function(x) setNames(x, names(long_characters))
  ) %dopar% {some_function1(x)},
  filter_gc = FALSE,
  min_iterations = 5
)
benchmark[, c(1:5, 7)]
# # A tibble: 4 x 6
# expression        min   median `itr/sec` mem_alloc n_itr
# <bch:expr>   <bch:tm> <bch:tm>     <dbl> <bch:byt> <int>
# 1 1             1.22s    1.53s     0.606    4.08MB     5
# 2 2             1.21s    1.51s     0.659    4.09MB     5
# 3 3          898.29ms    1.52s     0.659    4.09MB     5
# 4 4             1.44s    1.46s     0.689   55.49MB     5

The nested case is drastically improved and is now comparable with the first two options! Of course, naively assigning the global environment to the function is surely not the right way to handle this issue. But since a prime use case of future_lapply() is to replace lapply() in packages, leaving the user free to set the parallelism with plan(), it is a shame that using future_lapply() inside a function suffers from this large overhead.
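In the meantime, a user-side workaround (a sketch, assuming FUN needs nothing from the calling frame) is to strip the closure's environment before handing it to future_lapply():

some_function2b <- function(x) {
  fun <- function(y) identity(y)
  environment(fun) <- globalenv()  # drop the reference to this call's frame
  future_lapply(x, fun)
}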

My session info:

sessionInfo()
# R version 4.1.2 (2021-11-01)
# Platform: x86_64-w64-mingw32/x64 (64-bit)
# Running under: Windows 10 x64 (build 19043)
# 
# Matrix products: default
# 
# locale:
# [1] LC_COLLATE=French_Belgium.1252  LC_CTYPE=French_Belgium.1252    LC_MONETARY=French_Belgium.1252 LC_NUMERIC=C                   
# [5] LC_TIME=French_Belgium.1252    
# 
# attached base packages:
# [1] stats     graphics  grDevices utils     datasets  methods   base     
# 
# other attached packages:
# [1] doFuture_0.12.0    foreach_1.5.2      future.apply_1.8.1 future_1.24.0     
# 
# loaded via a namespace (and not attached):
# [1] Rcpp_1.0.8        rstudioapi_0.13   parallelly_1.30.0 lobstr_1.1.1      knitr_1.37        magrittr_2.0.2    rlang_1.0.1      
# [8] fastmap_1.1.0     fansi_1.0.2       stringr_1.4.0     globals_0.14.0    tools_4.1.2       parallel_4.1.2    xfun_0.29        
# [15] sessioninfo_1.2.2 utf8_1.2.2        cli_3.2.0         htmltools_0.5.2   iterators_1.0.14  ellipsis_0.3.2    yaml_2.3.5       
# [22] digest_0.6.29     tibble_3.1.6      lifecycle_1.0.1   crayon_1.5.0      pryr_0.1.5        vctrs_0.3.8       codetools_0.2-18 
# [29] glue_1.6.1        evaluate_0.15     rmarkdown_2.11    bench_1.1.2       stringi_1.7.6     compiler_4.1.2    pillar_1.7.0     
# [36] profmem_0.6.0     listenv_0.8.0     pkgconfig_2.0.3  
@odelmarcelle (Author)

A closely related issue: mschubert/clustermq#47

@HenrikBengtsson (Collaborator) commented Mar 16, 2022

Thanks for this, and thanks for spending all the time digging through the details and thinking about this problem. And yes, as you discovered in PR #99, this is a much harder issue than it first appears to be. We are aware of it, and it's on the to-do list to see if it can be improved further. It might be that static code analysis of the FUN function can be used to identify candidate objects that can safely be peeled off before exporting it to the worker.

I'm sure the documentation can be improved to raise awareness, e.g. a vignette with examples of the problem and best-practice suggestions on how to minimize it. For example, removing (rm():ing) unused local objects before calling future_lapply() will shrink the FUN function when it is a local function.
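For instance (a hypothetical sketch; expensive_intermediate() and summarize_it() are made-up placeholders):

my_fcn <- function(x) {
  tmp <- expensive_intermediate(x)   # large intermediate, unused below
  res <- summarize_it(tmp)           # small summary
  rm(tmp, x)  # drop large locals so FUN's enclosing frame no longer binds them
  future_lapply(res, function(y) y)  # the anonymous FUN is now lightweight
}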

FWIW, here's some related work that will help move this forward:

I've started to work on a profiling framework for futures (part of the roadmap) that will allow us to study what is going on under the hood, e.g. timing profiles for when futures are created, globals are exported, future expressions are evaluated and finished, and results are collected. This will help users better understand, troubleshoot, and work around problems like this. It will also help me identify bottlenecks and benchmark improvements.

Another related milestone on the roadmap is the future.mapreduce package, which is meant to provide future.apply, furrr, and doFuture (and other similar projects) with a common, core map-reduce framework. This means that any improvements made will benefit all those packages. Similarly, user feedback from one of them is likely to be beneficial for the others.

@tle4336 commented May 10, 2022

@HenrikBengtsson @odelmarcelle I think I just experienced the same situation, except that I compared future_mapply() not to foreach ... %dopar% but to a nested for loop. Does this mean that, for now, we need to call plan(multisession, workers = 2) outside the scope of the user-defined function in which we use future_*apply(), to get the speed benefits?

@tle4336 commented May 10, 2022

@odelmarcelle I tried the following code of yours before calling plan(multisession, workers = 10), but on my end it gave the error message: Error in get(S3[i, 1L], mode = "function", envir = parent.frame()) : object 'sendData' of mode 'function' was not found.

sendData.SOCK0node <- function(node, data) {
  start <- Sys.time()
  if (length(data$data$args) > 1) {
    if (is.function(data$data$args[[2]])) environment(data$data$args[[2]]) <- .GlobalEnv
  }
  serialize(data, node$con, xdr = FALSE)
}
unlockBinding("sendData.SOCK0node", env = getNamespace("parallel"))
assignInNamespace("sendData.SOCK0node", sendData.SOCK0node, "parallel")

@HenrikBengtsson (Collaborator) commented May 12, 2022

@odelmarcelle, your troubleshooting looks spot on. Only one thing to comment on:

I experience this issue on Windows and have not had the chance to try it on macOS. I suspect this is only relevant to Windows.

This is relevant to all operating systems (e.g. see the example below, executed on Linux). It's less of a problem when using forked parallel processing, e.g. mclapply(), plan(multicore), doMC::registerDoMC(), and doParallel::registerDoParallel(n) on macOS and Linux(!); maybe that's what you're thinking of.
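A quick sketch of the forked alternative (for illustration; fork-based backends are not available on Windows): forked workers share memory with the parent, so the captured environment is not serialized to them.

plan(multicore, workers = 2)     # forked workers share memory with the parent
some_function2(long_characters)  # no ~50 MB serialization per future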

This is a known problem that is general to all parallel frameworks in R, not just the future ecosystem. For example:

my_fcn <- function(cl, cargo = 0L) {
  huge_object_also_exported <- rnorm(cargo)
  parallel::parLapply(cl, X = 1L, fun = function(x) x)
}

cl <- parallel::makeCluster(1L)
trace(parallel:::postNode, tracer = quote(message(sprintf("Size of exported data: %d bytes", lobstr::obj_size(value)))), print = FALSE)

y <- my_fcn(cl = cl, cargo = 0L)
# Size of exported data: 9704 bytes

y <- my_fcn(cl = cl, cargo = 1e6L)
# Size of exported data: 8010880 bytes

It's rather complicated to fix this problem automatically. Basically, we need to figure out how to prune the environment of the function so that it does not carry the extra cargo of objects in the local environment that are not necessary for evaluating the function in another, external process. It's easy to tweak this for a few simple toy examples, but making it work in general is much more complicated.
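To illustrate why naive pruning can break things (an illustrative sketch): if FUN genuinely uses a local object, stripping its environment loses that binding.

f <- local({ offset <- 10; function(x) x + offset })
f(1)                           # 11
environment(f) <- globalenv()
f(1)                           # Error: object 'offset' not found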

Also, solving it is unfortunately somewhat in conflict with other objectives (e.g. the ones outlined in futureverse/future#608). I am working towards something that improves on the current state, but it's not easy, and if not done extremely carefully it's very easy to break something else. Thankfully, I've got tons of tests in place that somewhat protect against that.
