From 50ad5e4b203204f82856e7e6ba9f1a229ae8ef80 Mon Sep 17 00:00:00 2001 From: Nat Efrat-Henrici <60049837+nefrathenrici@users.noreply.github.com> Date: Mon, 25 Mar 2024 15:59:53 -0700 Subject: [PATCH] Clean up slurm pipeline (#73) --- docs/src/experiment_setup_guide.md | 2 +- .../generate_observations.sbatch | 2 +- .../observation_map.jl | 6 ++-- .../Manifest.toml | 4 +-- .../generate_truth.jl | 2 -- .../observation_map.jl | 5 ++-- pipeline.sh | 2 +- slurm/initialize.sbatch | 10 ++++++- slurm/model_run.sbatch | 25 +++++++++------- slurm/parse_commandline.sh | 11 +++++-- slurm/update.sbatch | 12 +++----- src/ekp_interface.jl | 29 ++++++++++++++----- 12 files changed, 67 insertions(+), 43 deletions(-) mode change 100644 => 100755 slurm/parse_commandline.sh diff --git a/docs/src/experiment_setup_guide.md b/docs/src/experiment_setup_guide.md index 97c3ccad..3366f008 100644 --- a/docs/src/experiment_setup_guide.md +++ b/docs/src/experiment_setup_guide.md @@ -136,7 +136,7 @@ observation_map(::Val(Symbol(experiment_id)), iteration) This function must load in model diagnostics for each ensemble member in the iteration and construct an array `arr = Array{Float64}(undef, dims..., ensemble_size)` such that `arr[:, i]` will return the i-th ensemble member's observation map output. Note this floating point precision is required for the EKI update step. -In the update step of EKI, the array will be saved in a JLD2 file named `observation_map.jld2` in the iteration folder of the output directory. +In the update step of EKI, the array will be saved in a JLD2 file named `G_ensemble.jld2` in the iteration folder of the output directory. As an example, in `observation_map(iteration)` in the `sphere_held_suarez_rhoe_equilmoist` experiment, we have the following sequence: diff --git a/experiments/sphere_held_suarez_rhoe_equilmoist/generate_observations.sbatch b/experiments/sphere_held_suarez_rhoe_equilmoist/generate_observations.sbatch index 9860ad8f..94d4020c 100644 --- a/experiments/sphere_held_suarez_rhoe_equilmoist/generate_observations.sbatch +++ b/experiments/sphere_held_suarez_rhoe_equilmoist/generate_observations.sbatch @@ -3,7 +3,7 @@ #SBATCH --ntasks=64 #SBATCH --cpus-per-task=8 #SBATCH --partition=expansion -#SBATCH --output="experiments/sphere_held_suarez_rhoe_equilmoist/truth_simulation/model_log.out" +#SBATCH --output="experiments/sphere_held_suarez_rhoe_equilmoist/truth_simulation/model_log.txt" #SBATCH --partition=expansion # Configure the environment diff --git a/experiments/sphere_held_suarez_rhoe_equilmoist/observation_map.jl b/experiments/sphere_held_suarez_rhoe_equilmoist/observation_map.jl index 3b63794c..d6530088 100644 --- a/experiments/sphere_held_suarez_rhoe_equilmoist/observation_map.jl +++ b/experiments/sphere_held_suarez_rhoe_equilmoist/observation_map.jl @@ -3,17 +3,15 @@ using Statistics import YAML import EnsembleKalmanProcesses: TOMLInterface import JLD2 -import CalibrateAtmos: observation_map +import CalibrateAtmos: observation_map, get_ekp_config using ClimaAnalysis export observation_map function observation_map(::Val{:sphere_held_suarez_rhoe_equilmoist}, iteration) experiment_id = "sphere_held_suarez_rhoe_equilmoist" - config = - YAML.load_file(joinpath("experiments", experiment_id, "ekp_config.yml")) + config = get_ekp_config(experiment_id) output_dir = config["output_dir"] ensemble_size = config["ensemble_size"] - model_output = "ta_60d_average.nc" dims = 1 G_ensemble = Array{Float64}(undef, dims..., ensemble_size) diff --git a/experiments/surface_fluxes_perfect_model/Manifest.toml b/experiments/surface_fluxes_perfect_model/Manifest.toml index 5dab021d..e5c9f196 100644 --- a/experiments/surface_fluxes_perfect_model/Manifest.toml +++ b/experiments/surface_fluxes_perfect_model/Manifest.toml @@ -1,6 +1,6 @@ # This file is machine-generated - editing it directly is not advised -julia_version = "1.10.1" +julia_version = "1.10.2" manifest_format = "2.0" project_hash = "8b52a1f87337958a3f0ec6e731cccb862b78de20" @@ -300,7 +300,7 @@ uuid = "83423d85-b0ee-5818-9007-b63ccbeb887a" version = "1.16.1+1" [[deps.CalibrateAtmos]] -deps = ["CalibrateEmulateSample", "ClimaComms", "ClimaCore", "ClimaParams", "Distributions", "EnsembleKalmanProcesses", "JLD2", "PrecompileTools", "Random", "SciMLBase", "TOML", "YAML"] +deps = ["CalibrateEmulateSample", "ClimaComms", "ClimaParams", "Distributions", "EnsembleKalmanProcesses", "JLD2", "PrecompileTools", "Random", "SciMLBase", "TOML", "YAML"] path = "../.." uuid = "4347a170-ebd6-470c-89d3-5c705c0cacc2" version = "0.1.0" diff --git a/experiments/surface_fluxes_perfect_model/generate_truth.jl b/experiments/surface_fluxes_perfect_model/generate_truth.jl index b76026be..444e3004 100644 --- a/experiments/surface_fluxes_perfect_model/generate_truth.jl +++ b/experiments/surface_fluxes_perfect_model/generate_truth.jl @@ -1,7 +1,5 @@ # generate_truth: generate true y, noise and x_inputs -using Pkg experiment_id = "surface_fluxes_perfect_model" -Pkg.activate("experiments/$experiment_id") import SurfaceFluxes as SF import SurfaceFluxes.Parameters as SFPP diff --git a/experiments/surface_fluxes_perfect_model/observation_map.jl b/experiments/surface_fluxes_perfect_model/observation_map.jl index c183a370..61d078be 100644 --- a/experiments/surface_fluxes_perfect_model/observation_map.jl +++ b/experiments/surface_fluxes_perfect_model/observation_map.jl @@ -2,7 +2,7 @@ using Statistics import YAML import EnsembleKalmanProcesses: TOMLInterface import JLD2 -import CalibrateAtmos: observation_map +import CalibrateAtmos: observation_map, get_ekp_config """ observation_map(::Val{:surface_fluxes_perfect_model}, iteration) @@ -12,8 +12,7 @@ as specified by process_member_data, for the given iteration. """ function observation_map(::Val{:surface_fluxes_perfect_model}, iteration) experiment_id = "surface_fluxes_perfect_model" - config = - YAML.load_file(joinpath("experiments", experiment_id, "ekp_config.yml")) + config = get_ekp_config(experiment_id) output_dir = config["output_dir"] ensemble_size = config["ensemble_size"] model_output = "model_ustar_array.jld2" diff --git a/pipeline.sh b/pipeline.sh index 025f6435..8fbc842b 100755 --- a/pipeline.sh +++ b/pipeline.sh @@ -13,6 +13,7 @@ fi init_id=$(sbatch --parsable \ --output=$logfile \ --partition=$partition \ + --export=generate_data=$generate_data \ slurm/initialize.sbatch $experiment_id) echo -e "Initialization job_id: $init_id\n" @@ -21,7 +22,6 @@ dependency="afterok:$init_id" for i in $(seq 0 $((n_iterations - 1))) do echo "Scheduling iteration $i" - ensemble_array_id=$( sbatch --dependency=$dependency --kill-on-invalid-dep=yes --parsable \ --job=model-$i \ diff --git a/slurm/initialize.sbatch b/slurm/initialize.sbatch index c30e612a..73a9ebca 100644 --- a/slurm/initialize.sbatch +++ b/slurm/initialize.sbatch @@ -8,7 +8,15 @@ experiment_id=$1 JULIA_NUM_PRECOMPILE_TASKS=8 echo "Initializing calibration for experiment: $experiment_id" -julia --color=no --project=experiments/$experiment_id -e 'using Pkg; Pkg.instantiate(;verbose=true)' + +julia --color=no --project=experiments/$experiment_id -e ' + using Pkg; Pkg.instantiate(;verbose=true) +' + +if [ "$generate_data" = true ] ; then + echo "Generating observations" + julia --project=experiments/$experiment_id experiments/$experiment_id/generate_truth.jl +fi julia --color=no --project=experiments/$experiment_id -e ' import CalibrateAtmos diff --git a/slurm/model_run.sbatch b/slurm/model_run.sbatch index a8b6b64a..dd4b350e 100644 --- a/slurm/model_run.sbatch +++ b/slurm/model_run.sbatch @@ -2,19 +2,24 @@ # Extract command-line arguments experiment_id=$1 -iteration=$2 +i=$2 # Find output directory -format_i=$(printf "iteration_%03d" "$iteration") +format_i=$(printf "iteration_%03d" "$i") member=$(printf "member_%03d" "$SLURM_ARRAY_TASK_ID") -output=output/$experiment_id/$format_i/$member/model_log.out +output=output/$experiment_id/$format_i/$member/model_log.txt # Run the forward model -srun --output=$output julia --color=no --project=experiments/$experiment_id -e " - import CalibrateAtmos - include(\"experiments/$experiment_id/model_interface.jl\") +srun --output=$output julia --color=no --project=experiments/$experiment_id -e ' + import CalibrateAtmos as CAL + + experiment_id = "'$experiment_id'" + i = '$i' + member = '$SLURM_ARRAY_TASK_ID' - physical_model = CalibrateAtmos.get_forward_model(Val(:$experiment_id)) - config = CalibrateAtmos.get_config(physical_model, $SLURM_ARRAY_TASK_ID, $iteration, \"$experiment_id\") - CalibrateAtmos.run_forward_model(physical_model, config) -" + include("experiments/$experiment_id/model_interface.jl") + + physical_model = CAL.get_forward_model(Val(Symbol(experiment_id))) + config = CAL.get_config(physical_model, member, i, experiment_id) + CAL.run_forward_model(physical_model, config) +' diff --git a/slurm/parse_commandline.sh b/slurm/parse_commandline.sh old mode 100644 new mode 100755 index bddf0834..79df80c5 --- a/slurm/parse_commandline.sh +++ b/slurm/parse_commandline.sh @@ -3,7 +3,7 @@ slurm_time="2:00:00" slurm_ntasks="1" slurm_cpus_per_task="1" slurm_gpus_per_task="0" - +generate_data=false help_message="Usage: ./pipeline.sh [options] experiment_id @@ -12,13 +12,14 @@ Options: -n, --ntasks: Set number of tasks to launch (default: 1). -c, --cpus_per_task: Set CPU cores per task (mutually exclusive with -g, default: 8). -g, --gpus_per_task: Set GPUs per task (mutually exclusive with -c, default: 0). + --generate_data: If set, generates observational data for use in the calibration. -h, --help: Display this help message. Arguments: experiment_id: A unique identifier for your experiment (required)." # Parse arguments using getopt -VALID_ARGS=$(getopt -o h,t:,n:,c:,g: --long help,time:,ntasks:,cpus_per_task:,gpus_per_task: -- "$@") +VALID_ARGS=$(getopt -o h,t:,n:,c:,g: --long help,time:,ntasks:,cpus_per_task:,gpus_per_task:,generate_data -- "$@") if [[ $? -ne 0 ]]; then exit 1; fi @@ -44,6 +45,10 @@ while [ : ]; do slurm_gpus_per_task="$2" shift 2 ;; + --generate_data) + generate_data=true + shift 1 + ;; -h | --help) printf "%s\n" "$help_message" exit 0 @@ -62,7 +67,7 @@ fi ensemble_size=$(grep "ensemble_size:" experiments/$experiment_id/ekp_config.yml | awk '{print $2}') n_iterations=$(grep "n_iterations:" experiments/$experiment_id/ekp_config.yml | awk '{print $2}') output=$(grep "output_dir:" experiments/$experiment_id/ekp_config.yml | awk '{print $2}') -logfile=$output/experiment_log.out +logfile=$output/experiment_log.txt # Set partition if [[ $slurm_gpus_per_task -gt 0 ]]; then diff --git a/slurm/update.sbatch b/slurm/update.sbatch index 25dd42f5..39e84e7a 100644 --- a/slurm/update.sbatch +++ b/slurm/update.sbatch @@ -6,17 +6,13 @@ i=$2 echo "Running update step after iteration $i" julia --color=no --project=experiments/$experiment_id -e ' - import YAML, JLD2 - import CalibrateAtmos + import CalibrateAtmos as CAL experiment_id = "'$experiment_id'" i = '$i' include("experiments/'$experiment_id'/model_interface.jl") - G_ensemble = CalibrateAtmos.observation_map(Val(Symbol(experiment_id)), i) - config = YAML.load_file(joinpath("experiments", experiment_id, "ekp_config.yml")) - output_dir = config["output_dir"] - iter_path = CalibrateAtmos.path_to_iteration(output_dir, i) - JLD2.save_object(joinpath(iter_path, "observation_map.jld2"), G_ensemble) - CalibrateAtmos.update_ensemble(experiment_id, i) + G_ensemble = CAL.observation_map(Val(Symbol(experiment_id)), i) + CAL.save_G_ensemble(experiment_id, i, G_ensemble) + CAL.update_ensemble(experiment_id, i) ' echo "Update step for iteration $i complete" diff --git a/src/ekp_interface.jl b/src/ekp_interface.jl index 405e05d4..78541aea 100644 --- a/src/ekp_interface.jl +++ b/src/ekp_interface.jl @@ -33,6 +33,25 @@ function get_prior(param_dict::AbstractDict; names = nothing) return prior end +""" + get_ekp_config(experiment_id) + +Load the EKP configuration for a given `experiment_id` +""" +get_ekp_config(experiment_id) = + YAML.load_file(joinpath("experiments", experiment_id, "ekp_config.yml")) + +""" + save_G_ensemble(experiment_id, iteration, G_ensemble) + +Save an ensemble's observation map output to the correct folder. +""" +function save_G_ensemble(experiment_id, iteration, G_ensemble) + config = get_ekp_config(experiment_id) + iter_path = path_to_iteration(config["output_dir"], iteration) + JLD2.save_object(joinpath(iter_path, "G_ensemble.jld2"), G_ensemble) +end + """ initialize( experiment_id; @@ -109,7 +128,7 @@ function update_ensemble( eki = JLD2.load_object(eki_path) # Load data from the ensemble - G_ens = JLD2.load_object(joinpath(iter_path, "observation_map.jld2")) + G_ens = JLD2.load_object(joinpath(iter_path, "G_ensemble.jld2")) # Update EKP.update_ensemble!(eki, G_ens) @@ -154,8 +173,7 @@ eki = CalibrateAtmos.calibrate(experiment_id) ``` """ function calibrate(experiment_id; device = ClimaComms.device()) - ekp_config = - YAML.load_file(joinpath("experiments", experiment_id, "ekp_config.yml")) + ekp_config = get_ekp_config(experiment_id) # initialize the CalibrateAtmos initialize(experiment_id) @@ -179,10 +197,7 @@ function calibrate(experiment_id; device = ClimaComms.device()) # update EKP with the ensemble output and update calibrated parameters G_ensemble = observation_map(Val(Symbol(experiment_id)), i) - JLD2.save_object( - joinpath(path_to_iteration(output_dir, i), "observation_map.jld2"), - G_ensemble, - ) + save_G_ensemble(experiment_id, i, G_ensemble) eki = update_ensemble(experiment_id, i) end return eki