diff --git a/experiments/surface_fluxes_perfect_model/generate_data.jl b/experiments/surface_fluxes_perfect_model/generate_data.jl index 42cb85ef..a35659ac 100644 --- a/experiments/surface_fluxes_perfect_model/generate_data.jl +++ b/experiments/surface_fluxes_perfect_model/generate_data.jl @@ -8,8 +8,7 @@ import SurfaceFluxes.Parameters: SurfaceFluxesParameters using ClimaCalibrate pkg_dir = pkgdir(ClimaCalibrate) -experiment_path = - joinpath(pkg_dir, "experiments", "surface_fluxes_perfect_model") +experiment_path = dirname(Base.active_project()) data_path = joinpath(experiment_path, "data") include(joinpath(experiment_path, "model_interface.jl")) @@ -125,5 +124,4 @@ JLD2.save_object( ustar = JLD2.load_object(joinpath(data_path, "synthetic_ustar_array_noisy.jld2")) (; observation, variance) = process_member_data(ustar; output_variance = true) -JLD2.save_object(joinpath(data_path, "obs_mean.jld2"), observation) -JLD2.save_object(joinpath(data_path, "obs_noise_cov.jld2"), variance) +nothing diff --git a/experiments/surface_fluxes_perfect_model/postprocessing.jl b/experiments/surface_fluxes_perfect_model/postprocessing.jl index d5b0ba19..2a9fbcc1 100644 --- a/experiments/surface_fluxes_perfect_model/postprocessing.jl +++ b/experiments/surface_fluxes_perfect_model/postprocessing.jl @@ -12,17 +12,15 @@ using Statistics using ClimaCalibrate experiment_dir = dirname(Base.active_project()) -experiment_config = ClimaCalibrate.ExperimentConfig(experiment_dir) -output_dir = experiment_config.output_dir -N_iter = experiment_config.n_iterations -N_mem = experiment_config.ensemble_size +N_iter = n_iterations +N_mem = ensemble_size function convergence_plot( eki, prior, theta_star_vec, param_names, - output_dir = experiment_config.output_dir, + output_dir = output_dir, ) # per parameter @@ -102,7 +100,6 @@ end pkg_dir = pkgdir(ClimaCalibrate) -model_config = YAML.load_file(joinpath(experiment_dir, "model_config.yml")) eki_path = joinpath( ClimaCalibrate.path_to_iteration(output_dir, N_iter), @@ -110,7 +107,6 @@ eki_path = joinpath( ); eki = JLD2.load_object(eki_path); EKP.get_u(eki) -prior = experiment_config.prior theta_star_vec = (; coefficient_a_m_businger = 4.7, coefficient_a_h_businger = 4.7) @@ -130,11 +126,15 @@ include(joinpath(experiment_dir, "model_interface.jl")) f = Makie.Figure() ax = Makie.Axis(f[1, 1], xlabel = "Iteration", ylabel = "Model Ustar") ustar_obs = JLD2.load_object( - joinpath(pkg_dir, "$experiment_dir/data/synthetic_ustar_array_noisy.jld2"), + joinpath(experiment_dir, "data", "synthetic_ustar_array_noisy.jld2"), ) -x_inputs = load_profiles(model_config["x_data_file"]) + +x_data_file = joinpath(pkgdir_CC, "experiments", "surface_fluxes_perfect_model", "data", "synthetic_profile_data.jld2") +x_inputs = load_profiles(x_data_file) ustar_mod = 0 +model_config = Dict() +model_config["output_dir"] = output_dir for iter in 0:N_iter for i in 1:N_mem model_config["toml"] = [ diff --git a/src/ClimaCalibrate.jl b/src/ClimaCalibrate.jl index 6ef631c9..bff9697a 100644 --- a/src/ClimaCalibrate.jl +++ b/src/ClimaCalibrate.jl @@ -1,5 +1,7 @@ module ClimaCalibrate +project_dir() = dirname(Base.active_project()) + include("ekp_interface.jl") include("model_interface.jl") include("slurm.jl") diff --git a/src/ekp_interface.jl b/src/ekp_interface.jl index 6a99016f..1bcbf7df 100644 --- a/src/ekp_interface.jl +++ b/src/ekp_interface.jl @@ -82,7 +82,7 @@ end """ path_to_ensemble_member(output_dir, iteration, member) -Constructs the path to an ensemble member's directory for a given iteration and member number. +Return the path to an ensemble member's directory for a given iteration and member number. """ path_to_ensemble_member(output_dir, iteration, member) = EKP.TOMLInterface.path_to_ensemble_member(output_dir, iteration, member) @@ -90,7 +90,7 @@ path_to_ensemble_member(output_dir, iteration, member) = """ path_to_model_log(output_dir, iteration, member) -Constructs the path to an ensemble member's forward model log for a given iteration and member number. +Return the path to an ensemble member's forward model log for a given iteration and member number. """ path_to_model_log(output_dir, iteration, member) = joinpath( path_to_ensemble_member(output_dir, iteration, member), @@ -100,7 +100,7 @@ path_to_model_log(output_dir, iteration, member) = joinpath( """ path_to_iteration(output_dir, iteration) -Creates the path to the directory for a specific iteration within the specified output directory. +Return the path to the directory for a given iteration within the specified output directory. """ path_to_iteration(output_dir, iteration) = joinpath(output_dir, join(["iteration", lpad(iteration, 3, "0")], "_")) diff --git a/src/slurm_workers.jl b/src/slurm_workers.jl index 5ce5e9be..e043586e 100644 --- a/src/slurm_workers.jl +++ b/src/slurm_workers.jl @@ -2,7 +2,7 @@ using Distributed import EnsembleKalmanProcesses as EKP export worker_calibrate, add_slurm_workers -function run_iteration(iter, ensemble_size, output_dir; worker_pool = default_worker_pool(), failure_rate = 0.5) +function run_iteration(iter, ensemble_size, output_dir; worker_pool, failure_rate) # Create a channel to collect results results = Channel{Any}(ensemble_size) nfailures = 0 @@ -12,9 +12,9 @@ function run_iteration(iter, ensemble_size, output_dir; worker_pool = default_wo worker = take!(worker_pool) @info "Running particle $m on worker $worker" try - remotecall( forward_model, worker, m, iter) + remotecall_wait(forward_model, worker, iter, m) catch e - @error "Error running member $m" exception = e + @warn "Error running member $m" exception = e nfailures += 1 finally # Always return worker to pool @@ -45,7 +45,7 @@ function worker_calibrate(ensemble_size, n_iterations, observations, noise, prio ekp_kwargs..., ) for iter in 0:(n_iterations) - (; time) = @timed run_iteration(iter, config; worker_pool, failure_rate) + (; time) = @timed run_iteration(iter, ensemble_size, output_dir; worker_pool, failure_rate) @info "Iteration $iter time: $time" # Process results G_ensemble = observation_map(iter) @@ -53,7 +53,7 @@ function worker_calibrate(ensemble_size, n_iterations, observations, noise, prio update_ensemble(output_dir, iter, prior) iter_path = path_to_iteration(output_dir, iter) end - return JLD2.load_object(joinpath(path_to_iteration(output_dir, n_iterations)), "eki_file.jld2") + return JLD2.load_object(joinpath(path_to_iteration(output_dir, n_iterations), "eki_file.jld2")) end function worker_calibrate(ekp::EKP.EnsembleKalmanProcess, ensemble_size,n_iterations, observations, noise, prior, output_dir; failure_rate = 0.5, worker_pool = default_worker_pool(), ekp_kwargs...) @@ -86,6 +86,8 @@ function Distributed.manage(manager::SlurmManager, id::Integer, config::WorkerCo # This function needs to exist, but so far we don't do anything end +# Main SlurmManager function, mostly copied from the unmaintained ClusterManagers.jl +# Original code: https://github.com/JuliaParallel/ClusterManagers.jl function Distributed.launch(sm::SlurmManager,params::Dict, instances_arr::Array, c::Condition) default_params = Distributed.default_addprocs_params() @@ -93,6 +95,8 @@ function Distributed.launch(sm::SlurmManager,params::Dict, instances_arr::Array, exehome = params[:dir] exename = params[:exename] exeflags = params[:exeflags] + + exeflags = exeflags == `` ? "--project=$(project_dir())" : exeflags stdkeys = keys(Distributed.default_addprocs_params()) slurm_params = filter(x->(!(x[1] in stdkeys) && x[1] != :job_file_loc), params) @@ -144,7 +148,8 @@ function Distributed.launch(sm::SlurmManager,params::Dict, instances_arr::Array, @info "Starting SLURM job $jobname: $srun_cmd" srun_proc = open(srun_cmd) - + # This Regex will match the worker's socket and IP address + # Example: julia_worker:9015#169.254.3.1 slurm_spec_regex = r"([\w]+):([\d]+)#(\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3})" could_not_connect_regex = r"could not connect" exiting_regex = r"exiting." @@ -154,7 +159,7 @@ function Distributed.launch(sm::SlurmManager,params::Dict, instances_arr::Array, t_waited = round(Int, time() - t_start) retry_delays = ExponentialBackOff(10, 1.0, 512.0, 2.0, 0.1) for i in 0:ntasks - 1 - slurm_spec_match::Union{RegexMatch,Nothing} = nothing + slurm_spec_match = nothing worker_errors = String[] if !has_output_name job_output_file = default_output(lpad(i, 4, "0")) @@ -163,16 +168,16 @@ function Distributed.launch(sm::SlurmManager,params::Dict, instances_arr::Array, t_waited = round(Int, time() - t_start) # Wait for output log to be created and populated, then parse - if isfile(job_output_file) if filesize(job_output_file) > 0 open(job_output_file) do f - # Due to error and warning messages, the specification - # may not appear on the file's first line + # Due to error and warning messages, we need to check + # for a regex match on each line for line in eachline(f) re_match = match(slurm_spec_regex, line) if !isnothing(re_match) slurm_spec_match = re_match + break # We have found the match end for expr in [could_not_connect_regex, exiting_regex] if !isnothing(match(expr, line)) @@ -197,9 +202,9 @@ function Distributed.launch(sm::SlurmManager,params::Dict, instances_arr::Array, end if !isempty(worker_errors) - throw(SlurmException("Worker $i failed after $t_waited s: $(join(worker_errors, " "))")) + throw(ErrorException("Worker $i failed after $t_waited s: $(join(worker_errors, " "))")) elseif isnothing(slurm_spec_match) - throw(SlurmException("Timeout after $t_waited s while waiting for worker $i to get ready.")) + throw(ErrorException("Timeout after $t_waited s while waiting for worker $i to get ready.")) end config = WorkerConfig() diff --git a/test/slurm_workers.jl b/test/slurm_workers.jl index 9242a48d..18ec57d6 100644 --- a/test/slurm_workers.jl +++ b/test/slurm_workers.jl @@ -1,18 +1,7 @@ # Tests for SurfaceFluxes example calibration on HPC, used in buildkite testing # To run, open the REPL: julia --project=experiments/surface_fluxes_perfect_model test/hpc_backend_e2e.jl -using Pkg -Pkg.instantiate(; verbose = true) - -import ClimaCalibrate: - get_backend, - HPCBackend, - JuliaBackend, - calibrate, - get_prior, - kwargs, - ExperimentConfig, - DerechoBackend +using ClimaCalibrate using Distributed using Test import EnsembleKalmanProcesses: get_ϕ_mean_final, get_g_mean_final @@ -33,21 +22,30 @@ function test_sf_calibration_output(eki, prior) end end +experiment_dir = dirname(Base.active_project()) addprocs(ClimaCalibrate.SlurmManager(10); exeflags="--project=$(dirname(Base.active_project()))") +include(joinpath(experiment_dir, "generate_data.jl")) @everywhere begin + using ClimaCalibrate experiment_dir = dirname(Base.active_project()) - model_interface = joinpath(experiment_dir, "model_interface.jl") + output_dir = joinpath("output", "surface_fluxes_perfect_model") + prior = get_prior(joinpath(experiment_dir, "prior.toml")) + ensemble_size = 10 + n_iterations = 6 +end - # Generate observational data and include observational map - include(joinpath(experiment_dir, "generate_data.jl")) +@everywhere begin include(joinpath(experiment_dir, "observation_map.jl")) - include(model_interface) - - prior = get_prior(joinpath(experiment_dir, "prior.toml")) + ustar = + JLD2.load_object(joinpath(experiment_dir, "data", "synthetic_ustar_array_noisy.jld2")) + (; observation, variance) = process_member_data(ustar; output_variance = true) + model_interface = joinpath(experiment_dir, "model_interface.jl") + include(model_interface) end -eki = worker_calibrate(config; model_interface, hpc_kwargs, verbose = true) + +eki = worker_calibrate(ensemble_size, n_iterations, observation, variance, prior, output_dir) test_sf_calibration_output(eki, prior)