Skip to content

Commit

Permalink
Mostly working slurm pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
nefrathenrici committed Apr 19, 2024
1 parent 0cbd859 commit 8964bc4
Show file tree
Hide file tree
Showing 18 changed files with 140 additions and 2,324 deletions.
2,113 changes: 0 additions & 2,113 deletions experiments/sphere_held_suarez_rhoe_equilmoist/Manifest.toml

This file was deleted.

16 changes: 0 additions & 16 deletions experiments/sphere_held_suarez_rhoe_equilmoist/Project.toml

This file was deleted.

Binary file not shown.
6 changes: 0 additions & 6 deletions experiments/sphere_held_suarez_rhoe_equilmoist/ekp_config.yml

This file was deleted.

This file was deleted.

15 changes: 0 additions & 15 deletions experiments/sphere_held_suarez_rhoe_equilmoist/model_config.yml

This file was deleted.

79 changes: 0 additions & 79 deletions experiments/sphere_held_suarez_rhoe_equilmoist/model_interface.jl

This file was deleted.

Binary file not shown.
Binary file not shown.
45 changes: 0 additions & 45 deletions experiments/sphere_held_suarez_rhoe_equilmoist/observation_map.jl

This file was deleted.

6 changes: 0 additions & 6 deletions experiments/sphere_held_suarez_rhoe_equilmoist/prior.toml

This file was deleted.

13 changes: 12 additions & 1 deletion experiments/surface_fluxes_perfect_model/Manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

julia_version = "1.10.2"
manifest_format = "2.0"
project_hash = "2edfa41926269520ea7e69208df499eef7522bd0"
project_hash = "bf8cd485ac3645523b93eb031226bbbdecce1a7f"

[[deps.AMD]]
deps = ["LinearAlgebra", "SparseArrays", "SuiteSparse_jll"]
Expand Down Expand Up @@ -70,6 +70,12 @@ git-tree-sha1 = "a3a402a35a2f7e0b87828ccabbd5ebfbebe356b4"
uuid = "dce04be8-c92d-5529-be00-80e4d2c0e197"
version = "2.3.0"

[[deps.ArgParse]]
deps = ["Logging", "TextWrap"]
git-tree-sha1 = "d4eccacaa3a632e8717556479d45502af44b4c17"
uuid = "c7e460c6-2fb9-53a9-8c5b-16f535851c63"
version = "1.1.5"

[[deps.ArgTools]]
uuid = "0dad84c5-d112-42e6-8d28-ef12dabb789f"
version = "1.1.1"
Expand Down Expand Up @@ -2199,6 +2205,11 @@ version = "0.1.7"
deps = ["InteractiveUtils", "Logging", "Random", "Serialization"]
uuid = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

[[deps.TextWrap]]
git-tree-sha1 = "9250ef9b01b66667380cf3275b3f7488d0e25faf"
uuid = "b718987f-49a8-5099-9789-dcd902bef87d"
version = "1.0.1"

[[deps.Thermodynamics]]
deps = ["DocStringExtensions", "KernelAbstractions", "Random", "RootSolvers"]
git-tree-sha1 = "6098c65a2ad62312ac74cb1627c8fb33efe33287"
Expand Down
1 change: 1 addition & 0 deletions experiments/surface_fluxes_perfect_model/Project.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[deps]
ArgParse = "c7e460c6-2fb9-53a9-8c5b-16f535851c63"
CairoMakie = "13f3f980-e62b-5c42-98c6-ff1f3baf88f0"
CalibrateAtmos = "4347a170-ebd6-470c-89d3-5c705c0cacc2"
ClimaComms = "3a4d1b5c-c61d-41fd-a00a-5873ba7a1b0d"
Expand Down
7 changes: 3 additions & 4 deletions experiments/surface_fluxes_perfect_model/observation_map.jl
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
using Statistics
import YAML
import EnsembleKalmanProcesses: TOMLInterface
import JLD2
import CalibrateAtmos: observation_map
import CalibrateAtmos:
observation_map, ExperimentConfig, path_to_ensemble_member

experiment_dir = joinpath(
pkgdir(CalibrateAtmos),
Expand All @@ -24,8 +24,7 @@ function observation_map(::Val{:surface_fluxes_perfect_model}, iteration)
dims = 1
G_ensemble = Array{Float64}(undef, dims..., ensemble_size)
for m in 1:ensemble_size
member_path =
TOMLInterface.path_to_ensemble_member(output_dir, iteration, m)
member_path = path_to_ensemble_member(output_dir, iteration, m)
ustar = JLD2.load_object(joinpath(member_path, model_output))
G_ensemble[:, m] = process_member_data(ustar)
end
Expand Down
File renamed without changes.
123 changes: 123 additions & 0 deletions slurm_pipeline.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import CalibrateAtmos

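# TODO: move this command-line interface into CalibrateAtmos. It is disabled for now.
# Note that the draft wrapper below passes positional arguments, whereas
# `slurm_calibration` only accepts keyword arguments, so it must be adapted before use.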
# using ArgParse
# s = ArgParseSettings()
# @add_arg_table! s begin
# "--ntasks", "-n"
# help = "Set number of tasks to launch"
# default = 1
# "--time", "-t"
# help = "Set max wallclock time"
# default = "0:45:00"
# "--cpus_per_task", "-c"
# default = 1
# arg_type = Int
# "--gpus_per_task", "-g"
# default = 0
# arg_type = Int
# # "--constraint"
# # default = ""
# # arg_type = String
# end

# slurm_calibration(parsed_args = ArgParse.parse_args(s)) = slurm_calibration(
# parsed_args["time"],
# parsed_args["ntasks"],
# parsed_args["cpus_per_task"],
# parsed_args["gpus_per_task"],
# )

# Build the `srun` command that runs one forward-model ensemble member in a fresh
# Julia process using the experiment's project environment.
function _forward_model_cmd(
    iteration,
    member,
    experiment_dir,
    model_interface_path;
    time_limit,
    ntasks,
    partition,
    cpus_per_task,
    gpus_per_task,
)
    # Assemble the script as an ordinary Julia string and embed the paths with `repr`,
    # so that they are emitted as correctly escaped string literals even when they
    # contain quotes, backslashes or `$`.
    script = """
    import CalibrateAtmos as CAL
    iteration = $iteration; member = $member
    model_interface = $(repr(model_interface_path)); include(model_interface)
    experiment_dir = $(repr(experiment_dir))
    experiment_config = CAL.ExperimentConfig(experiment_dir)
    experiment_id = experiment_config.id
    physical_model = CAL.get_forward_model(Val(Symbol(experiment_id)))
    model_config = CAL.get_config(physical_model, member, iteration, experiment_dir)
    CAL.run_forward_model(physical_model, model_config)
    @info "Forward Model Run Completed" experiment_id physical_model iteration member
    """
    # Newlines inside a command literal are plain argument separators.
    return `srun
        --job-name=run_$(iteration)_$(member)
        --time=$time_limit
        --ntasks=$ntasks
        --partition=$partition
        --cpus-per-task=$cpus_per_task
        --gpus-per-task=$gpus_per_task
        julia --project=$experiment_dir -e $script`
end

"""
    slurm_calibration(; experiment_dir, model_interface_path, time_limit, ntasks,
                        cpus_per_task, gpus_per_task, verbose, partition)

Run a full calibration, launching every ensemble member of every iteration as its own
`srun` job step and updating the ensemble after each iteration.

# Keywords
- `experiment_dir`: directory holding the experiment configuration,
  `observation_map.jl` and (if plots are enabled) `postprocessing.jl`. Defaults to the
  directory of the active project.
- `model_interface_path`: file included by each forward-model process. Defaults to
  `model_interface.jl` two directories above `experiment_dir`.
- `time_limit`, `ntasks`, `cpus_per_task`, `gpus_per_task`: forwarded to the matching
  `srun` options.
- `verbose`: when `true`, print the command of ensemble member 1 and dump the model log
  of any failed member into the warning.
- `partition`: Slurm partition; defaults to `"gpu"` when `gpus_per_task > 0` and
  `"expansion"` otherwise.

# Throws
An error is raised when every member of an iteration fails. Partial failures only emit
a warning listing the failed members.
"""
function slurm_calibration(;
    experiment_dir = dirname(Base.active_project()),
    model_interface_path = abspath(
        joinpath(experiment_dir, "..", "..", "model_interface.jl"),
    ),
    time_limit = "0:45:00",
    ntasks = 1,
    cpus_per_task = 1,
    gpus_per_task = 0,
    verbose = false,
    partition = gpus_per_task > 0 ? "gpu" : "expansion",
)
    # NOTE: `experiment_dir` is deliberately NOT recomputed here; the caller's value
    # must be honoured so that it stays consistent with `model_interface_path`.

    # Load the experiment-specific `observation_map` method. Including once is enough:
    # the method is global, it is only the *call* below that needs special care.
    include(joinpath(experiment_dir, "observation_map.jl"))

    # Experiment config is created from a YAML file within the experiment_dir
    config = CalibrateAtmos.ExperimentConfig(experiment_dir)
    (; n_iterations, output_dir, ensemble_size, prior) = config

    @info "Initializing calibration" n_iterations ensemble_size output_dir
    CalibrateAtmos.initialize(config)

    eki = nothing
    for i in 0:(n_iterations - 1)
        @info "Iteration $i"
        # `asyncmap` returns the closure results directly (one Bool per member).
        member_succeeded = asyncmap(1:ensemble_size; ntasks = ensemble_size) do j
            member_output = joinpath(
                CalibrateAtmos.path_to_ensemble_member(output_dir, i, j),
                "model_log.txt",
            )
            fwd_model_cmd = _forward_model_cmd(
                i,
                j,
                experiment_dir,
                model_interface_path;
                time_limit,
                ntasks,
                partition,
                cpus_per_task,
                gpus_per_task,
            )
            try
                @info "Running ensemble member $j"
                verbose && j == 1 && println(
                    "Ensemble member 1 command: \n" *
                    join(fwd_model_cmd.exec, " ") *
                    "\n",
                )
                # No-op when the member directory already exists.
                mkpath(dirname(member_output))
                # Open the log once and share the handle: giving the same file *name*
                # to both `stdout` and `stderr` opens it twice and lets the two
                # streams overwrite each other.
                open(member_output, "w") do log_io
                    run(pipeline(fwd_model_cmd; stdout = log_io, stderr = log_io))
                end
                return true
            catch err
                if verbose
                    # The log may be missing if the failure happened before it was
                    # created; do not throw again from inside the handler.
                    model_log = if isfile(member_output)
                        replace(readchomp(member_output), "\\n" => "\n")
                    else
                        "(no model log was written)"
                    end
                    @warn """
                    Ensemble member $j raised an error. See model log for failed run:
                    $model_log
                    """ exception = err
                else
                    @warn """
                    Ensemble member $j raised an error. See $member_output for output log.
                    """ exception = err
                end
                return false
            end
        end

        if !any(member_succeeded)
            error(
                "Full ensemble for iteration $i has failed. See individual model logs for details.",
            )
        elseif !all(member_succeeded)
            @warn "Failed ensemble members: $(findall(!, member_succeeded))"
        end

        @info "Completed Iteration $i, Updating Ensemble"
        # The `observation_map` method was defined by `include` after this function
        # started running, i.e. in a newer world age; `invokelatest` makes it visible.
        G_ensemble = Base.invokelatest(
            CalibrateAtmos.observation_map,
            Val(Symbol(config.id)),
            i,
        )
        CalibrateAtmos.save_G_ensemble(config, i, G_ensemble)
        eki = CalibrateAtmos.update_ensemble(config, i)
    end

    if config.generate_plots
        include(joinpath(experiment_dir, "postprocessing.jl"))
        # Same world-age consideration as for `observation_map` above.
        Base.invokelatest(convergence_plot, eki, prior; output = output_dir)
    end
end

# Script entry point: run the calibration with default Slurm settings and verbose
# diagnostics (prints the first member's command and dumps logs of failed members).
slurm_calibration(; verbose = true)
Loading

0 comments on commit 8964bc4

Please sign in to comment.