Skip to content

Commit

Permalink
Improve slurm controller
Browse files Browse the repository at this point in the history
  • Loading branch information
nefrathenrici committed Jul 11, 2024
1 parent fbc4a04 commit 6330a0b
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 30 deletions.
72 changes: 42 additions & 30 deletions src/slurm.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,18 @@ export kwargs, sbatch_model_run, wait_for_jobs

"""
    kwargs(; options...)

Collect the given keyword arguments into a `Dict{Symbol, Any}`, keyed by keyword name.
"""
function kwargs(; options...)
    # `options` iterates as `Symbol => value` pairs, which is what the Dict constructor takes.
    return Dict{Symbol, Any}(pairs(options))
end

"""
    generate_sbatch_directives(slurm_kwargs)

Build the `#SBATCH` directive lines for an sbatch script from `slurm_kwargs`.

Each entry becomes one line of the form `#SBATCH --key=value`, with underscores in both
the key and the value replaced by hyphens. The `:time` value is passed through
`format_slurm_time` before being written. The lines are joined with newlines and
returned as a single string.

`slurm_kwargs` is not modified, so the same collection can be reused for later
submissions (e.g. when a job is rerun).

# Throws
- `AssertionError` if `slurm_kwargs` has no `:time` key.
"""
function generate_sbatch_directives(slurm_kwargs)
    # Explicit throw instead of `@assert` so the check can never be compiled out;
    # the exception type is kept for callers that already catch it.
    haskey(slurm_kwargs, :time) ||
        throw(AssertionError("Slurm kwargs must include key :time"))

    slurm_directives = map(collect(pairs(slurm_kwargs))) do (k, v)
        # Format the time locally rather than writing it back into `slurm_kwargs`:
        # mutating the caller's collection would feed an already-formatted value
        # into `format_slurm_time` on the next call.
        flag = replace(string(k), "_" => "-")
        value = replace(string(k === :time ? format_slurm_time(v) : v), "_" => "-")
        "#SBATCH --$flag=$value"
    end
    return join(slurm_directives, "\n")
end

"""
generate_sbatch_script(
iter, member,
generate_sbatch_script(iter, member,
output_dir, experiment_dir, model_interface;
module_load_str, slurm_kwargs,
)
Expand All @@ -14,28 +23,22 @@ Generate a string containing an sbatch script to run the forward model.
Helper function for `sbatch_model_run`.
"""
function generate_sbatch_script(
iter,
member,
output_dir,
experiment_dir,
model_interface,
module_load_str;
iter::Int,
member::Int,
output_dir::AbstractString,
experiment_dir::AbstractString,
model_interface::AbstractString,
module_load_str::AbstractString;
slurm_kwargs,
)
member_log = path_to_model_log(output_dir, iter, member)

# Format time in minutes to string for slurm
slurm_kwargs[:time] = format_slurm_time(slurm_kwargs[:time])

slurm_directives = map(collect(slurm_kwargs)) do (k, v)
"#SBATCH --$(replace(string(k), "_" => "-"))=$(replace(string(v), "_" => "-"))"
end
slurm_directives = generate_sbatch_directives(slurm_kwargs)

Check warning on line 35 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L35

Added line #L35 was not covered by tests

sbatch_contents = """
#!/bin/bash
#SBATCH --job-name=run_$(iter)_$(member)
#SBATCH --output=$member_log
$(join(slurm_directives, "\n"))
$slurm_directives
$module_load_str
srun --output=$member_log --open-mode=append julia --project=$experiment_dir -e '
Expand Down Expand Up @@ -83,8 +86,16 @@ function sbatch_model_run(
:ntasks => 1,
:cpus_per_task => 1,
),
kwargs...,
)
# Type and existence checks
@assert isdir(output_dir) "Output directory does not exist: $output_dir"
@assert isdir(experiment_dir) "Experiment directory does not exist: $experiment_dir"
@assert isfile(model_interface) "Model interface file does not exist: $model_interface"

Check warning on line 93 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L91-L93

Added lines #L91 - L93 were not covered by tests

# Range checks
@assert iter >= 0 "Iteration number must be non-negative"
@assert member > 0 "Member number must be positive"

Check warning on line 97 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L96-L97

Added lines #L96 - L97 were not covered by tests

sbatch_contents = generate_sbatch_script(
iter,
member,
Expand All @@ -93,28 +104,29 @@ function sbatch_model_run(
model_interface,
module_load_str;
slurm_kwargs,
kwargs...,
)

sbatch_filepath, io = mktemp(output_dir)
write(io, sbatch_contents)
close(io)

return submit_sbatch_job(sbatch_filepath)
jobid = mktemp(output_dir) do sbatch_filepath, io
write(io, sbatch_contents)
close(io)
submit_sbatch_job(sbatch_filepath)

Check warning on line 112 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L109-L112

Added lines #L109 - L112 were not covered by tests
end
return jobid

Check warning on line 114 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L114

Added line #L114 was not covered by tests
end

function wait_for_jobs(
jobids,
jobids::Vector{Int},
output_dir,
iter,
experiment_dir,
model_interface,
module_load_str;
verbose,
slurm_kwargs,
reruns = 1,
)
statuses = map(job_status, jobids)
rerun_jobs = Set{Int}()
rerun_job_count = zeros(length(jobids))

Check warning on line 129 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L129

Added line #L129 was not covered by tests
completed_jobs = Set{Int}()

try
Expand All @@ -124,7 +136,7 @@ function wait_for_jobs(

if job_failed(status)
log_member_error(output_dir, iter, m, verbose)
if !(m in rerun_jobs)
if rerun_job_count[m] < reruns

Check warning on line 139 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L139

Added line #L139 was not covered by tests

@info "Rerunning ensemble member $m"
jobids[m] = sbatch_model_run(
Expand All @@ -136,7 +148,7 @@ function wait_for_jobs(
module_load_str;
slurm_kwargs,
)
push!(rerun_jobs, m)
rerun_job_count[m] += 1

Check warning on line 151 in src/slurm.jl

View check run for this annotation

Codecov / codecov/patch

src/slurm.jl#L151

Added line #L151 was not covered by tests
else
push!(completed_jobs, m)
end
Expand All @@ -162,7 +174,7 @@ end
"""
log_member_error(output_dir, iteration, member, verbose = false)
Log a warning message when an error occurs in a specific ensemble member during a model run in a Slurm environment.
Log a warning message when an error occurs.
If verbose, includes the ensemble member's output.
"""
function log_member_error(output_dir, iteration, member, verbose = false)
Expand All @@ -189,14 +201,14 @@ function report_iteration_status(statuses, output_dir, iter)
end
end

"""
    submit_sbatch_job(sbatch_filepath; env = copy(ENV))

Submit the script at `sbatch_filepath` with `sbatch --parsable` and return the job id
as an `Int`.

The command runs with the environment `env`, minus the variables `SLURM_MEM_PER_CPU`,
`SLURM_MEM_PER_GPU` and `SLURM_MEM_PER_NODE`, so that they are not inherited by the
submitted job. Neither `env` nor the environment of the current process is modified.
"""
function submit_sbatch_job(sbatch_filepath; env = copy(ENV))
    # Ensure that we don't inherit unwanted environment variables.
    # `filter` builds a new dictionary, so the caller's `env` (and the live process
    # `ENV`, should it be passed in directly) is left untouched.
    unset_env_vars =
        ("SLURM_MEM_PER_CPU", "SLURM_MEM_PER_GPU", "SLURM_MEM_PER_NODE")
    clean_env = filter(kv -> first(kv) ∉ unset_env_vars, env)

    output = readchomp(setenv(`sbatch --parsable $sbatch_filepath`, clean_env))
    # `--parsable` prints "jobid" or "jobid;cluster"; only the job id is wanted.
    jobid = first(split(output, ';'))
    return parse(Int, jobid)
end

Expand Down
1 change: 1 addition & 0 deletions test/slurm_unit_tests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ sleep(1)

# Test batch cancellation
jobids = ntuple(x -> submit_cmd_helper(test_cmd), 5)

CAL.kill_all_jobs(jobids)
for jobid in jobids
@test CAL.job_completed(CAL.job_status(jobid))
Expand Down

0 comments on commit 6330a0b

Please sign in to comment.