From 3f784bf50d06bb75eabe394d495619f792958a48 Mon Sep 17 00:00:00 2001
From: nefrathenrici
Date: Thu, 5 Dec 2024 17:39:30 -0800
Subject: [PATCH] Add initial worker interface, needs a lot of cleanup

---
 Project.toml          |   2 +
 src/ClimaCalibrate.jl |   1 +
 src/ekp_interface.jl  |   1 -
 src/slurm_workers.jl  | 219 ++++++++++++++++++++++++++++++++++++++++++
 test/slurm_workers.jl |  57 +++++++++++
 5 files changed, 279 insertions(+), 1 deletion(-)
 create mode 100644 src/slurm_workers.jl
 create mode 100644 test/slurm_workers.jl

diff --git a/Project.toml b/Project.toml
index fc805cc5..98ce1b0f 100644
--- a/Project.toml
+++ b/Project.toml
@@ -4,6 +4,7 @@ authors = ["Climate Modeling Alliance"]
 version = "0.0.5"
 
 [deps]
+ClusterManagers = "34f1f09b-3a8b-5176-ab39-66d58a4d544e"
 Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
 Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f"
 EnsembleKalmanProcesses = "aa8a2aa5-91d8-4396-bcef-d4f2ec43552d"
@@ -20,6 +21,7 @@ CESExt = "CalibrateEmulateSample"
 
 [compat]
 CalibrateEmulateSample = "0.5"
+ClusterManagers = "0.4.6"
 Distributed = "1"
 Distributions = "0.25"
 EnsembleKalmanProcesses = "1, 2"
diff --git a/src/ClimaCalibrate.jl b/src/ClimaCalibrate.jl
index f4cfae51..6ef631c9 100644
--- a/src/ClimaCalibrate.jl
+++ b/src/ClimaCalibrate.jl
@@ -6,5 +6,6 @@ include("slurm.jl")
 include("pbs.jl")
 include("backends.jl")
 include("emulate_sample.jl")
+include("slurm_workers.jl")
 
 end # module ClimaCalibrate
diff --git a/src/ekp_interface.jl b/src/ekp_interface.jl
index a9625893..107de40e 100644
--- a/src/ekp_interface.jl
+++ b/src/ekp_interface.jl
@@ -275,7 +275,6 @@ function _initialize(
             merge(EKP.default_options_dict(EKP.Inversion()), ekp_str_kwargs);
             rng = rng_ekp,
         )
-
     eki = if isnothing(noise)
         eki_constructor(initial_ensemble, observations, EKP.Inversion())
     else
diff --git a/src/slurm_workers.jl b/src/slurm_workers.jl
new file mode 100644
index 00000000..ce22ac3a
--- /dev/null
+++ b/src/slurm_workers.jl
@@ -0,0 +1,219 @@
+using Distributed, ClusterManagers
+import EnsembleKalmanProcesses as EKP
+
+#=
+ `srun -J julia-4014777 -n 10 -D /home/nefrathe/clima/ClimaCalibrate.jl
+ --cpus-per-task=1
+ -t 00:20:00 -o /home/nefrathe/clima/ClimaCalibrate.jl/./julia-4014777-17333586001-%4t.out
+ /clima/software/julia/julia-1.11.0/bin/julia
+ --project=/home/nefrathe/clima/ClimaCalibrate.jl/Project.toml
+ --worker=U3knisg2TJufcrbJ`
+=#
+
+
+# srun_proc = open(srun_cmd)
+
+# worker_cookie() = begin Distributed.init_multi(); cluster_cookie() end
+# worker_arg() = `--worker=$(worker_cookie())`
+
+# srun_cmd = `srun -J $jobname -n $np -D $exehome $(srunargs) $exename $exeflags $(worker_arg())`
+
+"""
+    default_worker_pool()
+
+Return a `WorkerPool` built from the process ids currently given by `workers()`.
+"""
+default_worker_pool() = WorkerPool(workers())
+
+"""
+    run_iteration(iter, ensemble_size, output_dir; worker_pool, failure_rate)
+
+Run the forward model for members `1:ensemble_size` of iteration `iter`, one
+task per member, each on a worker taken from `worker_pool`.
+
+A member whose setup or remote call throws is logged and counted as failed.
+Throw an error if the failed fraction exceeds `failure_rate`; otherwise return
+a `Dict` mapping each successful member number to its result.
+"""
+function run_iteration(
+    iter,
+    ensemble_size,
+    output_dir;
+    worker_pool = default_worker_pool(),
+    failure_rate = 0.5,
+)
+    # One slot per member, so `put!` never blocks a task.
+    results = Channel{Any}(ensemble_size)
+    @sync begin
+        for m in 1:ensemble_size
+            @async begin
+                # Wait for a free worker.
+                worker = take!(worker_pool)
+                try
+                    # NOTE(review): `config` is neither an argument nor a local
+                    # of this function -- confirm where it is meant to come from.
+                    model_config = set_up_forward_model(m, iter, config)
+                    result = remotecall_fetch(
+                        run_forward_model,
+                        worker,
+                        model_config,
+                    )
+                    put!(results, (m, result))
+                catch e
+                    @error "Error running member $m" exception = e
+                    put!(results, (m, e))
+                finally
+                    # Always return the worker to the pool.
+                    put!(worker_pool, worker)
+                end
+            end
+        end
+    end
+
+    # Successful results are kept; failures are only counted.
+    ensemble_results = Dict{Int, Any}()
+    n_failures = 0
+    for _ in 1:ensemble_size
+        m, result = take!(results)
+        if result isa Exception
+            @error "Member $m failed" error = result
+            n_failures += 1
+        else
+            ensemble_results[m] = result
+        end
+    end
+    iter_failure_rate = n_failures / ensemble_size
+    if iter_failure_rate > failure_rate
+        error("Ensemble for iter $iter had a $(iter_failure_rate * 100)% failure rate")
+    end
+    return ensemble_results
+end
+
+"""
+    worker_calibrate(ensemble_size, n_iterations, observations, noise, prior, output_dir; kwargs...)
+
+Call `initialize`, then for each iteration run the ensemble with
+`run_iteration`, save the `observation_map` output and call `update_ensemble`.
+Return the object loaded from `path_to_iteration(output_dir, n_iterations)`.
+
+`failure_rate` and `worker_pool` are passed to `run_iteration`; any other
+keyword is passed to `initialize`.
+"""
+function worker_calibrate(
+    ensemble_size,
+    n_iterations,
+    observations,
+    noise,
+    prior,
+    output_dir;
+    failure_rate = 0.5,
+    worker_pool = default_worker_pool(),
+    ekp_kwargs...,
+)
+    initialize(
+        ensemble_size,
+        observations,
+        noise,
+        prior,
+        output_dir;
+        rng_seed = 1234,
+        ekp_kwargs...,
+    )
+    # NOTE(review): `0:n_iterations` runs n_iterations + 1 iterations -- confirm.
+    for iter in 0:(n_iterations)
+        (; time) = @timed run_iteration(
+            iter,
+            ensemble_size,
+            output_dir;
+            worker_pool,
+            failure_rate,
+        )
+        @info "Iteration $iter time: $time"
+        # Process results
+        G_ensemble = observation_map(iter)
+        save_G_ensemble(output_dir, iter, G_ensemble)
+        update_ensemble(output_dir, iter, prior)
+        iter_path = path_to_iteration(output_dir, iter)
+    end
+    return JLD2.load_object(path_to_iteration(output_dir, n_iterations))
+end
+
+"""
+    worker_calibrate(ekp::EKP.EnsembleKalmanProcess, ensemble_size, n_iterations, observations, noise, prior, output_dir; kwargs...)
+
+Variant that starts from an existing `ekp`: call `initialize(ekp, prior, output_dir)`,
+then run the same per-iteration steps. `observations`, `noise` and extra keywords
+are accepted but not used by this method.
+"""
+function worker_calibrate(
+    ekp::EKP.EnsembleKalmanProcess,
+    ensemble_size,
+    n_iterations,
+    observations,
+    noise,
+    prior,
+    output_dir;
+    failure_rate = 0.5,
+    worker_pool = default_worker_pool(),
+    ekp_kwargs...,
+)
+    initialize(ekp, prior, output_dir; rng_seed = 1234)
+    # NOTE(review): `0:n_iterations` runs n_iterations + 1 iterations -- confirm.
+    for iter in 0:(n_iterations)
+        (; time) = @timed run_iteration(
+            iter,
+            ensemble_size,
+            output_dir;
+            worker_pool,
+            failure_rate,
+        )
+        @info "Iteration $iter time: $time"
+        # Process results
+        G_ensemble = observation_map(iter)
+        save_G_ensemble(output_dir, iter, G_ensemble)
+        update_ensemble(output_dir, iter, prior)
+        iter_path = path_to_iteration(output_dir, iter)
+    end
+    return JLD2.load_object(path_to_iteration(output_dir, n_iterations))
+end
+
+"""
+    worker_calibrate(config; worker_pool = default_worker_pool())
+
+Unpack the calibration settings from `config` and forward them to the
+positional `worker_calibrate` method.
+"""
+function worker_calibrate(config; worker_pool = default_worker_pool())
+    (; ensemble_size, n_iterations, observations, noise, prior, output_dir) = config
+    return worker_calibrate(
+        ensemble_size,
+        n_iterations,
+        observations,
+        noise,
+        prior,
+        output_dir;
+        worker_pool,
+    )
+end
+
+"""
+    slurm_worker_pool(nprocs::Int; slurm_kwargs...)
+
+Add `nprocs` processes through `SlurmManager` and return them as a `WorkerPool`.
+
+`t`, `cpus_per_task` and `exeflags` have defaults; entries in `slurm_kwargs`
+with the same names take precedence because they are splatted last.
+"""
+function slurm_worker_pool(nprocs::Int; slurm_kwargs...)
+    return WorkerPool(addprocs(
+        SlurmManager(nprocs);
+        t = "01:00:00", cpus_per_task = 1,
+        exeflags = "--project=$(Base.active_project())",
+        slurm_kwargs...,
+    ))
+end
+
+# gpus_per_task=1
+# NOTE(review): nothing in this file reads this global (the functions above
+# build their own default pool) -- confirm it is still needed.
+worker_pool = default_worker_pool()
diff --git a/test/slurm_workers.jl b/test/slurm_workers.jl
new file mode 100644
index 00000000..38562a4c
--- /dev/null
+++ b/test/slurm_workers.jl
@@ -0,0 +1,57 @@
+# Tests for SurfaceFluxes example calibration on HPC, used in buildkite testing
+# To run, open the REPL: julia --project=experiments/surface_fluxes_perfect_model test/slurm_workers.jl
+
+using Pkg
+Pkg.instantiate(; verbose = true)
+
+import ClimaCalibrate:
+    get_backend,
+    HPCBackend,
+    JuliaBackend,
+    calibrate,
+    get_prior,
+    kwargs,
+    ExperimentConfig,
+    DerechoBackend,
+    worker_calibrate
+using Test
+import EnsembleKalmanProcesses: get_ϕ_mean_final, get_g_mean_final
+
+experiment_dir = dirname(Base.active_project())
+model_interface = joinpath(experiment_dir, "model_interface.jl")
+
+# Generate observational data and include observational map
+include(joinpath(experiment_dir, "generate_data.jl"))
+include(joinpath(experiment_dir, "observation_map.jl"))
+include(model_interface)
+
+prior = get_prior(joinpath(experiment_dir, "prior.toml"))
+
+function test_sf_calibration_output(eki, prior)
+    @testset "End to end test using file config (surface fluxes perfect model)" begin
+        parameter_values = get_ϕ_mean_final(prior, eki)
+        test_parameter_values = [4.778584250117946, 3.7295665619234697]
+        @test all(
+            isapprox.(parameter_values, test_parameter_values; rtol = 1e-3),
+        )
+
+        forward_model_output = get_g_mean_final(eki)
+        test_model_output = [0.05228473730385304]
+        @test all(
+            isapprox.(forward_model_output, test_model_output; rtol = 1e-3),
+        )
+    end
+end
+
+# NOTE(review): `config` is not defined in this file; it has to exist before this
+# call (presumably an `ExperimentConfig` -- confirm). With no extra workers added,
+# the default pool only contains this process, which has already loaded the
+# model interface above.
+eki = worker_calibrate(config)
+test_sf_calibration_output(eki, prior)
+
+# Pure Julia calibration, this should run anywhere
+eki = calibrate(JuliaBackend, experiment_dir)
+test_sf_calibration_output(eki, prior)
+
+include(joinpath(experiment_dir, "postprocessing.jl"))