Skip to content

Commit

Permalink
Add initial worker interface, needs a lot of cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
nefrathenrici committed Dec 6, 2024
1 parent 81dbec4 commit 3f784bf
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 2 deletions.
2 changes: 2 additions & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ authors = ["Climate Modeling Alliance"]
version = "0.0.5"

[deps]
ClusterManagers = "34f1f09b-3a8b-5176-ab39-66d58a4d544e"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f"
EnsembleKalmanProcesses = "aa8a2aa5-91d8-4396-bcef-d4f2ec43552d"
Expand All @@ -20,6 +21,7 @@ CESExt = "CalibrateEmulateSample"

[compat]
CalibrateEmulateSample = "0.5"
ClusterManagers = "0.4.6"
Distributed = "1"
Distributions = "0.25"
EnsembleKalmanProcesses = "1, 2"
Expand Down
1 change: 1 addition & 0 deletions src/ClimaCalibrate.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ include("slurm.jl")
include("pbs.jl")
include("backends.jl")
include("emulate_sample.jl")
include("slurm_workers.jl")

end # module ClimaCalibrate
3 changes: 1 addition & 2 deletions src/ekp_interface.jl
Original file line number Diff line number Diff line change
Expand Up @@ -267,15 +267,14 @@ function _initialize(
rng_ekp = Random.MersenneTwister(rng_seed)
initial_ensemble =
EKP.construct_initial_ensemble(rng_ekp, prior, ensemble_size)

@show typeof(initial_ensemble)
ekp_str_kwargs = Dict([string(k) => v for (k, v) in ekp_kwargs])
eki_constructor =
(args...) -> EKP.EnsembleKalmanProcess(
args...,
merge(EKP.default_options_dict(EKP.Inversion()), ekp_str_kwargs);
rng = rng_ekp,
)

eki = if isnothing(noise)
eki_constructor(initial_ensemble, observations, EKP.Inversion())
else
Expand Down
123 changes: 123 additions & 0 deletions src/slurm_workers.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
using Distributed, ClusterManagers
import EnsembleKalmanProcesses as EKP

#=
`srun -J julia-4014777 -n 10 -D /home/nefrathe/clima/ClimaCalibrate.jl
--cpus-per-task=1
-t 00:20:00 -o /home/nefrathe/clima/ClimaCalibrate.jl/./julia-4014777-17333586001-%4t.out
/clima/software/julia/julia-1.11.0/bin/julia
--project=/home/nefrathe/clima/ClimaCalibrate.jl/Project.toml
--worker=U3knisg2TJufcrbJ`
=#


# srun_proc = open(srun_cmd)

# worker_cookie() = begin Distributed.init_multi(); cluster_cookie() end
# worker_arg() = `--worker=$(worker_cookie())`

# srun_cmd = `srun -J $jobname -n $np -D $exehome $(srunargs) $exename $exeflags $(worker_arg())`

default_worker_pool() = WorkerPool(workers())

function run_iteration(iter, ensemble_size, output_dir; worker_pool = default_worker_pool(), failure_rate = 0.5)
# Create a channel to collect results
results = Channel{Any}(ensemble_size)
@sync begin
for m in 1:(ensemble_size)
@async begin
# Get a worker from the pool
worker = take!(worker_pool)
try
model_config = set_up_forward_model(m, iter, config)
result = remotecall_fetch(
run_forward_model,
worker,
model_config,
)
put!(results, (m, result))
catch e
@error "Error running member $m" exception = e
put!(results, (m, e))
finally
# Always return worker to pool
put!(worker_pool, worker)
end
end
end
end

# Collect all results
ensemble_results = Dict{Int, Any}()
for _ in 1:(ensemble_size)
m, result = take!(results)
if result isa Exception
@error "Member $m failed" error = result
else
ensemble_results[m] = result
end
end
results = values(ensemble_results)
iter_failure_rate = sum(isa.(results, Exception)) / ensemble_size
if iter_failure_rate > failure_rate
error("Ensemble for iter $iter had a $(iter_failure_rate * 100)% failure rate")
end
end

function worker_calibrate(ensemble_size, n_iterations, observations, noise, prior, output_dir; failure_rate = 0.5, worker_pool = default_worker_pool(), ekp_kwargs...)
initialize(
ensemble_size,
observations,
noise,
prior,
output_dir;
rng_seed = 1234,
ekp_kwargs...,
)
for iter in 0:(n_iterations)
(; time) = @timed run_iteration(iter, ensemble_size, output_dir; worker_pool, failure_rate)
@info "Iteration $iter time: $time"
# Process results
G_ensemble = observation_map(iter)
save_G_ensemble(output_dir, iter, G_ensemble)
update_ensemble(output_dir, iter, prior)
iter_path = path_to_iteration(output_dir, iter)
end
return JLD2.load_object(path_to_iteration(output_dir, n_iterations))
end

function worker_calibrate(ekp::EKP.EnsembleKalmanProcess, ensemble_size,n_iterations, observations, noise, prior, output_dir; failure_rate = 0.5, worker_pool = default_worker_pool(), ekp_kwargs...)
initialize(
ekp, prior, output_dir
;
rng_seed = 1234,
)
for iter in 0:(n_iterations)
(; time) = @timed run_iteration(iter, ensemble_size, output_dir; worker_pool, failure_rate)
@info "Iteration $iter time: $time"
# Process results
G_ensemble = observation_map(iter)
save_G_ensemble(output_dir, iter, G_ensemble)
update_ensemble(output_dir, iter, prior)
iter_path = path_to_iteration(output_dir, iter)
end
return JLD2.load_object(path_to_iteration(output_dir, n_iterations))
end


function worker_calibrate(config; worker_pool = default_worker_pool())
(; ensemble_size, observations, noise, prior, output_dir) = config
return worker_calibrate(ensemble_size, observations, noise, prior, output_dir; worker_pool)
end

function slurm_worker_pool(nprocs::Int; slurm_kwargs...)
return WorkerPool(addprocs(
SlurmManager(nprocs);
t = "01:00:00", cpus_per_task = 1,
exeflags = "--project=$(Base.active_project())",
slurm_kwargs...,
))
end

# gpus_per_task=1
worker_pool = default_worker_pool()
53 changes: 53 additions & 0 deletions test/slurm_workers.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Tests for SurfaceFluxes example calibration on HPC, used in buildkite testing
# To run, open the REPL: julia --project=experiments/surface_fluxes_perfect_model test/hpc_backend_e2e.jl

using Pkg
Pkg.instantiate(; verbose = true)

import ClimaCalibrate:
get_backend,
HPCBackend,
JuliaBackend,
calibrate,
get_prior,
kwargs,
ExperimentConfig,
DerechoBackend
using Test
import EnsembleKalmanProcesses: get_ϕ_mean_final, get_g_mean_final

experiment_dir = dirname(Base.active_project())
model_interface = joinpath(experiment_dir, "model_interface.jl")

# Generate observational data and include observational map
include(joinpath(experiment_dir, "generate_data.jl"))
include(joinpath(experiment_dir, "observation_map.jl"))
include(model_interface)

prior = get_prior(joinpath(experiment_dir, "prior.toml"))

function test_sf_calibration_output(eki, prior)
@testset "End to end test using file config (surface fluxes perfect model)" begin
parameter_values = get_ϕ_mean_final(prior, eki)
test_parameter_values = [4.778584250117946, 3.7295665619234697]
@test all(
isapprox.(parameter_values, test_parameter_values; rtol = 1e-3),
)

forward_model_output = get_g_mean_final(eki)
test_model_output = [0.05228473730385304]
@test all(
isapprox.(forward_model_output, test_model_output; rtol = 1e-3),
)
end
end

@everywhere
eki = worker_calibrate(config; model_interface, hpc_kwargs, verbose = true)
test_sf_calibration_output(eki, prior)

# Pure Julia calibration, this should run anywhere
eki = calibrate(JuliaBackend, experiment_dir)
test_sf_calibration_output(eki, prior)

include(joinpath(experiment_dir, "postprocessing.jl"))

0 comments on commit 3f784bf

Please sign in to comment.