Skip to content

Commit

Permalink
Comments and multithreading
Browse files Browse the repository at this point in the history
  • Loading branch information
nefrathenrici committed Mar 14, 2024
1 parent 26b80c7 commit 1f0a02f
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 16 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ jobs:
arch: ${{ matrix.arch }}
- uses: julia-actions/julia-buildpkg@v1
- name: Surface Fluxes Perfect Model Test
run:
run: |
julia --project=experiments/surface_fluxes_perfect_model -e 'using Pkg; Pkg.instantiate(;verbose=true)'
- run: |
julia --project=experiments/surface_fluxes_perfect_model test/test_sf.jl
julia -t 10 --project=experiments/surface_fluxes_perfect_model test/test_sf.jl
- uses: julia-actions/julia-runtest@v1
- uses: julia-actions/julia-processcoverage@v1
- uses: codecov/codecov-action@v3
Expand Down
2 changes: 1 addition & 1 deletion experiments/pipeline.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ using Pkg
experiment_id = ARGS[1]
Pkg.activate("experiments/$experiment_id")
Pkg.instantiate()
import CalibrateAtmos
using CalibrateAtmos
pkg_dir = pkgdir(CalibrateAtmos)
experiment_path = "$pkg_dir/experiments/$experiment_id"
include("$experiment_path/model_interface.jl")
Expand Down
14 changes: 12 additions & 2 deletions experiments/surface_fluxes_perfect_model/model_interface.jl
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,18 @@ end
Runs the model with the given an AbstractDict object.
"""

function run_forward_model(::SurfaceFluxModel, config::AbstractDict)
x_inputs = load_profiles(config["x_data_file"])
function run_forward_model(
::SurfaceFluxModel,
config::AbstractDict;
lk = nothing,
)
x_inputs = if isnothing(lk)
load_profiles(config["x_data_file"])
else
lock(lk) do
load_profiles(config["x_data_file"])
end
end
FT = typeof(x_inputs.profiles_int[1].T)
obtain_ustar(FT, x_inputs, config)
end
9 changes: 5 additions & 4 deletions src/ekp_interface.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ using Distributions
import EnsembleKalmanProcesses as EKP
using EnsembleKalmanProcesses.ParameterDistributions
using EnsembleKalmanProcesses.TOMLInterface
import ClimaComms

"""
path_to_iteration(output_dir, iteration)
Expand Down Expand Up @@ -152,7 +153,7 @@ include(joinpath(experiment_path, "generate_truth.jl"))
eki = CalibrateAtmos.calibrate(experiment_id)
```
"""
function calibrate(experiment_id)
function calibrate(experiment_id; device = ClimaComms.device())
ekp_config =
YAML.load_file(joinpath("experiments", experiment_id, "ekp_config.yml"))
# initialize the CalibrateAtmos
Expand All @@ -165,11 +166,11 @@ function calibrate(experiment_id)
for i in 0:(N_iter - 1)
# run G model to produce output from N_mem ensemble members
physical_model = get_forward_model(Val(Symbol(experiment_id)))

for m in 1:N_mem # TODO: parallelize with pmap!
lk = ReentrantLock()
ClimaComms.@threaded device for m in 1:N_mem
# model run for each ensemble member
model_config = get_config(physical_model, m, i, experiment_id)
run_forward_model(physical_model, model_config)
run_forward_model(physical_model, model_config; lk)
@info "Finished model run for member $m at iteration $i"
end

Expand Down
7 changes: 0 additions & 7 deletions test/ekp.jl → test/test_emulate_sample.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,6 @@ using EnsembleKalmanProcesses.TOMLInterface

import CalibrateAtmos as CAL

@testset "Test `calibrate`" begin
dir = pwd()
cd(pkgdir(CAL))
@test_throws ErrorException CAL.calibrate("surface_fluxes_perfect_model")
cd(dir)
end

@testset "Emulate and Sample tests" begin
y_obs = [261.5493]
y_noise_cov = [0.02619;;]
Expand Down
8 changes: 8 additions & 0 deletions test/test_model_interface.jl
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,11 @@ end
"observation_map not implemented for experiment Val{:test}() at iteration 1",
) CalibrateAtmos.observation_map(Val(:test), 1)
end

# This test depends on `surface_fluxes_perfect_model` in the `experiments/` folder
@testset "Test `calibrate`" begin
dir = pwd()
cd(pkgdir(CalibrateAtmos))
@test_throws ErrorException CAL.calibrate("surface_fluxes_perfect_model")
cd(dir)
end

0 comments on commit 1f0a02f

Please sign in to comment.