From db5cb816dda07e10a1ae266b7961f078ea9da183 Mon Sep 17 00:00:00 2001 From: nefrathenrici Date: Thu, 11 Jul 2024 17:22:12 -0600 Subject: [PATCH] Add PBS controller, Derecho backend --- .buildkite/clima_server_test/pipeline.yml | 2 +- .buildkite/pipeline.yml | 2 +- Project.toml | 2 +- README.md | 16 +- docs/make.jl | 1 - docs/src/api.md | 19 +- docs/src/index.md | 5 +- src/ClimaCalibrate.jl | 1 + src/backends.jl | 148 +++++--- src/ekp_interface.jl | 6 +- src/pbs.jl | 210 +++++++++++ src/slurm.jl | 347 ++++++++++-------- test/ekp_interface.jl | 10 +- ...lurm_backend_e2e.jl => hpc_backend_e2e.jl} | 33 +- test/pbs_unit_tests.jl | 122 ++++++ test/slurm_unit_tests.jl | 16 +- 16 files changed, 699 insertions(+), 241 deletions(-) create mode 100644 src/pbs.jl rename test/{slurm_backend_e2e.jl => hpc_backend_e2e.jl} (70%) create mode 100644 test/pbs_unit_tests.jl diff --git a/.buildkite/clima_server_test/pipeline.yml b/.buildkite/clima_server_test/pipeline.yml index d711c824..90c24eb1 100644 --- a/.buildkite/clima_server_test/pipeline.yml +++ b/.buildkite/clima_server_test/pipeline.yml @@ -21,7 +21,7 @@ steps: - wait - label: "SurfaceFluxes perfect model calibration" - command: julia --project=experiments/surface_fluxes_perfect_model test/slurm_backend_e2e.jl + command: julia --project=experiments/surface_fluxes_perfect_model test/hpc_backend_e2e.jl artifact_paths: output/surface_fluxes_perfect_model/* - label: "Slurm job controller unit tests" diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index e5b54a93..085008e6 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -17,7 +17,7 @@ steps: - wait - label: "SurfaceFluxes perfect model calibration" - command: julia --project=experiments/surface_fluxes_perfect_model test/slurm_backend_e2e.jl + command: julia --project=experiments/surface_fluxes_perfect_model test/hpc_backend_e2e.jl artifact_paths: output/surface_fluxes_perfect_model/* - label: "Slurm job controller unit tests" diff --git a/Project.toml b/Project.toml index b87717fa..12eae9ba 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "ClimaCalibrate" uuid = "4347a170-ebd6-470c-89d3-5c705c0cacc2" authors = ["Climate Modeling Alliance"] -version = "0.0.1" +version = "0.0.2" [deps] Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f" diff --git a/README.md b/README.md index 4e4cf070..cf8e4f69 100644 --- a/README.md +++ b/README.md @@ -9,13 +9,8 @@ calibration pipelines using with minimal boilerplate.

-[![docsbuild][docs-bld-img]][docs-bld-url] [![dev][docs-dev-img]][docs-dev-url] [![ghaci][gha-ci-img]][gha-ci-url] -[![codecov][codecov-img]][codecov-url] - -[docs-bld-img]: https://github.com/CliMA/ClimaCalibrate.jl/workflows/Documentation/badge.svg -[docs-bld-url]: https://github.com/CliMA/ClimaCalibrate.jl/actions?query=workflow%3ADocumentation [docs-dev-img]: https://img.shields.io/badge/docs-dev-blue.svg [docs-dev-url]: https://CliMA.github.io/ClimaCalibrate.jl/dev/ @@ -23,13 +18,12 @@ calibration pipelines using with minimal boilerplate. [gha-ci-img]: https://github.com/CliMA/ClimaCalibrate.jl/actions/workflows/ci.yml/badge.svg [gha-ci-url]: https://github.com/CliMA/ClimaCalibrate.jl/actions/workflows/ci.yml -[codecov-img]: https://codecov.io/gh/CliMA/ClimaCalibrate.jl/branch/main/graph/badge.svg -[codecov-url]: https://codecov.io/gh/CliMA/ClimaCalibrate.jl - -The recommended Julia version is: Stable release v1.10.0 +The recommended Julia version is: Stable release v1.10.4 -This pipeline currently runs on the Resnick High Performance Computing Center. -We strive to support flexible and clearly documented calibration experiments. +Currently supported backends: +- [Resnick High Performance Computing Center](https://www.hpc.caltech.edu/) +- [NSF NCAR Supercomputer Derecho](https://ncar-hpc-docs.readthedocs.io/en/latest/compute-systems/derecho/) +- CliMA's private GPU server ## Contributing diff --git a/docs/make.jl b/docs/make.jl index 985a37e6..533d734f 100644 --- a/docs/make.jl +++ b/docs/make.jl @@ -26,7 +26,6 @@ makedocs( "Getting Started" => "quickstart.md", "ClimaAtmos Setup Guide" => "atmos_setup_guide.md", "Emulate and Sample" => "emulate_sample.md", - "Precompilation" => "precompilation.md", "API" => "api.md", ], ) diff --git a/docs/src/api.md b/docs/src/api.md index aba12d60..a55eb4bd 100644 --- a/docs/src/api.md +++ b/docs/src/api.md @@ -13,7 +13,24 @@ ClimaCalibrate.observation_map ```@docs ClimaCalibrate.get_backend ClimaCalibrate.calibrate -ClimaCalibrate.sbatch_model_run +ClimaCalibrate.model_run +ClimaCalibrate.module_load_string +``` + +## Job Scheduler +```@docs +ClimaCalibrate.wait_for_jobs +ClimaCalibrate.log_member_error +ClimaCalibrate.kill_job +ClimaCalibrate.job_status +ClimaCalibrate.kwargs +ClimaCalibrate.slurm_model_run +ClimaCalibrate.generate_sbatch_script +ClimaCalibrate.generate_sbatch_directives +ClimaCalibrate.submit_slurm_job +ClimaCalibrate.pbs_model_run +ClimaCalibrate.generate_pbs_script +ClimaCalibrate.submit_pbs_job ``` ## EnsembleKalmanProcesses Interface diff --git a/docs/src/index.md b/docs/src/index.md index c8931f77..f6d7248a 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -3,8 +3,7 @@ ClimaCalibrate.jl is a toolkit for developing scalable and reproducible model calibration pipelines using CalibrateEmulateSample.jl with minimal boilerplate. -To use this framework, component models (and the coupler) define their own versions of the functions provided in the interface (`get_config`, `get_forward_model`, and `run_forward_model`). - -Calibrations can either be run using pure Julia, the Caltech central cluster, or CliMA's GPU server. +To use this framework, component models (and the coupler) define their own versions of the functions provided in the interface. +Calibrations can either be run using just Julia, the Caltech central cluster, NCAR Derecho, or CliMA's GPU server. For more information, see our Getting Started page. diff --git a/src/ClimaCalibrate.jl b/src/ClimaCalibrate.jl index 2a78affa..f4cfae51 100644 --- a/src/ClimaCalibrate.jl +++ b/src/ClimaCalibrate.jl @@ -3,6 +3,7 @@ module ClimaCalibrate include("ekp_interface.jl") include("model_interface.jl") include("slurm.jl") +include("pbs.jl") include("backends.jl") include("emulate_sample.jl") diff --git a/src/backends.jl b/src/backends.jl index a450d2a1..b544d08f 100644 --- a/src/backends.jl +++ b/src/backends.jl @@ -1,12 +1,17 @@ -export get_backend, calibrate +export get_backend, calibrate, model_run abstract type AbstractBackend end struct JuliaBackend <: AbstractBackend end -abstract type SlurmBackend <: AbstractBackend end + +abstract type HPCBackend <: AbstractBackend end +abstract type SlurmBackend <: HPCBackend end + struct CaltechHPCBackend <: SlurmBackend end struct ClimaGPUBackend <: SlurmBackend end +struct DerechoBackend <: HPCBackend end + """ get_backend() @@ -18,6 +23,8 @@ function get_backend() (r"^clima.gps.caltech.edu$", ClimaGPUBackend), (r"^login[1-4].cm.cluster$", CaltechHPCBackend), (r"^hpc-(\d\d)-(\d\d).cm.cluster$", CaltechHPCBackend), + (r"derecho([1-8])$", DerechoBackend), + (r"dec(\d\d\d\d)$", DerechoBackend), # This should be more specific ] for (pattern, backend) in HOSTNAMES @@ -28,12 +35,12 @@ function get_backend() end """ - module_load_string(T) where {T<:Type{SlurmBackend}} + module_load_string(backend) Return a string that loads the correct modules for a given backend when executed via bash. """ function module_load_string(::Type{CaltechHPCBackend}) - return """export MODULEPATH=/groups/esm/modules:\$MODULEPATH + return """export MODULEPATH="/groups/esm/modules:\$MODULEPATH" module purge module load climacommon/2024_05_27""" end @@ -43,32 +50,14 @@ function module_load_string(::Type{ClimaGPUBackend}) module load julia/1.10.0 cuda/julia-pref openmpi/4.1.5-mpitrampoline""" end -""" - calibrate(::Type{JuliaBackend}, config::ExperimentConfig) - calibrate(::Type{JuliaBackend}, experiment_dir::AbstractString) - -Run a calibration in Julia. - -Takes an ExperimentConfig or an experiment folder. -If no backend is passed, one is chosen via `get_backend`. -This function is intended for use in a larger workflow, assuming that all needed -model interface and observation map functions are set up for the calibration. - -# Example -Run: `julia --project=experiments/surface_fluxes_perfect_model` -```julia -import ClimaCalibrate - -# Generate observational data and load interface -experiment_dir = dirname(Base.active_project()) -include(joinpath(experiment_dir, "generate_data.jl")) -include(joinpath(experiment_dir, "observation_map.jl")) -include(joinpath(experiment_dir, "model_interface.jl")) +function module_load_string(::Type{DerechoBackend}) + return """export MODULEPATH="/glade/campaign/univ/ucit0011/ClimaModules-Derecho:\$MODULEPATH" + module purge + module load climacommon + module list + """ +end -# Initialize and run the calibration -eki = ClimaCalibrate.calibrate(experiment_dir) -``` -""" calibrate(config::ExperimentConfig; ekp_kwargs...) = calibrate(get_backend(), config; ekp_kwargs...) @@ -86,9 +75,8 @@ function calibrate( config::ExperimentConfig; ekp_kwargs..., ) - initialize(config; ekp_kwargs...) (; n_iterations, ensemble_size) = config - eki = nothing + eki = initialize(config; ekp_kwargs...) for i in 0:(n_iterations - 1) @info "Running iteration $i" for m in 1:ensemble_size @@ -103,25 +91,28 @@ function calibrate( end """ - calibrate(::Type{SlurmBackend}, config::ExperimentConfig; kwargs...) - calibrate(::Type{SlurmBackend}, experiment_dir; kwargs...) + calibrate(::Type{AbstractBackend}, config::ExperimentConfig; kwargs...) + calibrate(::Type{AbstractBackend}, experiment_dir; kwargs...) Run a full calibration, scheduling the forward model runs on Caltech's HPC cluster. Takes either an ExperimentConfig or an experiment folder. +Available Backends: CaltechHPCBackend, ClimaGPUBackend, DerechoBackend, JuliaBackend + + # Keyword Arguments - `experiment_dir: Directory containing experiment configurations. - `model_interface: Path to the model interface file. -- `slurm_kwargs`: Dictionary of slurm arguments, passed through to `sbatch`. -- `verbose::Bool`: Enable verbose output for debugging. +- `hpc_kwargs`: Dictionary of resource arguments, passed to the job scheduler. +- `verbose::Bool`: Enable verbose logging. # Usage Open julia: `julia --project=experiments/surface_fluxes_perfect_model` ```julia -import ClimaCalibrate: CaltechHPCBackend, calibrate +using ClimaCalibrate -experiment_dir = dirname(Base.active_project()) +experiment_dir = joinpath(pkgdir(ClimaCalibrate), "experiments", "surface_fluxes_perfect_model") model_interface = joinpath(experiment_dir, "model_interface.jl") # Generate observational data and load interface @@ -129,49 +120,51 @@ include(joinpath(experiment_dir, "generate_data.jl")) include(joinpath(experiment_dir, "observation_map.jl")) include(model_interface) -slurm_kwargs = kwargs(time = 3) -eki = calibrate(CaltechHPCBackend, experiment_dir; model_interface, slurm_kwargs); +hpc_kwargs = kwargs(time = 3) +backend = get_backend() +eki = calibrate(backend, experiment_dir; model_interface, hpc_kwargs); ``` """ function calibrate( - b::Type{<:SlurmBackend}, + b::Type{<:HPCBackend}, experiment_dir::AbstractString; - slurm_kwargs, + hpc_kwargs, ekp_kwargs..., ) - calibrate(b, ExperimentConfig(experiment_dir); slurm_kwargs, ekp_kwargs...) + calibrate(b, ExperimentConfig(experiment_dir); hpc_kwargs, ekp_kwargs...) end function calibrate( - b::Type{<:SlurmBackend}, + b::Type{<:HPCBackend}, config::ExperimentConfig; experiment_dir = dirname(Base.active_project()), model_interface = abspath( joinpath(experiment_dir, "..", "..", "model_interface.jl"), ), verbose = false, - slurm_kwargs = Dict(:time_limit => 45, :ntasks => 1), + reruns = 1, + hpc_kwargs, ekp_kwargs..., ) # ExperimentConfig is created from a YAML file within the experiment_dir (; n_iterations, output_dir, ensemble_size) = config @info "Initializing calibration" n_iterations ensemble_size output_dir - initialize(config; ekp_kwargs...) - eki = nothing + eki = initialize(config; ekp_kwargs...) module_load_str = module_load_string(b) for iter in 0:(n_iterations - 1) @info "Iteration $iter" jobids = map(1:ensemble_size) do member @info "Running ensemble member $member" - sbatch_model_run( + model_run( + b, iter, member, output_dir, experiment_dir, model_interface, module_load_str; - slurm_kwargs, + hpc_kwargs, ) end @@ -182,10 +175,10 @@ function calibrate( experiment_dir, model_interface, module_load_str; - slurm_kwargs, + hpc_kwargs, verbose, + reruns, ) - report_iteration_status(statuses, output_dir, iter) @info "Completed iteration $iter, updating ensemble" G_ensemble = observation_map(iter) save_G_ensemble(config, iter, G_ensemble) @@ -193,3 +186,58 @@ function calibrate( end return eki end + +# Dispatch on backend type to unify `calibrate` for all HPCBackends +# Scheduler interfaces should not depend on backend struct +""" + model_run(backend, iter, member, output_dir, experiment_dir; model_interface, verbose, hpc_kwargs) + +Construct and execute a command to run a single forward model on a given job scheduler. + +Dispatches on `backend` to run [`slurm_model_run`](@ref) or [`pbs_model_run`](@ref). + +Arguments: +- iter: Iteration number +- member: Member number +- output_dir: Calibration experiment output directory +- experiment_dir: Directory containing the experiment's Project.toml +- model_interface: File containing the model interface +- module_load_str: Commands which load the necessary modules +- hpc_kwargs: Dictionary containing the resources for the job. Easily generated using [`kwargs`](@ref). +""" +model_run( + b::Type{<:SlurmBackend}, + iter, + member, + output_dir, + experiment_dir, + model_interface, + module_load_str; + hpc_kwargs, +) = slurm_model_run( + iter, + member, + output_dir, + experiment_dir, + model_interface, + module_load_str; + hpc_kwargs, +) +model_run( + b::Type{DerechoBackend}, + iter, + member, + output_dir, + experiment_dir, + model_interface, + module_load_str; + hpc_kwargs, +) = pbs_model_run( + iter, + member, + output_dir, + experiment_dir, + model_interface, + module_load_str; + hpc_kwargs, +) diff --git a/src/ekp_interface.jl b/src/ekp_interface.jl index 7b778d84..27fb86b5 100644 --- a/src/ekp_interface.jl +++ b/src/ekp_interface.jl @@ -171,10 +171,10 @@ function env_model_interface(env = ENV) return string(env[key]) end -function env_iter_number(env = ENV) - key = "CALIBRATION_ITER_NUMBER" +function env_iteration(env = ENV) + key = "CALIBRATION_ITERATION" haskey(env, key) || error( - "Iteration number not found in environment. Ensure that env variable \"CALIBRATION_ITER_NUMBER\" is set.", + "Iteration number not found in environment. Ensure that env variable \"CALIBRATION_ITERATION\" is set.", ) return parse(Int, env[key]) end diff --git a/src/pbs.jl b/src/pbs.jl new file mode 100644 index 00000000..261cebff --- /dev/null +++ b/src/pbs.jl @@ -0,0 +1,210 @@ +""" +generate_pbs_script( + iter, member, + output_dir, experiment_dir, model_interface; + module_load_str, hpc_kwargs, + ) + +Generate a string containing a PBS script to run the forward model. + +Returns: +- `qsub_contents::Function`: A function generating the content of the PBS script based on the provided arguments. + This will run the contents of the `julia_script`, which have to be run from a file due to Derecho's `set_gpu_rank`. +- `julia_script::String`: The Julia script string to be executed by the PBS job. + +Helper function for [`pbs_model_run`](@ref). +""" +function generate_pbs_script( + iter::Int, + member::Int, + output_dir::AbstractString, + experiment_dir::AbstractString, + model_interface::AbstractString, + module_load_str::AbstractString; + hpc_kwargs = Dict(), +) + member_log = path_to_model_log(output_dir, iter, member) + + queue = get(hpc_kwargs, :queue, "main") + walltime = format_pbs_time(get(hpc_kwargs, :time, 45)) + num_nodes = get(hpc_kwargs, :ntasks, 1) + cpus_per_node = get(hpc_kwargs, :cpus_per_task, 1) + gpus_per_node = get(hpc_kwargs, :gpus_per_task, 0) + + if queue != "develop" && cpus_per_node != 128 && cpus_per_node != 0 + @warn "Most Derecho nodes are exclusive and charge for the full node" + end + + if gpus_per_node > 0 + ranks_per_node = gpus_per_node + set_gpu_rank = "set_gpu_rank" + climacomms_device = "CUDA" + else + ranks_per_node = cpus_per_node + set_gpu_rank = "" + climacomms_device = "CPU" + end + total_ranks = num_nodes * ranks_per_node + + # This must match the filepath in `pbs_model_run` + member_path = path_to_ensemble_member(output_dir, iter, member) + julia_filepath = joinpath(member_path, "model_run.jl") + + pbs_script = """ +#!/bin/bash +#PBS -N run_$(iter)_$member +#PBS -j oe +#PBS -A UCIT0011 +#PBS -q $queue +#PBS -o $member_log +#PBS -l walltime=$walltime +#PBS -l select=$num_nodes:ncpus=$cpus_per_node:ngpus=$gpus_per_node:mpiprocs=$ranks_per_node + +$module_load_str + +export JULIA_MPI_HAS_CUDA=true +export CLIMACOMMS_DEVICE="$climacomms_device" +export CLIMACOMMS_CONTEXT="MPI" +\$MPITRAMPOLINE_MPIEXEC -n $total_ranks -ppn $ranks_per_node $set_gpu_rank julia --project=$experiment_dir $julia_filepath +""" + + julia_script = """\ + import ClimaCalibrate as CAL + include("$(abspath(model_interface))") + CAL.run_forward_model(CAL.set_up_forward_model($member, $iter, "$experiment_dir")) + """ + return pbs_script, julia_script +end + +""" + pbs_model_run(iter, member, output_dir, experiment_dir, model_interface, module_load_str; hpc_kwargs) + +Construct and execute a command to run a single forward model on PBS Pro. +Helper function for [`model_run`](@ref). +""" +function pbs_model_run( + iter, + member, + output_dir, + experiment_dir, + model_interface, + module_load_str; + hpc_kwargs, + debug = false, +) + # Type and existence checks + @assert isdir(output_dir) "Output directory does not exist: $output_dir" + @assert isdir(experiment_dir) "Experiment directory does not exist: $experiment_dir" + @assert isfile(model_interface) "Model interface file does not exist: $model_interface" + + # Range checks + @assert iter >= 0 "Iteration number must be non-negative" + @assert member > 0 "Member number must be positive" + + pbs_script_contents, julia_script_contents = generate_pbs_script( + iter, + member, + output_dir, + experiment_dir, + model_interface, + module_load_str; + hpc_kwargs, + ) + # TODO: Improve and figure out how to clean up the model_run.jl file + # julia_filepath does not get cleaned up. `mktemp() do` syntax is ideal + # but would remove the script file before the PBS script actually executes + member_path = path_to_ensemble_member(output_dir, iter, member) + julia_filepath = joinpath(member_path, "model_run.jl") + write(julia_filepath, julia_script_contents) + jobid = mktemp(output_dir) do pbs_filepath, io + write(io, pbs_script_contents) + close(io) + submit_pbs_job(pbs_filepath) + end + if debug + println("PBS Script:") + println(pbs_script_contents(julia_filepath)) + println("Julia Script:") + println(julia_script_contents) + end + return jobid +end + +""" + submit_pbs_job(sbatch_filepath; env=deepcopy(ENV)) + +Submit a job to the PBS Pro scheduler using qsub, removing unwanted environment variables. + +Unset variables: "PBS_MEM_PER_CPU", "PBS_MEM_PER_GPU", "PBS_MEM_PER_NODE" +""" +function submit_pbs_job(filepath; debug = false, env = deepcopy(ENV)) + unset_env_vars = ("PBS_MEM_PER_CPU", "PBS_MEM_PER_GPU", "PBS_MEM_PER_NODE") + for k in unset_env_vars + haskey(env, k) && delete!(env, k) + end + jobid = readchomp(setenv(`qsub $filepath`, env)) + return jobid +end + +# Type alias for dispatching on PBSJobID (String) vs SlurmJobID (Int) +const PBSJobID = AbstractString + +wait_for_jobs( + jobids::AbstractVector{<:PBSJobID}, + output_dir, + iter, + experiment_dir, + model_interface, + module_load_str; + verbose, + hpc_kwargs, + reruns = 1, +) = wait_for_jobs( + jobids, + output_dir, + iter, + experiment_dir, + model_interface, + module_load_str, + pbs_model_run; + verbose, + hpc_kwargs, + reruns, +) + +function job_status(jobid::PBSJobID) + status_str = readchomp(`qstat -f $jobid -x -F dsv`) + job_state_match = match(r"job_state=([^|]+)", status_str) + status = first(job_state_match.captures) + substate_match = match(r"substate=([^|]+)", status_str) + substate_number = parse(Int, (first(substate_match.captures))) + status_dict = Dict("Q" => :RUNNING, "F" => :COMPLETED) + status_symbol = get(status_dict, status, :RUNNING) + # Check for failure in the substate number + if status_symbol == :COMPLETED && substate_number in (91, 93) + status_symbol = :FAILED + end + return status_symbol +end + +function kill_job(jobid::PBSJobID) + try + run(`qdel $jobid`) + println("Cancelling PBS job $jobid") + catch e + println("Failed to cancel PBS job $jobid: ", e) + end +end + +"Format `minutes` to a PBS time string (HH:MM:SS)" +function format_pbs_time(minutes::Int) + hours, remaining_minutes = divrem(minutes, 60) + return string( + lpad(hours, 2, '0'), + ":", + lpad(remaining_minutes, 2, '0'), + ":00", + ) +end + +format_pbs_time(str::AbstractString) = str diff --git a/src/slurm.jl b/src/slurm.jl index dd057c60..1a050bfc 100644 --- a/src/slurm.jl +++ b/src/slurm.jl @@ -1,26 +1,176 @@ -export kwargs, sbatch_model_run, wait_for_jobs +export kwargs, slurm_model_run, wait_for_jobs +# Initial code is common to PBS and Slurm schedulers + +""" + kwargs(; kwargs...) + +Create a dictionary from keyword arguments. +""" kwargs(; kwargs...) = Dict{Symbol, Any}(kwargs...) -function generate_sbatch_directives(slurm_kwargs) - @assert haskey(slurm_kwargs, :time) "Slurm kwargs must include key :time" +""" + wait_for_jobs(jobids, output_dir, iter, experiment_dir, model_interface, module_load_str, model_run_func; verbose, hpc_kwargs, reruns=1) + +Wait for a set of jobs to complete. If a job fails, it will be rerun up to `reruns` times. + +This function monitors the status of multiple jobs and handles failures by rerunning the failed jobs up to the specified number of `reruns`. It logs errors and job completion status, ensuring all jobs are completed before proceeding. + +Arguments: +- `jobids`: Vector of job IDs. +- `output_dir`: Directory for output. +- `iter`: Iteration number. +- `experiment_dir`: Directory for the experiment. +- `model_interface`: Interface to the model. +- `module_load_str`: Commands to load necessary modules. +- `model_run_func`: Function to run the model. +- `verbose`: Print detailed logs if true. +- `hpc_kwargs`: HPC job parameters. +- `reruns`: Number of times to rerun failed jobs. +""" +function wait_for_jobs( + jobids::AbstractVector, + output_dir, + iter, + experiment_dir, + model_interface, + module_load_str, + model_run_func; + verbose, + hpc_kwargs, + reruns = 1, +) + rerun_job_count = zeros(length(jobids)) + completed_jobs = Set{Int}() + + try + while length(completed_jobs) < length(jobids) + for (m, jobid) in enumerate(jobids) + m in completed_jobs && continue + + if job_failed(jobid) + log_member_error(output_dir, iter, m, verbose) + if rerun_job_count[m] < reruns + + @info "Rerunning ensemble member $m" + jobids[m] = model_run_func( + iter, + m, + output_dir, + experiment_dir, + model_interface, + module_load_str; + hpc_kwargs, + ) + rerun_job_count[m] += 1 + else + push!(completed_jobs, m) + end + elseif job_success(jobid) + @info "Ensemble member $m complete" + push!(completed_jobs, m) + end + end + sleep(10) + end + catch e + kill_job.(jobids) + if !(e isa InterruptException) + @error "Pipeline crashed outside of a model run. Stacktrace:" exception = + (e, catch_backtrace()) + end + end + + report_iteration_status(jobids, output_dir, iter) + return map(job_status, jobids) +end + +""" + log_member_error(output_dir, iteration, member, verbose=false) - slurm_kwargs[:time] = format_slurm_time(slurm_kwargs[:time]) - slurm_directives = map(collect(slurm_kwargs)) do (k, v) +Log a warning message when an error occurs. If verbose, includes the ensemble member's output. +""" +function log_member_error(output_dir, iteration, member, verbose = false) + member_log = path_to_model_log(output_dir, iteration, member) + warn_str = """Ensemble member $member raised an error. See model log at \ + $(abspath(member_log)) for stacktrace""" + if verbose + stacktrace = replace(readchomp(member_log), "\\n" => "\n") + warn_str = warn_str * ": \n$stacktrace" + end + @warn warn_str +end + +job_running(status::Symbol) = status == :RUNNING +job_success(status::Symbol) = status == :COMPLETED +job_failed(status::Symbol) = status == :FAILED +job_completed(status::Symbol) = job_failed(status) || job_success(status) + +job_running(jobid) = job_running(job_status(jobid)) +job_success(jobid) = job_success(job_status(jobid)) +job_failed(jobid) = job_failed(job_status(jobid)) +job_completed(jobid) = job_completed(job_status(jobid)) + +""" + report_iteration_status(jobids, output_dir, iter) + +Report the status of an iteration. See also [`wait_for_jobs`](@ref). +""" +function report_iteration_status(jobids, output_dir, iter) + if !all(job_completed.(jobids)) + error("Some jobs are not complete: $(filter(job_completed, jobids))") + elseif all(job_failed, jobids) + error( + """Full ensemble for iteration $iter has failed. See model logs in +$(abspath(path_to_iteration(output_dir, iter)))""", + ) + elseif any(job_failed, jobids) + @warn "Failed ensemble members: $(findall(job_failed, jobids))" + end +end + +# Slurm-specific functions + +""" + submit_slurm_job(sbatch_filepath; env=deepcopy(ENV)) + +Submit a job to the Slurm scheduler using sbatch, removing unwanted environment variables. + +Unset variables: "SLURM_MEM_PER_CPU", "SLURM_MEM_PER_GPU", "SLURM_MEM_PER_NODE" +""" +function submit_slurm_job(sbatch_filepath; env = deepcopy(ENV)) + # Ensure that we don't inherit unwanted environment variables + unset_env_vars = + ("SLURM_MEM_PER_CPU", "SLURM_MEM_PER_GPU", "SLURM_MEM_PER_NODE") + for k in unset_env_vars + haskey(env, k) && delete!(env, k) + end + jobid = readchomp(setenv(`sbatch --parsable $sbatch_filepath`, env)) + return parse(Int, jobid) +end + +""" + generate_sbatch_directives(hpc_kwargs) + +Generate Slurm sbatch directives from HPC kwargs. +""" +function generate_sbatch_directives(hpc_kwargs) + @assert haskey(hpc_kwargs, :time) "Slurm kwargs must include key :time" + + hpc_kwargs[:time] = format_slurm_time(hpc_kwargs[:time]) + slurm_directives = map(collect(hpc_kwargs)) do (k, v) "#SBATCH --$(replace(string(k), "_" => "-"))=$(replace(string(v), "_" => "-"))" end return join(slurm_directives, "\n") end """ - generate_sbatch_script(iter, member, - output_dir, experiment_dir, model_interface; - module_load_str, slurm_kwargs, - ) + generate_sbatch_script(iter, member, output_dir, experiment_dir, model_interface; module_load_str, hpc_kwargs) Generate a string containing an sbatch script to run the forward model. -Helper function for `sbatch_model_run`. +`hpc_kwargs` is turned into a series of sbatch directives using [`generate_sbatch_directives`](@ref). +`module_load_str` is used to load the necessary modules and can be obtained via [`module_load_string`](@ref). """ function generate_sbatch_script( iter::Int, @@ -29,19 +179,23 @@ function generate_sbatch_script( experiment_dir::AbstractString, model_interface::AbstractString, module_load_str::AbstractString; - slurm_kwargs, + hpc_kwargs, ) member_log = path_to_model_log(output_dir, iter, member) - slurm_directives = generate_sbatch_directives(slurm_kwargs) + slurm_directives = generate_sbatch_directives(hpc_kwargs) + gpus_per_task = get(hpc_kwargs, :gpus_per_task, 0) + climacomms_device = gpus_per_task > 0 ? "CUDA" : "CPU" sbatch_contents = """ #!/bin/bash #SBATCH --job-name=run_$(iter)_$(member) #SBATCH --output=$member_log $slurm_directives - set -euo pipefail $module_load_str + export CLIMACOMMS_DEVICE="$climacomms_device" + export CLIMACOMMS_CONTEXT="MPI" + srun --output=$member_log --open-mode=append julia --project=$experiment_dir -e ' import ClimaCalibrate as CAL iteration = $iter; member = $member @@ -55,35 +209,19 @@ function generate_sbatch_script( end """ - sbatch_model_run( - iter, - member, - output_dir, - experiment_dir; - model_interface, - verbose; - slurm_kwargs, - ) + slurm_model_run(iter, member, output_dir, experiment_dir, model_interface, module_load_str; hpc_kwargs) -Construct and execute a command to run a forward model on a Slurm cluster for a single ensemble member. - -Arguments: -- iter: Iteration number -- member: Member number -- output_dir: Calibration experiment output directory -- experiment_dir: Directory containing the experiment's Project.toml -- model_interface: File containing the model interface -- module_load_str: Commands which load the necessary modules -- slurm_kwargs: Dictionary containing the slurm resources for the job. Easily generated using `kwargs`. +Construct and execute a command to run a single forward model on Slurm. +Helper function for [`model_run`](@ref). """ -function sbatch_model_run( +function slurm_model_run( iter, member, output_dir, experiment_dir, model_interface, module_load_str; - slurm_kwargs = Dict{Symbol, Any}( + hpc_kwargs = Dict{Symbol, Any}( :time => 45, :ntasks => 1, :cpus_per_task => 1, @@ -105,126 +243,49 @@ function sbatch_model_run( experiment_dir, model_interface, module_load_str; - slurm_kwargs, + hpc_kwargs, ) jobid = mktemp(output_dir) do sbatch_filepath, io write(io, sbatch_contents) close(io) - submit_sbatch_job(sbatch_filepath) + submit_slurm_job(sbatch_filepath) end return jobid end -function wait_for_jobs( - jobids::Vector{Int}, +# Type alias for dispatching on PBSJobID (String) vs SlurmJobID (Int) +const SlurmJobID = Int + +wait_for_jobs( + jobids::AbstractVector{SlurmJobID}, output_dir, iter, experiment_dir, model_interface, module_load_str; verbose, - slurm_kwargs, + hpc_kwargs, reruns = 1, +) = wait_for_jobs( + jobids, + output_dir, + iter, + experiment_dir, + model_interface, + module_load_str, + slurm_model_run; + verbose = verbose, + hpc_kwargs, + reruns = reruns, ) - statuses = map(job_status, jobids) - rerun_job_count = zeros(length(jobids)) - completed_jobs = Set{Int}() - - try - while length(completed_jobs) < length(statuses) - for (m, status) in enumerate(statuses) - m in completed_jobs && continue - - if job_failed(status) - log_member_error(output_dir, iter, m, verbose) - if rerun_job_count[m] < reruns - - @info "Rerunning ensemble member $m" - jobids[m] = sbatch_model_run( - iter, - m, - output_dir, - experiment_dir, - model_interface, - module_load_str; - slurm_kwargs, - ) - rerun_job_count[m] += 1 - else - push!(completed_jobs, m) - end - elseif job_success(status) - @info "Ensemble member $m complete" - push!(completed_jobs, m) - end - end - sleep(5) - statuses = map(job_status, jobids) - end - return statuses - catch e - kill_all_jobs(jobids) - if !(e isa InterruptException) - @error "Pipeline crashed outside of a model run. Stacktrace for failed simulation" exception = - (e, catch_backtrace()) - end - return map(job_status, jobids) - end -end - -""" - log_member_error(output_dir, iteration, member, verbose = false) - -Log a warning message when an error occurs. -If verbose, includes the ensemble member's output. -""" -function log_member_error(output_dir, iteration, member, verbose = false) - member_log = path_to_model_log(output_dir, iteration, member) - warn_str = """Ensemble member $member raised an error. See model log at \ - $(abspath(member_log)) for stacktrace""" - if verbose - stacktrace = replace(readchomp(member_log), "\\n" => "\n") - warn_str = warn_str * ": \n$stacktrace" - end - @warn warn_str -end - -function report_iteration_status(statuses, output_dir, iter) - all(job_completed.(statuses)) || error("Some jobs are not complete") - - if all(job_failed, statuses) - error( - """Full ensemble for iteration $iter has failed. See model logs in - $(abspath(path_to_iteration(output_dir, iter)))""", - ) - elseif any(job_failed, statuses) - @warn "Failed ensemble members: $(findall(job_failed, statuses))" - end -end - -function submit_sbatch_job(sbatch_filepath; env = deepcopy(ENV)) - # Ensure that we don't inherit unwanted environment variables - unset_env_vars = - ("SLURM_MEM_PER_CPU", "SLURM_MEM_PER_GPU", "SLURM_MEM_PER_NODE") - for k in unset_env_vars - haskey(env, k) && delete!(env, k) - end - jobid = readchomp(setenv(`sbatch --parsable $sbatch_filepath`, env)) - return parse(Int, jobid) -end - -job_running(status) = status == :RUNNING -job_success(status) = status == :COMPLETED -job_failed(status) = status == :FAILED -job_completed(status) = job_failed(status) || job_success(status) """ job_status(jobid) Parse the slurm jobid's state and return one of three status symbols: :COMPLETED, :FAILED, or :RUNNING. """ -function job_status(jobid::Int) +function job_status(jobid::SlurmJobID) failure_statuses = ("FAILED", "CANCELLED+", "CANCELLED") output = readchomp(`sacct -j $jobid --format=State --noheader`) # Jobs usually have multiple statuses @@ -239,27 +300,24 @@ function job_status(jobid::Int) end """ - kill_all_jobs(jobids) + kill_job(jobid::SlurmJobID) + kill_job(jobid::PBSJobID) -Takes a list of slurm job IDs and runs `scancel` on them. +End a running job, catching errors in case the job can not be ended. """ -function kill_all_jobs(jobids) - for jobid in jobids - try - kill_slurm_job(jobid) - println("Cancelling slurm job $jobid") - catch e - println("Failed to cancel slurm job $jobid: ", e) - end +function kill_job(jobid::SlurmJobID) + try + run(`scancel $jobid`) + println("Cancelling slurm job $jobid") + catch e + println("Failed to cancel slurm job $jobid: ", e) end end -kill_slurm_job(jobid) = run(`scancel $jobid`) - +"Format `minutes` to a Slurm time string (D-HH:MM or HH:MM)" function format_slurm_time(minutes::Int) days, remaining_minutes = divrem(minutes, (60 * 24)) hours, remaining_minutes = divrem(remaining_minutes, 60) - # Format the string according to Slurm's time format if days > 0 return string( days, @@ -278,4 +336,5 @@ function format_slurm_time(minutes::Int) ) end end + format_slurm_time(str::AbstractString) = str diff --git a/test/ekp_interface.jl b/test/ekp_interface.jl index 39fb0755..b90a1b93 100644 --- a/test/ekp_interface.jl +++ b/test/ekp_interface.jl @@ -61,8 +61,8 @@ end "Experiment dir not found in environment. Ensure that env variable \"CALIBRATION_EXPERIMENT_DIR\" is set.", ) CAL.env_experiment_dir() @test_throws ErrorException( - "Iteration number not found in environment. Ensure that env variable \"CALIBRATION_ITER_NUMBER\" is set.", - ) CAL.env_iter_number() + "Iteration number not found in environment. Ensure that env variable \"CALIBRATION_ITERATION\" is set.", + ) CAL.env_iteration() @test_throws ErrorException( "Member number not found in environment. Ensure that env variable \"CALIBRATION_MEMBER_NUMBER\" is set.", ) CAL.env_member_number() @@ -72,15 +72,15 @@ end test_ENV = Dict() test_ENV["CALIBRATION_EXPERIMENT_DIR"] = experiment_dir = "test" - test_ENV["CALIBRATION_ITER_NUMBER"] = "0" - iter_number = parse(Int, test_ENV["CALIBRATION_ITER_NUMBER"]) + test_ENV["CALIBRATION_ITERATION"] = "0" + iter_number = parse(Int, test_ENV["CALIBRATION_ITERATION"]) test_ENV["CALIBRATION_MEMBER_NUMBER"] = "1" member_number = parse(Int, test_ENV["CALIBRATION_MEMBER_NUMBER"]) test_ENV["CALIBRATION_MODEL_INTERFACE"] = model_interface = joinpath(pkgdir(CAL), "model_interface.jl") @test experiment_dir == CAL.env_experiment_dir(test_ENV) - @test iter_number == CAL.env_iter_number(test_ENV) + @test iter_number == CAL.env_iteration(test_ENV) @test member_number == CAL.env_member_number(test_ENV) @test model_interface == CAL.env_model_interface(test_ENV) end diff --git a/test/slurm_backend_e2e.jl b/test/hpc_backend_e2e.jl similarity index 70% rename from test/slurm_backend_e2e.jl rename to test/hpc_backend_e2e.jl index 3cef7c92..46e99442 100644 --- a/test/slurm_backend_e2e.jl +++ b/test/hpc_backend_e2e.jl @@ -1,8 +1,17 @@ -# Tests for SurfaceFluxes example calibration on slurm, used in buildkite testing -# To run, open the REPL: julia --project=experiments/surface_fluxes_perfect_model test/caltech -# And include this file - -import ClimaCalibrate: get_backend, JuliaBackend, calibrate, get_prior, kwargs +# Tests for SurfaceFluxes example calibration on HPC, used in buildkite testing +# To run, open the REPL: julia --project=experiments/surface_fluxes_perfect_model test/hpc_backend_e2e.jl + +using Pkg +Pkg.instantiate(; verbose = true) + +import ClimaCalibrate: + get_backend, + HPCBackend, + JuliaBackend, + calibrate, + get_prior, + kwargs, + DerechoBackend using Test import EnsembleKalmanProcesses: get_ϕ_mean_final, get_g_mean_final @@ -32,14 +41,12 @@ function test_sf_calibration_output(eki, prior) end end -@assert get_backend() != JuliaBackend - -eki = calibrate( - experiment_dir; - model_interface, - slurm_kwargs = kwargs(time = 5), - verbose = true, -) +@assert get_backend() <: HPCBackend +hpc_kwargs = kwargs(time = 5, ntasks = 1, cpus_per_task = 1) +if get_backend() == DerechoBackend + hpc_kwargs[:queue] = "develop" +end +eki = calibrate(experiment_dir; model_interface, hpc_kwargs, verbose = true) test_sf_calibration_output(eki, prior) # Pure Julia calibration, this should run anywhere diff --git a/test/pbs_unit_tests.jl b/test/pbs_unit_tests.jl new file mode 100644 index 00000000..c6d029ca --- /dev/null +++ b/test/pbs_unit_tests.jl @@ -0,0 +1,122 @@ +using Test +import ClimaCalibrate as CAL + +const OUTPUT_DIR = "test" +const ITER = 1 +const MEMBER = 1 +const TIME_LIMIT = 90 +const NTASKS = 2 +const CPUS_PER_TASK = 16 +const GPUS_PER_TASK = 2 +const EXPERIMENT_DIR = "exp/dir" +const MODEL_INTERFACE = "model_interface.jl" +const MODULE_LOAD_STR = CAL.module_load_string(CAL.DerechoBackend) +const hpc_kwargs = CAL.kwargs( + time = TIME_LIMIT, + ntasks = NTASKS, + cpus_per_task = CPUS_PER_TASK, + gpus_per_task = GPUS_PER_TASK, +) + +# Time formatting tests +@test CAL.format_pbs_time(TIME_LIMIT) == "01:30:00" +@test CAL.format_pbs_time(1) == "00:01:00" +@test CAL.format_pbs_time(60) == "01:00:00" +@test CAL.format_pbs_time(1440) == "24:00:00" +@test CAL.format_pbs_time(2880) == "48:00:00" + +# Generate and validate pbs file contents +pbs_file, julia_file = CAL.generate_pbs_script( + ITER, + MEMBER, + OUTPUT_DIR, + EXPERIMENT_DIR, + MODEL_INTERFACE, + MODULE_LOAD_STR; + hpc_kwargs, +) + +expected_pbs_contents = """ +#!/bin/bash +#PBS -N run_1_1 +#PBS -j oe +#PBS -A UCIT0011 +#PBS -q main +#PBS -o test/iteration_001/member_001/model_log.txt +#PBS -l walltime=01:30:00 +#PBS -l select=2:ncpus=16:ngpus=2:mpiprocs=2 + +export MODULEPATH="/glade/campaign/univ/ucit0011/ClimaModules-Derecho:\$MODULEPATH" +module purge +module load climacommon +module list + + +export JULIA_MPI_HAS_CUDA=true +export CLIMACOMMS_DEVICE="CUDA" +export CLIMACOMMS_CONTEXT="MPI" +\$MPITRAMPOLINE_MPIEXEC -n 4 -ppn 2 set_gpu_rank julia --project=exp/dir file.jl +""" + +for (generated_str, test_str) in + zip(split(pbs_file("file.jl"), "\n"), split(expected_pbs_contents, "\n")) + @test generated_str == test_str +end + + +original_julia_file = """\ +import ClimaCalibrate as CAL +include("/glade/u/home/nefrathe/clima/ClimaCalibrate.jl/model_interface.jl") +CAL.run_forward_model(CAL.set_up_forward_model(1, 1, "exp/dir")) +""" +@test julia_file == original_julia_file + +# Helper function for submitting commands and checking job status +function submit_cmd_helper(cmd) + sbatch_filepath, io = mktemp() + write(io, cmd) + close(io) + jobid = CAL.submit_pbs_job(sbatch_filepath) + sleep(1) # Allow time for the job to start + return jobid +end + +# Test job lifecycle +test_cmd = """ +#!/bin/bash +#PBS -j oe +#PBS -A UCIT0011 +#PBS -q develop +#PBS -l walltime=00:00:12 +#PBS -l select=1:ncpus=1 + +sleep 10 +""" + +jobid = submit_cmd_helper(test_cmd) +@test CAL.job_status(jobid) == :RUNNING +@test CAL.job_running(CAL.job_status(jobid)) +@test CAL.job_running(CAL.job_status(jobid)) == CAL.job_running((jobid)) + +sleep(180) # Ensure job finishes. To debug, lower sleep time or comment out the code block +@test CAL.job_completed(jobid) +@test CAL.job_completed(CAL.job_status(jobid)) == CAL.job_completed(jobid) +@test CAL.job_success(jobid) +@test CAL.job_success(CAL.job_status(jobid)) == CAL.job_success(jobid) + +# Test job cancellation +jobid = submit_cmd_helper(test_cmd) +CAL.kill_job(jobid) +sleep(1) +@test CAL.job_status(jobid) == :FAILED +@test CAL.job_completed(CAL.job_status(jobid)) && + CAL.job_failed(CAL.job_status(jobid)) + +# Test batch cancellation +jobids = ntuple(x -> submit_cmd_helper(test_cmd), 5) + +CAL.kill_job.(jobids) +for jobid in jobids + @test CAL.job_completed(jobid) + @test CAL.job_failed(jobid) +end diff --git a/test/slurm_unit_tests.jl b/test/slurm_unit_tests.jl index 34888949..c38965cd 100644 --- a/test/slurm_unit_tests.jl +++ b/test/slurm_unit_tests.jl @@ -12,7 +12,7 @@ const GPUS_PER_TASK = 1 const EXPERIMENT_DIR = "exp/dir" const MODEL_INTERFACE = "model_interface.jl" const MODULE_LOAD_STR = CAL.module_load_string(CAL.CaltechHPCBackend) -const slurm_kwargs = CAL.kwargs( +const hpc_kwargs = CAL.kwargs( time = TIME_LIMIT, cpus_per_task = CPUS_PER_TASK, gpus_per_task = GPUS_PER_TASK, @@ -32,7 +32,7 @@ sbatch_file = CAL.generate_sbatch_script( EXPERIMENT_DIR, MODEL_INTERFACE, MODULE_LOAD_STR; - slurm_kwargs, + hpc_kwargs, ) expected_sbatch_contents = """ @@ -42,11 +42,13 @@ expected_sbatch_contents = """ #SBATCH --gpus-per-task=1 #SBATCH --cpus-per-task=16 #SBATCH --time=01:30:00 -set -euo pipefail -export MODULEPATH=/groups/esm/modules:\$MODULEPATH +export MODULEPATH="/groups/esm/modules:\$MODULEPATH" module purge module load climacommon/2024_05_27 +export CLIMACOMMS_DEVICE="CUDA" +export CLIMACOMMS_CONTEXT="MPI" + srun --output=test/iteration_001/member_001/model_log.txt --open-mode=append julia --project=exp/dir -e ' import ClimaCalibrate as CAL iteration = 1; member = 1 @@ -67,7 +69,7 @@ function submit_cmd_helper(cmd) sbatch_filepath, io = mktemp() write(io, cmd) close(io) - jobid = CAL.submit_sbatch_job(sbatch_filepath) + jobid = CAL.submit_slurm_job(sbatch_filepath) sleep(1) # Allow time for the job to start return jobid end @@ -90,7 +92,7 @@ sleep(180) # Ensure job finishes. To debug, lower sleep time or comment out the # Test job cancellation jobid = submit_cmd_helper(test_cmd) -CAL.kill_slurm_job(jobid) +CAL.kill_job(jobid) sleep(1) @test CAL.job_status(jobid) == :FAILED @test CAL.job_completed(CAL.job_status(jobid)) && @@ -99,7 +101,7 @@ sleep(1) # Test batch cancellation jobids = ntuple(x -> submit_cmd_helper(test_cmd), 5) -CAL.kill_all_jobs(jobids) +CAL.kill_job.(jobids) for jobid in jobids @test CAL.job_completed(CAL.job_status(jobid)) @test CAL.job_failed(CAL.job_status(jobid))