From de6ec1f4e78a149b367a743d744e30f889c2f71c Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Tue, 23 Jan 2024 11:23:32 -0500 Subject: [PATCH 1/2] change lump to --lump-s and lumpn to --lump-n #415 --- looper/cli_looper.py | 8 ++++---- looper/looper.py | 4 ++-- tests/smoketests/test_run.py | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/looper/cli_looper.py b/looper/cli_looper.py index c53047d54..2d7c43fc7 100644 --- a/looper/cli_looper.py +++ b/looper/cli_looper.py @@ -219,19 +219,19 @@ def add_subparser(cmd): for subparser in [run_subparser, rerun_subparser]: subparser.add_argument( "-u", - "--lump", + "--lump-s", default=None, metavar="X", type=html_range(min_val=0, max_val=100, step=0.1, value=0), - help="Total input file size (GB) to batch into one job", + help="Lump by size: total input file size (GB) to batch into one job", ) subparser.add_argument( "-n", - "--lumpn", + "--lump-n", default=None, metavar="N", type=html_range(min_val=1, max_val="num_samples", value=1), - help="Number of commands to batch into one job", + help="Lump by number: number of samples to batch into one job", ) check_subparser.add_argument( diff --git a/looper/looper.py b/looper/looper.py index 65c25bbc5..194410aa2 100755 --- a/looper/looper.py +++ b/looper/looper.py @@ -404,8 +404,8 @@ def __call__(self, args, rerun=False, **compute_kwargs): extra_args=args.command_extra, extra_args_override=args.command_extra_override, ignore_flags=args.ignore_flags, - max_cmds=args.lumpn, - max_size=args.lump, + max_cmds=args.lump_n, + max_size=args.lump_s, ) submission_conductors[piface.pipe_iface_file] = conductor diff --git a/tests/smoketests/test_run.py b/tests/smoketests/test_run.py index c646103fc..d89eccf41 100644 --- a/tests/smoketests/test_run.py +++ b/tests/smoketests/test_run.py @@ -439,7 +439,7 @@ def test_looper_run_produces_submission_scripts(self, prep_temp_pep): def test_looper_lumping(self, prep_temp_pep): tp = prep_temp_pep - x = test_args_expansion(tp, "run", ["--lumpn", "2"]) + x = test_args_expansion(tp, "run", ["--lump-n", "2"]) try: main(test_args=x) except Exception: From 65497403cae9ea6f80cb903a0ed66a7eeffde982 Mon Sep 17 00:00:00 2001 From: Donald Campbell <125581724+donaldcampbelljr@users.noreply.github.com> Date: Tue, 23 Jan 2024 13:01:11 -0500 Subject: [PATCH 2/2] add --lump-j and associated tests #415 --- docs/changelog.md | 5 +++++ looper/cli_looper.py | 8 ++++++++ looper/conductor.py | 14 ++++++++++++++ looper/looper.py | 1 + tests/smoketests/test_run.py | 17 +++++++++++++++++ 5 files changed, 45 insertions(+) diff --git a/docs/changelog.md b/docs/changelog.md index eb5511078..42fabbdc4 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -6,6 +6,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Added - `--portable` flag to looper report +- `--lump-j` allows grouping samples into a defined number of jobs + +### Changed +- `--lumpn` is now `--lump-n` +- `--lump` is now `--lump-s` ## [1.6.0] -- 2023-12-22 diff --git a/looper/cli_looper.py b/looper/cli_looper.py index 2d7c43fc7..fd620a1ec 100644 --- a/looper/cli_looper.py +++ b/looper/cli_looper.py @@ -233,6 +233,14 @@ def add_subparser(cmd): type=html_range(min_val=1, max_val="num_samples", value=1), help="Lump by number: number of samples to batch into one job", ) + subparser.add_argument( + "-j", + "--lump-j", + default=None, + metavar="J", + type=int, + help="Lump samples into number of jobs.", + ) check_subparser.add_argument( "--describe-codes", diff --git a/looper/conductor.py b/looper/conductor.py index e83616332..807d34f3e 100644 --- a/looper/conductor.py +++ b/looper/conductor.py @@ -6,6 +6,7 @@ import subprocess import time import yaml +from math import ceil from copy import copy, deepcopy from json import loads from subprocess import check_output @@ -132,6 +133,7 @@ def __init__( compute_variables=None, max_cmds=None, max_size=None, + max_jobs=None, automatic=True, collate=False, ): @@ -166,6 +168,8 @@ def __init__( include in a single job script. :param int | float | NoneType max_size: Upper bound on total file size of inputs used by the commands lumped into single job script. + :param int | float | NoneType max_jobs: Upper bound on total number of jobs to + group samples for submission. :param bool automatic: Whether the submission should be automatic once the pool reaches capacity. :param bool collate: Whether a collate job is to be submitted (runs on @@ -200,6 +204,16 @@ def __init__( "{}".format(self.extra_pipe_args) ) + if max_jobs: + if max_jobs == 0 or max_jobs < 0: + raise ValueError( + "If specified, max job command count must be a positive integer, greater than zero." + ) + + num_samples = len(self.prj.samples) + samples_per_job = num_samples / max_jobs + max_cmds = ceil(samples_per_job) + if not self.collate: self.automatic = automatic if max_cmds is None and max_size is None: diff --git a/looper/looper.py b/looper/looper.py index 194410aa2..51a9ee02a 100755 --- a/looper/looper.py +++ b/looper/looper.py @@ -406,6 +406,7 @@ def __call__(self, args, rerun=False, **compute_kwargs): ignore_flags=args.ignore_flags, max_cmds=args.lump_n, max_size=args.lump_s, + max_jobs=args.lump_j, ) submission_conductors[piface.pipe_iface_file] = conductor diff --git a/tests/smoketests/test_run.py b/tests/smoketests/test_run.py index d89eccf41..5d166fe38 100644 --- a/tests/smoketests/test_run.py +++ b/tests/smoketests/test_run.py @@ -447,6 +447,23 @@ def test_looper_lumping(self, prep_temp_pep): sd = os.path.join(get_outdir(tp), "submission") verify_filecount_in_dir(sd, ".sub", 4) + def test_looper_lumping_jobs(self, prep_temp_pep): + tp = prep_temp_pep + x = test_args_expansion(tp, "run", ["--lump-j", "1"]) + try: + main(test_args=x) + except Exception: + raise pytest.fail("DID RAISE {0}".format(Exception)) + sd = os.path.join(get_outdir(tp), "submission") + verify_filecount_in_dir(sd, ".sub", 2) + + def test_looper_lumping_jobs_negative(self, prep_temp_pep): + tp = prep_temp_pep + x = test_args_expansion(tp, "run", ["--lump-j", "-1"]) + + with pytest.raises(ValueError): + main(test_args=x) + def test_looper_limiting(self, prep_temp_pep): tp = prep_temp_pep x = test_args_expansion(tp, "run", ["--limit", "2"])