From 99db662d3d3bf67724835246b6190992e140128f Mon Sep 17 00:00:00 2001 From: nefrathenrici Date: Mon, 9 Dec 2024 16:11:10 -0800 Subject: [PATCH] Working decently, need to clean, test, document --- .../generate_data.jl | 4 +- .../model_config.yml | 3 - .../model_interface.jl | 62 ++++--------------- src/model_interface.jl | 4 +- src/slurm_workers.jl | 44 ++++++------- test/slurm_workers.jl | 48 +++++++++----- 6 files changed, 68 insertions(+), 97 deletions(-) delete mode 100644 experiments/surface_fluxes_perfect_model/model_config.yml diff --git a/experiments/surface_fluxes_perfect_model/generate_data.jl b/experiments/surface_fluxes_perfect_model/generate_data.jl index 4b7ccc17..42cb85ef 100644 --- a/experiments/surface_fluxes_perfect_model/generate_data.jl +++ b/experiments/surface_fluxes_perfect_model/generate_data.jl @@ -4,7 +4,6 @@ import SurfaceFluxes as SF import SurfaceFluxes.Parameters as SFPP import SurfaceFluxes.UniversalFunctions as UF import Thermodynamics as TD -using YAML import SurfaceFluxes.Parameters: SurfaceFluxesParameters using ClimaCalibrate @@ -81,7 +80,8 @@ Generate synthetic observed y from the model truth. """ function synthetic_observed_y(x_inputs; data_path = "data", apply_noise = false) FT = typeof(x_inputs.profiles_int[1].T) - config = YAML.load_file("$experiment_path/model_config.yml") + config = Dict() + config["toml"] = [] config["output_dir"] = data_path y = obtain_ustar(FT, x_inputs, config, return_ustar = true) if apply_noise diff --git a/experiments/surface_fluxes_perfect_model/model_config.yml b/experiments/surface_fluxes_perfect_model/model_config.yml deleted file mode 100644 index 23b8f1e7..00000000 --- a/experiments/surface_fluxes_perfect_model/model_config.yml +++ /dev/null @@ -1,3 +0,0 @@ -output_dir: output/surface_fluxes_perfect_model -x_data_file: experiments/surface_fluxes_perfect_model/data/synthetic_profile_data.jld2 -toml: [] \ No newline at end of file diff --git a/experiments/surface_fluxes_perfect_model/model_interface.jl b/experiments/surface_fluxes_perfect_model/model_interface.jl index 6c9806f1..0e7f53b7 100644 --- a/experiments/surface_fluxes_perfect_model/model_interface.jl +++ b/experiments/surface_fluxes_perfect_model/model_interface.jl @@ -1,7 +1,8 @@ import EnsembleKalmanProcesses as EKP using ClimaCalibrate -import ClimaCalibrate: set_up_forward_model, run_forward_model, ExperimentConfig -import YAML +import ClimaCalibrate: forward_model + +pkgdir_CC = pkgdir(ClimaCalibrate) """ SurfaceFluxModel @@ -29,65 +30,24 @@ We need to follow the following steps for the calibration: """ experiment_dir = joinpath( - pkgdir(ClimaCalibrate), + pkgdir_CC, "experiments", "surface_fluxes_perfect_model", ) include(joinpath(experiment_dir, "sf_model.jl")) include(joinpath(experiment_dir, "observation_map.jl")) -function set_up_forward_model(member, iteration, experiment_dir::AbstractString) - return set_up_forward_model( - member, - iteration, - ExperimentConfig(experiment_dir), - ) -end - -""" - set_up_forward_model(member, iteration, experiment_dir::AbstractString) - set_up_forward_model(member, iteration, experiment_config::ExperimentConfig) - -Returns an config dictionary object for the given member and iteration. -Given an experiment dir, it will load the ExperimentConfig -This assumes that the config dictionary has the `output_dir` key. -""" -function set_up_forward_model( - member, - iteration, - experiment_config::ExperimentConfig, -) +function forward_model(iteration, member) # Specify member path for output_dir - model_config = YAML.load_file( - joinpath( - "experiments", - "surface_fluxes_perfect_model", - "model_config.yml", - ), - ) - output_dir = (experiment_config.output_dir) + model_config = Dict() + output_dir = joinpath(pkgdir_CC, "output", "surface_fluxes_perfect_model") # Set TOML to use EKP parameter(s) member_path = EKP.TOMLInterface.path_to_ensemble_member(output_dir, iteration, member) model_config["output_dir"] = member_path - parameter_path = joinpath(member_path, "parameters.toml") - if haskey(model_config, "toml") - push!(model_config["toml"], parameter_path) - else - model_config["toml"] = [parameter_path] - end - - return model_config -end - -""" - run_forward_model(config::AbstractDict) - -Runs the model with the given an AbstractDict object. -""" - -function run_forward_model(config::AbstractDict) - x_inputs = load_profiles(config["x_data_file"]) + model_config["toml"] = [joinpath(member_path, "parameters.toml")] + x_data_file = joinpath(pkgdir_CC, "experiments", "surface_fluxes_perfect_model", "data", "synthetic_profile_data.jld2") + x_inputs = load_profiles(x_data_file) FT = typeof(x_inputs.profiles_int[1].T) - obtain_ustar(FT, x_inputs, config) + obtain_ustar(FT, x_inputs, model_config) end diff --git a/src/model_interface.jl b/src/model_interface.jl index 01e3f47d..a47a2ffd 100644 --- a/src/model_interface.jl +++ b/src/model_interface.jl @@ -4,14 +4,14 @@ import YAML export forward_model, observation_map """ - forward_model(member, iteration) + forward_model(iteration, member) Execute the forward model simulation with the given configuration. This function must be overridden by a component's model interface and should set things like the parameter path and other member-specific settings. """ -function forward_model(member, iteration) +function forward_model(iteration, member) error("forward_model not implemented") end diff --git a/src/slurm_workers.jl b/src/slurm_workers.jl index 8c0c60a2..5ce5e9be 100644 --- a/src/slurm_workers.jl +++ b/src/slurm_workers.jl @@ -95,56 +95,50 @@ function Distributed.launch(sm::SlurmManager,params::Dict, instances_arr::Array, exeflags = params[:exeflags] stdkeys = keys(Distributed.default_addprocs_params()) - slurm_params = filter(x->(!(x[1] in stdkeys) && x[1] != :job_file_loc), params) - srunargs = [] - for k in keys(slurm_params) + for (k, v) in slurm_params if length(string(k)) == 1 push!(srunargs, "-$k") - val = p[k] - if length(val) > 0 - push!(srunargs, "$(p[k])") + if length(v) > 0 + push!(srunargs, v) end else k2 = replace(string(k), "_"=>"-") - val = p[k] - if length(val) > 0 - push!(srunargs, "--$(k2)=$(p[k])") + if length(v) > 0 + push!(srunargs, "--$k2=$v)") else - push!(srunargs, "--$(k2)") + push!(srunargs, "--$k2") end end end - # Get job file location from parameter dictionary. + + # Get job file location from parameter dictionary job_file_loc = joinpath(exehome, get(params, :job_file_loc, ".")) - # Make directory if not already made. + # Make directory if not already made if !isdir(job_file_loc) mkdir(job_file_loc) end - # Check for given output file name jobname = "julia-$(getpid())" + default_template = ".$jobname-$(trunc(Int, Base.time() * 10))" - default_output(x) = "$default_template-$x.out" + default_output(x) = joinpath(job_file_loc, "$default_template-$x.out") # Set output name - has_output_name = ("-o" in srunargs) | ("--output" in srunargs) - job_output_file = if has_output_name + has_output_name = any(arg -> occursin("-o", arg) || occursin("--output", arg), srunargs) + if has_output_name # if has_output_name, ensure there is only one output arg - loc = findfirst(x-> x == "-o" || x == "--output", srunargs) - job_output = srunargs[loc+1] - # Remove output argument to reappend - filter!(x -> x != "-o" && x != "--output", srunargs) - filter!(x -> !occursin(r"^-[oe]", x), srunargs) - job_output + locs = findall(x -> startswith(x, "-o") || startswith(x, "--output"), srunargs) + length(locs) > 1 && error("Slurm Error: Multiple output files specified: $srunargs") + job_output_file = srunargs[locs[1]+1] else # Slurm interpolates %4t to the task ID padded with up to four zeros - default_output("%4t") + push!(srunargs, "-o", default_output("%4t")) end - push!(srunargs, "-o", job_output_file) + ntasks = sm.ntasks srun_cmd = `srun -J $jobname -n $ntasks -D $exehome $(srunargs) $exename $exeflags $(worker_arg())` @@ -163,7 +157,7 @@ function Distributed.launch(sm::SlurmManager,params::Dict, instances_arr::Array, slurm_spec_match::Union{RegexMatch,Nothing} = nothing worker_errors = String[] if !has_output_name - job_output_file = default_output("0000") + job_output_file = default_output(lpad(i, 4, "0")) end for retry_delay in push!(collect(retry_delays), 0) t_waited = round(Int, time() - t_start) diff --git a/test/slurm_workers.jl b/test/slurm_workers.jl index 38562a4c..9242a48d 100644 --- a/test/slurm_workers.jl +++ b/test/slurm_workers.jl @@ -13,19 +13,10 @@ import ClimaCalibrate: kwargs, ExperimentConfig, DerechoBackend +using Distributed using Test import EnsembleKalmanProcesses: get_ϕ_mean_final, get_g_mean_final -experiment_dir = dirname(Base.active_project()) -model_interface = joinpath(experiment_dir, "model_interface.jl") - -# Generate observational data and include observational map -include(joinpath(experiment_dir, "generate_data.jl")) -include(joinpath(experiment_dir, "observation_map.jl")) -include(model_interface) - -prior = get_prior(joinpath(experiment_dir, "prior.toml")) - function test_sf_calibration_output(eki, prior) @testset "End to end test using file config (surface fluxes perfect model)" begin parameter_values = get_ϕ_mean_final(prior, eki) @@ -42,12 +33,41 @@ function test_sf_calibration_output(eki, prior) end end -@everywhere +addprocs(ClimaCalibrate.SlurmManager(10); exeflags="--project=$(dirname(Base.active_project()))") + +@everywhere begin + experiment_dir = dirname(Base.active_project()) + model_interface = joinpath(experiment_dir, "model_interface.jl") + + # Generate observational data and include observational map + include(joinpath(experiment_dir, "generate_data.jl")) + include(joinpath(experiment_dir, "observation_map.jl")) + include(model_interface) + + prior = get_prior(joinpath(experiment_dir, "prior.toml")) + +end eki = worker_calibrate(config; model_interface, hpc_kwargs, verbose = true) -test_sf_calibration_output(eki, prior) -# Pure Julia calibration, this should run anywhere -eki = calibrate(JuliaBackend, experiment_dir) test_sf_calibration_output(eki, prior) include(joinpath(experiment_dir, "postprocessing.jl")) + +# Slurm Worker Unit Tests +@testset "Slurm Worker Unit Tests" begin + out_file = "my_slurm_job.out" + p = addprocs(ClimaCalibrate.SlurmManager(1); o=out_file) + @test nprocs() == 2 + @test workers() == p + @test fetch(@spawnat :any myid()) == p[1] + @test remotecall_fetch(+,p[1],1,1) == 2 + rmprocs(p) + @test nprocs() == 1 + @test workers() == [1] + + # Check output file creation + @test isfile(out_file) + rm(out_file) + + @test_throws TaskFailedException p = addprocs(ClimaCalibrate.SlurmManager(1); o=out_file, output=out_file) +end