Skip to content

Commit

Permalink
Add pmap for backend
Browse files Browse the repository at this point in the history
  • Loading branch information
nefrathenrici committed Jul 31, 2024
1 parent c6033ec commit a991de8
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .buildkite/clima_server_test/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ steps:
key: "init_cpu_env"
command:
- echo "--- Instantiate SurfaceFluxes calibration project"
- julia --project=experiments/surface_fluxes_perfect_model -e 'using Pkg; Pkg.precompile()'
- julia --project=experiments/surface_fluxes_perfect_model -e 'using Pkg; Pkg.develop(;path="."); Pkg.precompile()'

- wait
- label: "SurfaceFluxes perfect model calibration"
Expand Down
2 changes: 1 addition & 1 deletion .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ steps:
key: "init_cpu_env"
command:
- echo "--- Instantiate SurfaceFluxes calibration project"
- julia --project=experiments/surface_fluxes_perfect_model -e 'using Pkg; Pkg.precompile()'
- julia --project=experiments/surface_fluxes_perfect_model -e 'using Pkg; Pkg.develop(;path="."); Pkg.precompile()'

- wait
- label: "SurfaceFluxes perfect model calibration"
Expand Down
3 changes: 2 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
name = "ClimaCalibrate"
uuid = "4347a170-ebd6-470c-89d3-5c705c0cacc2"
authors = ["Climate Modeling Alliance"]
version = "0.0.2"
version = "0.0.3"

[deps]
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f"
EnsembleKalmanProcesses = "aa8a2aa5-91d8-4396-bcef-d4f2ec43552d"
JLD2 = "033835bb-8acc-5ee8-8aae-3f567f8a3819"
Expand Down
29 changes: 21 additions & 8 deletions src/backends.jl
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using Distributed

export get_backend, calibrate, model_run

abstract type AbstractBackend end
Expand Down Expand Up @@ -58,28 +60,39 @@ function module_load_string(::Type{DerechoBackend})
"""
end

calibrate(config::ExperimentConfig; ekp_kwargs...) =
calibrate(get_backend(), config; ekp_kwargs...)
calibrate(config::ExperimentConfig; reruns = 0, ekp_kwargs...) =
calibrate(get_backend(), config; reruns, ekp_kwargs...)

calibrate(experiment_dir::AbstractString; ekp_kwargs...) =
calibrate(get_backend(), ExperimentConfig(experiment_dir); ekp_kwargs...)
calibrate(experiment_dir::AbstractString; reruns = 0, ekp_kwargs...) =
calibrate(
get_backend(),
ExperimentConfig(experiment_dir);
reruns,
ekp_kwargs...,
)

calibrate(
b::Type{JuliaBackend},
experiment_dir::AbstractString;
reruns = 0,
ekp_kwargs...,
) = calibrate(b, ExperimentConfig(experiment_dir); ekp_kwargs...)
) = calibrate(b, ExperimentConfig(experiment_dir); reruns, ekp_kwargs...)

function calibrate(
::Type{JuliaBackend},
config::ExperimentConfig;
reruns = 0,
ekp_kwargs...,
)
(; n_iterations, ensemble_size) = config
eki = initialize(config; ekp_kwargs...)
on_error(e::InterruptException) = rethrow(e)
on_error(e) =
@error "Single ensemble member has errored. See stacktrace" exception =
(e, catch_backtrace())
for i in 0:(n_iterations - 1)
@info "Running iteration $i"
for m in 1:ensemble_size
pmap(1:ensemble_size; retry_delays = reruns, on_error) do m
run_forward_model(set_up_forward_model(m, i, config))
@info "Completed member $m"
end
Expand All @@ -100,11 +113,11 @@ Takes either an ExperimentConfig or an experiment folder.
Available Backends: CaltechHPCBackend, ClimaGPUBackend, DerechoBackend, JuliaBackend
# Keyword Arguments
- `experiment_dir`: Directory containing experiment configurations.
- `model_interface`: Path to the model interface file.
- `hpc_kwargs`: Dictionary of resource arguments, passed to the job scheduler.
- `reruns`: Number of times to retry a failed ensemble member.
- `verbose::Bool`: Enable verbose logging.
# Usage
Expand Down Expand Up @@ -168,7 +181,7 @@ function calibrate(
)
end

statuses = wait_for_jobs(
wait_for_jobs(
jobids,
output_dir,
iter,
Expand Down

0 comments on commit a991de8

Please sign in to comment.