From c2af92a6531e329ddc7f6dfda2921b321edbccb6 Mon Sep 17 00:00:00 2001 From: wlandau Date: Mon, 25 Sep 2023 06:02:53 -0400 Subject: [PATCH] Migrate to clustermq 0.9.0 interface --- .github/workflows/cover.yaml | 36 ----------------------------- .github/workflows/lint.yaml | 26 --------------------- DESCRIPTION | 4 ++-- NEWS.md | 4 ++-- R/backend_clustermq.R | 45 +++++++++++------------------------- R/backend_future.R | 24 +++++++++---------- 6 files changed, 30 insertions(+), 109 deletions(-) delete mode 100644 .github/workflows/cover.yaml delete mode 100644 .github/workflows/lint.yaml diff --git a/.github/workflows/cover.yaml b/.github/workflows/cover.yaml deleted file mode 100644 index 2fee64d64..000000000 --- a/.github/workflows/cover.yaml +++ /dev/null @@ -1,36 +0,0 @@ -# Workflow derived from https://github.com/r-lib/actions/tree/v2/examples -# Need help debugging build failures? Start at https://github.com/r-lib/actions#where-to-find-help -on: [push, pull_request] - -name: cover - -jobs: - cover: - runs-on: ubuntu-latest - env: - GITHUB_PAT: ${{ secrets.GITHUB_TOKEN }} - NOT_CRAN: true - R_REMOTES_NO_ERRORS_FROM_WARNINGS: false - TORCH_INSTALL: 1 - - steps: - - uses: actions/checkout@v2 - - - name: Install Linux system dependencies - if: runner.os == 'Linux' - run: | - sudo apt-get update - sudo apt-get install libglpk-dev libglpk40 - - - uses: r-lib/actions/setup-r@v2 - with: - use-public-rspm: true - - - uses: r-lib/actions/setup-r-dependencies@v2 - with: - extra-packages: any::covr - needs: coverage - - - name: Test coverage - run: covr::codecov(quiet = FALSE) - shell: Rscript {0} diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml deleted file mode 100644 index 3f4df0b30..000000000 --- a/.github/workflows/lint.yaml +++ /dev/null @@ -1,26 +0,0 @@ -# Workflow derived from https://github.com/r-lib/actions/tree/v2/examples -# Need help debugging build failures? Start at https://github.com/r-lib/actions#where-to-find-help -on: [push, pull_request] - -name: lint - -jobs: - lint: - runs-on: ubuntu-latest - env: - GITHUB_PAT: ${{ secrets.GITHUB_TOKEN }} - steps: - - uses: actions/checkout@v2 - - - uses: r-lib/actions/setup-r@v2 - with: - use-public-rspm: true - - - uses: r-lib/actions/setup-r-dependencies@v2 - with: - extra-packages: any::lintr, local::. - needs: lint - - - name: Lint - run: lintr::lint_package() - shell: Rscript {0} diff --git a/DESCRIPTION b/DESCRIPTION index 031b97087..5964c78dc 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,6 +1,6 @@ Package: drake Title: A Pipeline Toolkit for Reproducible Computation at Scale -Version: 7.13.5.9000 +Version: 7.13.6 Authors@R: c( person( given = c("William", "Michael"), @@ -120,7 +120,7 @@ Suggests: bindr, callr, cli (>= 1.1.0), - clustermq (>= 0.8.8), + clustermq (>= 0.9.0), crayon, curl (>= 2.7), data.table, diff --git a/NEWS.md b/NEWS.md index 61764bd10..c4b55284c 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,6 +1,6 @@ -# Version 7.13.5.9000 - +# Version 7.13.6 +* Migrate to the new interface in `clustermq` 0.9.0 (@mschubert). # Version 7.13.5 diff --git a/R/backend_clustermq.R b/R/backend_clustermq.R index 2b74ac189..ffa1efad1 100644 --- a/R/backend_clustermq.R +++ b/R/backend_clustermq.R @@ -49,43 +49,28 @@ cmq_set_common_data <- function(config) { export <- as.list(config$envir, all.names = TRUE) # nocov } export$config <- hpc_config(config) - config$workers$set_common_data( - export = export, - fun = identity, - const = list(), - rettype = list(), - pkgs = character(0), - common_seed = config$settings$seed, - token = "set_common_data_token" - ) + do.call(what = config$workers$env, args = export) } cmq_main <- function(config) { - on.exit(config$workers$finalize()) + on.exit(config$workers$cleanup()) config$logger$disk("begin scheduling targets") while (config$counter$remaining > 0) { cmq_main_iter(config) } - if (config$workers$cleanup()) { - on.exit() - } } cmq_main_iter <- function(config) { - msg <- config$workers$receive_data() - cmq_conclude_build(msg = msg, config = config) - if (!identical(msg$token, "set_common_data_token")) { - config$logger$disk("sending common data") - config$workers$send_common_data() - } else if (!config$queue$empty()) { + build <- config$workers$recv() + cmq_conclude_build(build = build, config = config) + if (!config$queue$empty()) { cmq_next_target(config) } else { - config$workers$send_shutdown_worker() + config$workers$send_shutdown() } } -cmq_conclude_build <- function(msg, config) { - build <- msg$result +cmq_conclude_build <- function(build, config) { if (is.null(build)) { return() } @@ -158,8 +143,8 @@ cmq_send_target <- function(target, config) { spec <- hpc_spec(target, config) config_tmp <- get_hpc_config_tmp(config) config$logger$disk("build on an hpc worker", target = target) - config$workers$send_call( - expr = drake::cmq_build( + config$workers$send( + cmd = drake::cmq_build( target = target, meta = meta, deps = deps, @@ -167,13 +152,11 @@ cmq_send_target <- function(target, config) { config_tmp = config_tmp, config = config ), - env = list( - target = target, - meta = meta, - deps = deps, - spec = spec, - config_tmp = config_tmp - ) + target = target, + meta = meta, + deps = deps, + spec = spec, + config_tmp = config_tmp ) } diff --git a/R/backend_future.R b/R/backend_future.R index 10e1dfd4f..bf4ba3bd4 100644 --- a/R/backend_future.R +++ b/R/backend_future.R @@ -115,12 +115,12 @@ ft_launch_worker <- function(target, meta, protect, config) { } future_globals <- function( - target, - meta, - config, - spec, - config_tmp, - protect + target, + meta, + config, + spec, + config_tmp, + protect ) { globals <- list( DRAKE_GLOBALS__ = list( @@ -161,12 +161,12 @@ future_globals <- function( #' @param protect Names of targets that still need their #' dependencies available in memory. future_build <- function( - target, - meta, - config, - spec, - config_tmp, - protect + target, + meta, + config, + spec, + config_tmp, + protect ) { config$spec <- spec config <- restore_hpc_config_tmp(config_tmp, config)