Skip to content

Commit

Permalink
Working with ClimaAtmos, SF calibration is broken
Browse files Browse the repository at this point in the history
  • Loading branch information
nefrathenrici committed Dec 10, 2024
1 parent 99db662 commit 884b815
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 47 deletions.
6 changes: 2 additions & 4 deletions experiments/surface_fluxes_perfect_model/generate_data.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ import SurfaceFluxes.Parameters: SurfaceFluxesParameters
using ClimaCalibrate

pkg_dir = pkgdir(ClimaCalibrate)
experiment_path =
joinpath(pkg_dir, "experiments", "surface_fluxes_perfect_model")
experiment_path = dirname(Base.active_project())
data_path = joinpath(experiment_path, "data")
include(joinpath(experiment_path, "model_interface.jl"))

Expand Down Expand Up @@ -125,5 +124,4 @@ JLD2.save_object(
ustar =
JLD2.load_object(joinpath(data_path, "synthetic_ustar_array_noisy.jld2"))
(; observation, variance) = process_member_data(ustar; output_variance = true)
JLD2.save_object(joinpath(data_path, "obs_mean.jld2"), observation)
JLD2.save_object(joinpath(data_path, "obs_noise_cov.jld2"), variance)
nothing
18 changes: 9 additions & 9 deletions experiments/surface_fluxes_perfect_model/postprocessing.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,15 @@ using Statistics
using ClimaCalibrate

experiment_dir = dirname(Base.active_project())
experiment_config = ClimaCalibrate.ExperimentConfig(experiment_dir)
output_dir = experiment_config.output_dir
N_iter = experiment_config.n_iterations
N_mem = experiment_config.ensemble_size
N_iter = n_iterations
N_mem = ensemble_size

function convergence_plot(
eki,
prior,
theta_star_vec,
param_names,
output_dir = experiment_config.output_dir,
output_dir = output_dir,
)

# per parameter
Expand Down Expand Up @@ -102,15 +100,13 @@ end


pkg_dir = pkgdir(ClimaCalibrate)
model_config = YAML.load_file(joinpath(experiment_dir, "model_config.yml"))

eki_path = joinpath(
ClimaCalibrate.path_to_iteration(output_dir, N_iter),
"eki_file.jld2",
);
eki = JLD2.load_object(eki_path);
EKP.get_u(eki)
prior = experiment_config.prior

theta_star_vec =
(; coefficient_a_m_businger = 4.7, coefficient_a_h_businger = 4.7)
Expand All @@ -130,11 +126,15 @@ include(joinpath(experiment_dir, "model_interface.jl"))
f = Makie.Figure()
ax = Makie.Axis(f[1, 1], xlabel = "Iteration", ylabel = "Model Ustar")
ustar_obs = JLD2.load_object(
joinpath(pkg_dir, "$experiment_dir/data/synthetic_ustar_array_noisy.jld2"),
joinpath(experiment_dir, "data", "synthetic_ustar_array_noisy.jld2"),
)
x_inputs = load_profiles(model_config["x_data_file"])

x_data_file = joinpath(pkgdir_CC, "experiments", "surface_fluxes_perfect_model", "data", "synthetic_profile_data.jld2")
x_inputs = load_profiles(x_data_file)

ustar_mod = 0
model_config = Dict()
model_config["output_dir"] = output_dir
for iter in 0:N_iter
for i in 1:N_mem
model_config["toml"] = [
Expand Down
2 changes: 2 additions & 0 deletions src/ClimaCalibrate.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
module ClimaCalibrate

"""
    project_dir()

Return the directory that contains the currently active project file,
i.e. the parent directory of `Base.active_project()`.
"""
function project_dir()
    active_project_file = Base.active_project()
    return dirname(active_project_file)
end

include("ekp_interface.jl")
include("model_interface.jl")
include("slurm.jl")
Expand Down
6 changes: 3 additions & 3 deletions src/ekp_interface.jl
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,15 @@ end
"""
path_to_ensemble_member(output_dir, iteration, member)
Constructs the path to an ensemble member's directory for a given iteration and member number.
Return the path to an ensemble member's directory for a given iteration and member number.
"""
function path_to_ensemble_member(output_dir, iteration, member)
    # Thin wrapper: forward all three arguments unchanged to the
    # implementation in `EKP.TOMLInterface`.
    return EKP.TOMLInterface.path_to_ensemble_member(output_dir, iteration, member)
end

"""
path_to_model_log(output_dir, iteration, member)
Constructs the path to an ensemble member's forward model log for a given iteration and member number.
Return the path to an ensemble member's forward model log for a given iteration and member number.
"""
path_to_model_log(output_dir, iteration, member) = joinpath(
path_to_ensemble_member(output_dir, iteration, member),
Expand All @@ -100,7 +100,7 @@ path_to_model_log(output_dir, iteration, member) = joinpath(
"""
path_to_iteration(output_dir, iteration)
Creates the path to the directory for a specific iteration within the specified output directory.
Return the path to the directory for a given iteration within the specified output directory.
"""
function path_to_iteration(output_dir, iteration)
    # Left-pad the iteration number with zeros to a minimum width of 3,
    # e.g. 7 -> "007"; wider numbers are kept as-is.
    iteration_tag = lpad(iteration, 3, "0")
    # Directory name has the form "iteration_<tag>" inside `output_dir`.
    return joinpath(output_dir, string("iteration", "_", iteration_tag))
end
Expand Down
29 changes: 17 additions & 12 deletions src/slurm_workers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ using Distributed
import EnsembleKalmanProcesses as EKP
export worker_calibrate, add_slurm_workers

function run_iteration(iter, ensemble_size, output_dir; worker_pool = default_worker_pool(), failure_rate = 0.5)
function run_iteration(iter, ensemble_size, output_dir; worker_pool, failure_rate)
# Create a channel to collect results
results = Channel{Any}(ensemble_size)
nfailures = 0
Expand All @@ -12,9 +12,9 @@ function run_iteration(iter, ensemble_size, output_dir; worker_pool = default_wo
worker = take!(worker_pool)
@info "Running particle $m on worker $worker"
try
remotecall( forward_model, worker, m, iter)
remotecall_wait(forward_model, worker, iter, m)
catch e
@error "Error running member $m" exception = e
@warn "Error running member $m" exception = e
nfailures += 1
finally
# Always return worker to pool
Expand Down Expand Up @@ -45,15 +45,15 @@ function worker_calibrate(ensemble_size, n_iterations, observations, noise, prio
ekp_kwargs...,
)
for iter in 0:(n_iterations)
(; time) = @timed run_iteration(iter, config; worker_pool, failure_rate)
(; time) = @timed run_iteration(iter, ensemble_size, output_dir; worker_pool, failure_rate)
@info "Iteration $iter time: $time"
# Process results
G_ensemble = observation_map(iter)
save_G_ensemble(output_dir, iter, G_ensemble)
update_ensemble(output_dir, iter, prior)
iter_path = path_to_iteration(output_dir, iter)
end
return JLD2.load_object(joinpath(path_to_iteration(output_dir, n_iterations)), "eki_file.jld2")
return JLD2.load_object(joinpath(path_to_iteration(output_dir, n_iterations), "eki_file.jld2"))
end

function worker_calibrate(ekp::EKP.EnsembleKalmanProcess, ensemble_size,n_iterations, observations, noise, prior, output_dir; failure_rate = 0.5, worker_pool = default_worker_pool(), ekp_kwargs...)
Expand Down Expand Up @@ -86,13 +86,17 @@ function Distributed.manage(manager::SlurmManager, id::Integer, config::WorkerCo
# This function needs to exist, but so far we don't do anything
end

# Main SlurmManager function, mostly copied from the unmaintained ClusterManagers.jl
# Original code: https://github.com/JuliaParallel/ClusterManagers.jl
function Distributed.launch(sm::SlurmManager,params::Dict, instances_arr::Array,
c::Condition)
default_params = Distributed.default_addprocs_params()
params = merge(default_params, Dict{Symbol, Any}(params))
exehome = params[:dir]
exename = params[:exename]
exeflags = params[:exeflags]

exeflags = exeflags == `` ? "--project=$(project_dir())" : exeflags

stdkeys = keys(Distributed.default_addprocs_params())
slurm_params = filter(x->(!(x[1] in stdkeys) && x[1] != :job_file_loc), params)
Expand Down Expand Up @@ -144,7 +148,8 @@ function Distributed.launch(sm::SlurmManager,params::Dict, instances_arr::Array,

@info "Starting SLURM job $jobname: $srun_cmd"
srun_proc = open(srun_cmd)

# This Regex will match the worker's socket and IP address
# Example: julia_worker:9015#169.254.3.1
slurm_spec_regex = r"([\w]+):([\d]+)#(\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3})"
could_not_connect_regex = r"could not connect"
exiting_regex = r"exiting."
Expand All @@ -154,7 +159,7 @@ function Distributed.launch(sm::SlurmManager,params::Dict, instances_arr::Array,
t_waited = round(Int, time() - t_start)
retry_delays = ExponentialBackOff(10, 1.0, 512.0, 2.0, 0.1)
for i in 0:ntasks - 1
slurm_spec_match::Union{RegexMatch,Nothing} = nothing
slurm_spec_match = nothing
worker_errors = String[]
if !has_output_name
job_output_file = default_output(lpad(i, 4, "0"))
Expand All @@ -163,16 +168,16 @@ function Distributed.launch(sm::SlurmManager,params::Dict, instances_arr::Array,
t_waited = round(Int, time() - t_start)

# Wait for output log to be created and populated, then parse

if isfile(job_output_file)
if filesize(job_output_file) > 0
open(job_output_file) do f
# Due to error and warning messages, the specification
# may not appear on the file's first line
# Due to error and warning messages, we need to check
# for a regex match on each line
for line in eachline(f)
re_match = match(slurm_spec_regex, line)
if !isnothing(re_match)
slurm_spec_match = re_match
break # We have found the match
end
for expr in [could_not_connect_regex, exiting_regex]
if !isnothing(match(expr, line))
Expand All @@ -197,9 +202,9 @@ function Distributed.launch(sm::SlurmManager,params::Dict, instances_arr::Array,
end

if !isempty(worker_errors)
throw(SlurmException("Worker $i failed after $t_waited s: $(join(worker_errors, " "))"))
throw(ErrorException("Worker $i failed after $t_waited s: $(join(worker_errors, " "))"))
elseif isnothing(slurm_spec_match)
throw(SlurmException("Timeout after $t_waited s while waiting for worker $i to get ready."))
throw(ErrorException("Timeout after $t_waited s while waiting for worker $i to get ready."))
end

config = WorkerConfig()
Expand Down
36 changes: 17 additions & 19 deletions test/slurm_workers.jl
Original file line number Diff line number Diff line change
@@ -1,18 +1,7 @@
# Tests for SurfaceFluxes example calibration on HPC, used in buildkite testing
# To run from the repository root: julia --project=experiments/surface_fluxes_perfect_model test/slurm_workers.jl

using Pkg
Pkg.instantiate(; verbose = true)

import ClimaCalibrate:
get_backend,
HPCBackend,
JuliaBackend,
calibrate,
get_prior,
kwargs,
ExperimentConfig,
DerechoBackend
using ClimaCalibrate
using Distributed
using Test
import EnsembleKalmanProcesses: get_ϕ_mean_final, get_g_mean_final
Expand All @@ -33,21 +22,30 @@ function test_sf_calibration_output(eki, prior)
end
end

experiment_dir = dirname(Base.active_project())
addprocs(ClimaCalibrate.SlurmManager(10); exeflags="--project=$(dirname(Base.active_project()))")
include(joinpath(experiment_dir, "generate_data.jl"))

@everywhere begin
using ClimaCalibrate
experiment_dir = dirname(Base.active_project())
model_interface = joinpath(experiment_dir, "model_interface.jl")
output_dir = joinpath("output", "surface_fluxes_perfect_model")
prior = get_prior(joinpath(experiment_dir, "prior.toml"))
ensemble_size = 10
n_iterations = 6
end

# Generate observational data and include observational map
include(joinpath(experiment_dir, "generate_data.jl"))
@everywhere begin
include(joinpath(experiment_dir, "observation_map.jl"))
include(model_interface)

prior = get_prior(joinpath(experiment_dir, "prior.toml"))
ustar =
JLD2.load_object(joinpath(experiment_dir, "data", "synthetic_ustar_array_noisy.jld2"))
(; observation, variance) = process_member_data(ustar; output_variance = true)

model_interface = joinpath(experiment_dir, "model_interface.jl")
include(model_interface)
end
eki = worker_calibrate(config; model_interface, hpc_kwargs, verbose = true)

eki = worker_calibrate(ensemble_size, n_iterations, observation, variance, prior, output_dir)

test_sf_calibration_output(eki, prior)

Expand Down

0 comments on commit 884b815

Please sign in to comment.