Skip to content

Commit

Permalink
Working decently, need to clean, test, document
Browse files Browse the repository at this point in the history
  • Loading branch information
nefrathenrici committed Dec 10, 2024
1 parent 4836415 commit 99db662
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 97 deletions.
4 changes: 2 additions & 2 deletions experiments/surface_fluxes_perfect_model/generate_data.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import SurfaceFluxes as SF
import SurfaceFluxes.Parameters as SFPP
import SurfaceFluxes.UniversalFunctions as UF
import Thermodynamics as TD
using YAML
import SurfaceFluxes.Parameters: SurfaceFluxesParameters
using ClimaCalibrate

Expand Down Expand Up @@ -81,7 +80,8 @@ Generate synthetic observed y from the model truth.
"""
function synthetic_observed_y(x_inputs; data_path = "data", apply_noise = false)
FT = typeof(x_inputs.profiles_int[1].T)
config = YAML.load_file("$experiment_path/model_config.yml")
config = Dict()
config["toml"] = []
config["output_dir"] = data_path
y = obtain_ustar(FT, x_inputs, config, return_ustar = true)
if apply_noise
Expand Down
3 changes: 0 additions & 3 deletions experiments/surface_fluxes_perfect_model/model_config.yml

This file was deleted.

62 changes: 11 additions & 51 deletions experiments/surface_fluxes_perfect_model/model_interface.jl
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import EnsembleKalmanProcesses as EKP
using ClimaCalibrate
import ClimaCalibrate: set_up_forward_model, run_forward_model, ExperimentConfig
import YAML
import ClimaCalibrate: forward_model

pkgdir_CC = pkgdir(ClimaCalibrate)

"""
SurfaceFluxModel
Expand Down Expand Up @@ -29,65 +30,24 @@ We need to follow the following steps for the calibration:
"""

experiment_dir = joinpath(
pkgdir(ClimaCalibrate),
pkgdir_CC,
"experiments",
"surface_fluxes_perfect_model",
)
include(joinpath(experiment_dir, "sf_model.jl"))
include(joinpath(experiment_dir, "observation_map.jl"))

function set_up_forward_model(member, iteration, experiment_dir::AbstractString)
return set_up_forward_model(
member,
iteration,
ExperimentConfig(experiment_dir),
)
end

"""
set_up_forward_model(member, iteration, experiment_dir::AbstractString)
set_up_forward_model(member, iteration, experiment_config::ExperimentConfig)
Returns an config dictionary object for the given member and iteration.
Given an experiment dir, it will load the ExperimentConfig
This assumes that the config dictionary has the `output_dir` key.
"""
function set_up_forward_model(
member,
iteration,
experiment_config::ExperimentConfig,
)
function forward_model(iteration, member)
# Specify member path for output_dir
model_config = YAML.load_file(
joinpath(
"experiments",
"surface_fluxes_perfect_model",
"model_config.yml",
),
)
output_dir = (experiment_config.output_dir)
model_config = Dict()
output_dir = joinpath(pkgdir_CC, "output", "surface_fluxes_perfect_model")
# Set TOML to use EKP parameter(s)
member_path =
EKP.TOMLInterface.path_to_ensemble_member(output_dir, iteration, member)
model_config["output_dir"] = member_path
parameter_path = joinpath(member_path, "parameters.toml")
if haskey(model_config, "toml")
push!(model_config["toml"], parameter_path)
else
model_config["toml"] = [parameter_path]
end

return model_config
end

"""
run_forward_model(config::AbstractDict)
Runs the model with the given an AbstractDict object.
"""

function run_forward_model(config::AbstractDict)
x_inputs = load_profiles(config["x_data_file"])
model_config["toml"] = [joinpath(member_path, "parameters.toml")]
x_data_file = joinpath(pkgdir_CC, "experiments", "surface_fluxes_perfect_model", "data", "synthetic_profile_data.jld2")
x_inputs = load_profiles(x_data_file)
FT = typeof(x_inputs.profiles_int[1].T)
obtain_ustar(FT, x_inputs, config)
obtain_ustar(FT, x_inputs, model_config)
end
4 changes: 2 additions & 2 deletions src/model_interface.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import YAML
export forward_model, observation_map

"""
forward_model(member, iteration)
forward_model(iteration, member)
Execute the forward model simulation with the given configuration.
This function must be overridden by a component's model interface and
should set things like the parameter path and other member-specific settings.
"""
function forward_model(member, iteration)
function forward_model(iteration, member)
error("forward_model not implemented")
end

Expand Down
44 changes: 19 additions & 25 deletions src/slurm_workers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -95,56 +95,50 @@ function Distributed.launch(sm::SlurmManager,params::Dict, instances_arr::Array,
exeflags = params[:exeflags]

stdkeys = keys(Distributed.default_addprocs_params())

slurm_params = filter(x->(!(x[1] in stdkeys) && x[1] != :job_file_loc), params)

srunargs = []

for k in keys(slurm_params)
for (k, v) in slurm_params
if length(string(k)) == 1
push!(srunargs, "-$k")
val = p[k]
if length(val) > 0
push!(srunargs, "$(p[k])")
if length(v) > 0
push!(srunargs, v)
end
else
k2 = replace(string(k), "_"=>"-")
val = p[k]
if length(val) > 0
push!(srunargs, "--$(k2)=$(p[k])")
if length(v) > 0
push!(srunargs, "--$k2=$v)")
else
push!(srunargs, "--$(k2)")
push!(srunargs, "--$k2")
end
end
end
# Get job file location from parameter dictionary.

# Get job file location from parameter dictionary
job_file_loc = joinpath(exehome, get(params, :job_file_loc, "."))

# Make directory if not already made.
# Make directory if not already made
if !isdir(job_file_loc)
mkdir(job_file_loc)
end

# Check for given output file name
jobname = "julia-$(getpid())"

default_template = ".$jobname-$(trunc(Int, Base.time() * 10))"
default_output(x) = "$default_template-$x.out"
default_output(x) = joinpath(job_file_loc, "$default_template-$x.out")

# Set output name
has_output_name = ("-o" in srunargs) | ("--output" in srunargs)
job_output_file = if has_output_name
has_output_name = any(arg -> occursin("-o", arg) || occursin("--output", arg), srunargs)
if has_output_name
# if has_output_name, ensure there is only one output arg
loc = findfirst(x-> x == "-o" || x == "--output", srunargs)
job_output = srunargs[loc+1]
# Remove output argument to reappend
filter!(x -> x != "-o" && x != "--output", srunargs)
filter!(x -> !occursin(r"^-[oe]", x), srunargs)
job_output
locs = findall(x -> startswith(x, "-o") || startswith(x, "--output"), srunargs)
length(locs) > 1 && error("Slurm Error: Multiple output files specified: $srunargs")
job_output_file = srunargs[locs[1]+1]
else
# Slurm interpolates %4t to the task ID padded with up to four zeros
default_output("%4t")
push!(srunargs, "-o", default_output("%4t"))
end
push!(srunargs, "-o", job_output_file)

ntasks = sm.ntasks
srun_cmd = `srun -J $jobname -n $ntasks -D $exehome $(srunargs) $exename $exeflags $(worker_arg())`

Expand All @@ -163,7 +157,7 @@ function Distributed.launch(sm::SlurmManager,params::Dict, instances_arr::Array,
slurm_spec_match::Union{RegexMatch,Nothing} = nothing
worker_errors = String[]
if !has_output_name
job_output_file = default_output("0000")
job_output_file = default_output(lpad(i, 4, "0"))
end
for retry_delay in push!(collect(retry_delays), 0)
t_waited = round(Int, time() - t_start)
Expand Down
48 changes: 34 additions & 14 deletions test/slurm_workers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,10 @@ import ClimaCalibrate:
kwargs,
ExperimentConfig,
DerechoBackend
using Distributed
using Test
import EnsembleKalmanProcesses: get_ϕ_mean_final, get_g_mean_final

experiment_dir = dirname(Base.active_project())
model_interface = joinpath(experiment_dir, "model_interface.jl")

# Generate observational data and include observational map
include(joinpath(experiment_dir, "generate_data.jl"))
include(joinpath(experiment_dir, "observation_map.jl"))
include(model_interface)

prior = get_prior(joinpath(experiment_dir, "prior.toml"))

function test_sf_calibration_output(eki, prior)
@testset "End to end test using file config (surface fluxes perfect model)" begin
parameter_values = get_ϕ_mean_final(prior, eki)
Expand All @@ -42,12 +33,41 @@ function test_sf_calibration_output(eki, prior)
end
end

@everywhere
addprocs(ClimaCalibrate.SlurmManager(10); exeflags="--project=$(dirname(Base.active_project()))")

@everywhere begin
experiment_dir = dirname(Base.active_project())
model_interface = joinpath(experiment_dir, "model_interface.jl")

# Generate observational data and include observational map
include(joinpath(experiment_dir, "generate_data.jl"))
include(joinpath(experiment_dir, "observation_map.jl"))
include(model_interface)

prior = get_prior(joinpath(experiment_dir, "prior.toml"))

end
eki = worker_calibrate(config; model_interface, hpc_kwargs, verbose = true)
test_sf_calibration_output(eki, prior)

# Pure Julia calibration, this should run anywhere
eki = calibrate(JuliaBackend, experiment_dir)
test_sf_calibration_output(eki, prior)

include(joinpath(experiment_dir, "postprocessing.jl"))

# Slurm Worker Unit Tests
@testset "Slurm Worker Unit Tests" begin
out_file = "my_slurm_job.out"
p = addprocs(ClimaCalibrate.SlurmManager(1); o=out_file)
@test nprocs() == 2
@test workers() == p
@test fetch(@spawnat :any myid()) == p[1]
@test remotecall_fetch(+,p[1],1,1) == 2
rmprocs(p)
@test nprocs() == 1
@test workers() == [1]

# Check output file creation
@test isfile(out_file)
rm(out_file)

@test_throws TaskFailedException p = addprocs(ClimaCalibrate.SlurmManager(1); o=out_file, output=out_file)
end

0 comments on commit 99db662

Please sign in to comment.