Skip to content

Commit

Permalink
Add WorkerBackend, clean up constructors
Browse files Browse the repository at this point in the history
  • Loading branch information
nefrathenrici committed Dec 17, 2024
1 parent 4c8e057 commit c3f6707
Show file tree
Hide file tree
Showing 5 changed files with 301 additions and 283 deletions.
271 changes: 203 additions & 68 deletions src/backends.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import EnsembleKalmanProcesses as EKP

export get_backend, calibrate, model_run

export JuliaBackend, WorkerBackend
export HPCBackend, ClimaGPUBackend, DerechoBackend, CaltechHPCBackend

abstract type AbstractBackend end

struct JuliaBackend <: AbstractBackend end
Expand All @@ -16,6 +19,8 @@ struct ClimaGPUBackend <: SlurmBackend end

struct DerechoBackend <: HPCBackend end

struct WorkerBackend <: AbstractBackend end

"""
get_backend()
Expand Down Expand Up @@ -62,44 +67,52 @@ function module_load_string(::Type{DerechoBackend})
"""
end

calibrate(config::ExperimentConfig; reruns = 0, ekp_kwargs...) =
calibrate(get_backend(), config; reruns, ekp_kwargs...)
calibrate(
config::ExperimentConfig;
model_interface = nothing,
hpc_kwargs = Dict(),
ekp_kwargs...,
) = calibrate(get_backend(), config; model_interface, hpc_kwargs, ekp_kwargs...)

calibrate(experiment_dir::AbstractString; reruns = 0, ekp_kwargs...) =
calibrate(
function calibrate(
ensemble_size::Int,
n_iterations::Int,
observations,
noise,
prior,
output_dir;
model_interface = nothing,
hpc_kwargs = Dict(),
ekp_kwargs...,
)
return calibrate(
get_backend(),
ExperimentConfig(experiment_dir);
reruns,
ensemble_size,
n_iterations,
observations,
noise,
prior,
output_dir;
model_interface,
hpc_kwargs,
ekp_kwargs...,
)

calibrate(
b::Type{JuliaBackend},
experiment_dir::AbstractString;
reruns = 0,
ekp_kwargs...,
) = calibrate(b, ExperimentConfig(experiment_dir); reruns, ekp_kwargs...)
end

function calibrate(
::Type{JuliaBackend},
config::ExperimentConfig;
reruns = 0,
ekp = nothing,
ekp_kwargs...,
)
(; n_iterations, output_dir, ensemble_size) = config
ekp = if ekp isa EKP.EnsembleKalmanProcess
initialize(ekp, prior, output_dir)
else
initialize(config; ekp_kwargs...)
end
ekp = initialize(config; ekp_kwargs...)
on_error(e::InterruptException) = rethrow(e)
on_error(e) =
@error "Single ensemble member has errored. See stacktrace" exception =
(e, catch_backtrace())
for i in 0:(n_iterations - 1)
@info "Running iteration $i"
pmap(1:ensemble_size; retry_delays = reruns, on_error) do m
pmap(1:ensemble_size; retry_delays = 0, on_error) do m
forward_model(i, m)
@info "Completed member $m"
end
Expand All @@ -113,73 +126,195 @@ function calibrate(
return ekp
end

const DEFAULT_FAILURE_RATE = 0.5

"""
calibrate(::Type{AbstractBackend}, config::ExperimentConfig; kwargs...)
calibrate(::Type{AbstractBackend}, experiment_dir; kwargs...)
calibrate(::Type{AbstractBackend}, ekp::EnsembleKalmanProcess, experiment_dir; kwargs...)
calibrate(backend, ensemble_size, n_iterations, observations, noise, prior, output_dir; ekp_kwargs...)
calibrate(backend, ekp::EnsembleKalmanProcess, ensemble_size, n_iterations, observations, noise, prior, output_dir)
calibrate(backend, config::ExperimentConfig; ekp_kwargs...)
Run a full calibration on the given backend.
Run a full calibration, scheduling the forward model runs on Caltech's HPC cluster.
If the EKP struct is not given, it will be constructed upon initialization.
The experiment configuration (ensemble size, prior, observations, etc) can be
wrapped in an ExperimentConfig or passed in as arguments to the function.
Takes either an ExperimentConfig or an experiment folder.
Available Backends: WorkerBackend, CaltechHPCBackend, ClimaGPUBackend, DerechoBackend, JuliaBackend
Available Backends: CaltechHPCBackend, ClimaGPUBackend, DerechoBackend, JuliaBackend
Derecho, ClimaGPU, and CaltechHPC backends are designed to run on a specific high-performance computing cluster.
WorkerBackend uses Distributed.jl to run the forward model on workers.
# Keyword Arguments
- `experiment_dir: Directory containing experiment configurations.
## Keyword Arguments for HPC backends
- `model_interface: Path to the model interface file.
- `hpc_kwargs`: Dictionary of resource arguments, passed to the job scheduler.
- `reruns`: Number of times to retry a failed ensemble member.
- `hpc_kwargs`: Dictionary of resource arguments for HPC clusters, passed to the job scheduler.
- `verbose::Bool`: Enable verbose logging.
- Any keyword arguments for the EnsembleKalmanProcess constructor, such as `scheduler`
"""
function calibrate(
b::Type{WorkerBackend},
config::ExperimentConfig;
failure_rate = DEFAULT_FAILURE_RATE,
worker_pool = default_worker_pool(),
ekp_kwargs...,
)
(; ensemble_size, n_iterations, observations, noise, prior, output_dir) =
config
return calibrate(
b,
ensemble_size,
n_iterations,
observations,
noise,
prior,
output_dir;
failure_rate,
worker_pool,
ekp_kwargs...,
)
end

# Usage
Open julia: `julia --project=experiments/surface_fluxes_perfect_model`
```julia
using ClimaCalibrate
function calibrate(
b::Type{WorkerBackend},
ensemble_size::Int,
n_iterations::Int,
observations,
noise,
prior,
output_dir;
failure_rate = DEFAULT_FAILURE_RATE,
worker_pool = default_worker_pool(),
ekp_kwargs...,
)
eki = ekp_constructor(ensemble_size, prior, observations, noise)
return calibrate(
b,
eki,
ensemble_size,
n_iterations,
observations,
noise,
prior,
output_dir;
worker_pool,
ekp_kwargs...,
)
end

experiment_dir = joinpath(pkgdir(ClimaCalibrate), "experiments", "surface_fluxes_perfect_model")
model_interface = joinpath(experiment_dir, "model_interface.jl")
function calibrate(
b::Type{WorkerBackend},
ekp::EKP.EnsembleKalmanProcess,
ensemble_size,
n_iterations,
observations,
noise,
prior,
output_dir;
failure_rate = DEFAULT_FAILURE_RATE,
worker_pool = default_worker_pool(),
)
initialize(ekp, prior, output_dir)
for iter in 0:n_iterations
(; time) = @timed run_worker_iteration(
iter,
ensemble_size,
output_dir;
worker_pool,
failure_rate,
)
@info "Iteration $iter time: $time"
# Process results
G_ensemble = observation_map(iter)
save_G_ensemble(output_dir, iter, G_ensemble)
update_ensemble(output_dir, iter, prior)
iter_path = path_to_iteration(output_dir, iter)
end
return JLD2.load_object(
joinpath(path_to_iteration(output_dir, n_iterations), "eki_file.jld2"),
)
end

# Generate observational data and load interface
include(joinpath(experiment_dir, "generate_data.jl"))
include(joinpath(experiment_dir, "observation_map.jl"))
include(model_interface)
function calibrate(
b::Type{<:HPCBackend},
config::ExperimentConfig;
experiment_dir = project_dir(),
model_interface = abspath(
joinpath(experiment_dir, "..", "..", "model_interface.jl"),
),
verbose = false,
hpc_kwargs = Dict(),
ekp_kwargs...,
)
(; ensemble_size, n_iterations, observations, noise, prior, output_dir) =
config
return calibrate(
b,
ensemble_size,
n_iterations,
observations,
noise,
prior,
output_dir;
model_interface,
verbose,
hpc_kwargs,
ekp_kwargs...,
)
end

hpc_kwargs = kwargs(time = 3)
backend = get_backend()
eki = calibrate(backend, experiment_dir; model_interface, hpc_kwargs);
```
"""
function calibrate(
b::Type{<:HPCBackend},
experiment_dir::AbstractString;
ensemble_size::Int,
n_iterations::Int,
observations,
noise,
prior,
output_dir;
experiment_dir = project_dir(),
model_interface = abspath(
joinpath(experiment_dir, "..", "..", "model_interface.jl"),
),
verbose = false,
hpc_kwargs,
ekp_kwargs...,
)
calibrate(b, ExperimentConfig(experiment_dir); hpc_kwargs, ekp_kwargs...)
ekp = ekp_constructor(ensemble_size, prior, observations, noise)
return calibrate(
b,
ekp,
ensemble_size,
n_iterations,
observations,
noise,
prior,
output_dir;
experiment_dir,
model_interface,
verbose,
hpc_kwargs,
ekp_kwargs...,
)
end

function calibrate(
b::Type{<:HPCBackend},
config::ExperimentConfig;
experiment_dir = dirname(Base.active_project()),
ekp::EKP.EnsembleKalmanProcess,
ensemble_size,
n_iterations,
observations,
noise,
prior,
output_dir;
experiment_dir = project_dir(),
model_interface = abspath(
joinpath(experiment_dir, "..", "..", "model_interface.jl"),
),
verbose = false,
reruns = 1,
ekp = nothing,
hpc_kwargs,
ekp_kwargs...,
)
(; n_iterations, output_dir, prior, ensemble_size) = config
@info "Initializing calibration" n_iterations ensemble_size output_dir

ekp = if ekp isa EKP.EnsembleKalmanProcess
initialize(ekp, prior, output_dir)
else
initialize(config; ekp_kwargs...)
end
initialize(ekp, prior, output_dir)
module_load_str = module_load_string(b)
for i in 0:(n_iterations - 1)
@info "Iteration $i"
Expand All @@ -206,12 +341,12 @@ function calibrate(
module_load_str;
hpc_kwargs,
verbose,
reruns,
reruns = 0,
)
@info "Completed iteration $i, updating ensemble"
G_ensemble = observation_map(i)
save_G_ensemble(config, i, G_ensemble)
terminate = update_ensemble(config, i)
save_G_ensemble(output_dir, i, G_ensemble)
terminate = update_ensemble(output_dir, i, prior)
!isnothing(terminate) && break
iter_path = path_to_iteration(output_dir, i + 1)
ekp = JLD2.load_object(joinpath(iter_path, "eki_file.jld2"))
Expand All @@ -223,7 +358,7 @@ end
# Scheduler interfaces should not depend on backend struct
"""
model_run(backend, iter, member, output_dir, experiment_dir; model_interface, verbose, hpc_kwargs)
Construct and execute a command to run a single forward model on a given job scheduler.
Dispatches on `backend` to run [`slurm_model_run`](@ref) or [`pbs_model_run`](@ref).
Expand All @@ -232,8 +367,8 @@ Arguments:
- iter: Iteration number
- member: Member number
- output_dir: Calibration experiment output directory
- experiment_dir: Directory containing the experiment's Project.toml
- model_interface: File containing the model interface
- project_dir: Directory containing the experiment's Project.toml
- model_interface: Model interface file
- module_load_str: Commands which load the necessary modules
- hpc_kwargs: Dictionary containing the resources for the job. Easily generated using [`kwargs`](@ref).
"""
Expand All @@ -242,15 +377,15 @@ model_run(
iter,
member,
output_dir,
experiment_dir,
project_dir,
model_interface,
module_load_str;
hpc_kwargs,
) = slurm_model_run(
iter,
member,
output_dir,
experiment_dir,
project_dir,
model_interface,
module_load_str;
hpc_kwargs,
Expand All @@ -260,15 +395,15 @@ model_run(
iter,
member,
output_dir,
experiment_dir,
project_dir,
model_interface,
module_load_str;
hpc_kwargs,
) = pbs_model_run(
iter,
member,
output_dir,
experiment_dir,
project_dir,
model_interface,
module_load_str;
hpc_kwargs,
Expand Down
Loading

0 comments on commit c3f6707

Please sign in to comment.