From 1f0a02f098d791e5cfc56253ef4819c7f77cd108 Mon Sep 17 00:00:00 2001 From: nefrathenrici Date: Thu, 14 Mar 2024 11:57:47 -0700 Subject: [PATCH] Comments and multithreading --- .github/workflows/ci.yml | 4 ++-- experiments/pipeline.jl | 2 +- .../model_interface.jl | 14 ++++++++++++-- src/ekp_interface.jl | 9 +++++---- test/{ekp.jl => test_emulate_sample.jl} | 7 ------- test/test_model_interface.jl | 8 ++++++++ 6 files changed, 28 insertions(+), 16 deletions(-) rename test/{ekp.jl => test_emulate_sample.jl} (87%) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a49e9ef4..bca121aa 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -37,10 +37,10 @@ jobs: arch: ${{ matrix.arch }} - uses: julia-actions/julia-buildpkg@v1 - name: Surface Fluxes Perfect Model Test - run: + run: | julia --project=experiments/surface_fluxes_perfect_model -e 'using Pkg; Pkg.instantiate(;verbose=true)' - run: | - julia --project=experiments/surface_fluxes_perfect_model test/test_sf.jl + julia -t 10 --project=experiments/surface_fluxes_perfect_model test/test_sf.jl - uses: julia-actions/julia-runtest@v1 - uses: julia-actions/julia-processcoverage@v1 - uses: codecov/codecov-action@v3 diff --git a/experiments/pipeline.jl b/experiments/pipeline.jl index 07d2f430..1f7880ad 100644 --- a/experiments/pipeline.jl +++ b/experiments/pipeline.jl @@ -4,7 +4,7 @@ using Pkg experiment_id = ARGS[1] Pkg.activate("experiments/$experiment_id") Pkg.instantiate() -import CalibrateAtmos +using CalibrateAtmos pkg_dir = pkgdir(CalibrateAtmos) experiment_path = "$pkg_dir/experiments/$experiment_id" include("$experiment_path/model_interface.jl") diff --git a/experiments/surface_fluxes_perfect_model/model_interface.jl b/experiments/surface_fluxes_perfect_model/model_interface.jl index e99dfad3..37dc3d30 100644 --- a/experiments/surface_fluxes_perfect_model/model_interface.jl +++ b/experiments/surface_fluxes_perfect_model/model_interface.jl @@ -84,8 +84,18 @@ end Runs the model with the given an AbstractDict object. """ -function run_forward_model(::SurfaceFluxModel, config::AbstractDict) - x_inputs = load_profiles(config["x_data_file"]) +function run_forward_model( + ::SurfaceFluxModel, + config::AbstractDict; + lk = nothing, +) + x_inputs = if isnothing(lk) + load_profiles(config["x_data_file"]) + else + lock(lk) do + load_profiles(config["x_data_file"]) + end + end FT = typeof(x_inputs.profiles_int[1].T) obtain_ustar(FT, x_inputs, config) end diff --git a/src/ekp_interface.jl b/src/ekp_interface.jl index 747c0024..2bad8d3d 100644 --- a/src/ekp_interface.jl +++ b/src/ekp_interface.jl @@ -5,6 +5,7 @@ using Distributions import EnsembleKalmanProcesses as EKP using EnsembleKalmanProcesses.ParameterDistributions using EnsembleKalmanProcesses.TOMLInterface +import ClimaComms """ path_to_iteration(output_dir, iteration) @@ -152,7 +153,7 @@ include(joinpath(experiment_path, "generate_truth.jl")) eki = CalibrateAtmos.calibrate(experiment_id) ``` """ -function calibrate(experiment_id) +function calibrate(experiment_id; device = ClimaComms.device()) ekp_config = YAML.load_file(joinpath("experiments", experiment_id, "ekp_config.yml")) # initialize the CalibrateAtmos @@ -165,11 +166,11 @@ function calibrate(experiment_id) for i in 0:(N_iter - 1) # run G model to produce output from N_mem ensemble members physical_model = get_forward_model(Val(Symbol(experiment_id))) - - for m in 1:N_mem # TODO: parallelize with pmap! + lk = ReentrantLock() + ClimaComms.@threaded device for m in 1:N_mem # model run for each ensemble member model_config = get_config(physical_model, m, i, experiment_id) - run_forward_model(physical_model, model_config) + run_forward_model(physical_model, model_config; lk) @info "Finished model run for member $m at iteration $i" end diff --git a/test/ekp.jl b/test/test_emulate_sample.jl similarity index 87% rename from test/ekp.jl rename to test/test_emulate_sample.jl index 9215064f..00eb3b1d 100644 --- a/test/ekp.jl +++ b/test/test_emulate_sample.jl @@ -10,13 +10,6 @@ using EnsembleKalmanProcesses.TOMLInterface import CalibrateAtmos as CAL -@testset "Test `calibrate`" begin - dir = pwd() - cd(pkgdir(CAL)) - @test_throws ErrorException CAL.calibrate("surface_fluxes_perfect_model") - cd(dir) -end - @testset "Emulate and Sample tests" begin y_obs = [261.5493] y_noise_cov = [0.02619;;] diff --git a/test/test_model_interface.jl b/test/test_model_interface.jl index 3a1b063d..5fd05516 100644 --- a/test/test_model_interface.jl +++ b/test/test_model_interface.jl @@ -29,3 +29,11 @@ end "observation_map not implemented for experiment Val{:test}() at iteration 1", ) CalibrateAtmos.observation_map(Val(:test), 1) end + +# This test depends on `surface_fluxes_perfect_model` in the `experiments/` folder +@testset "Test `calibrate`" begin + dir = pwd() + cd(pkgdir(CalibrateAtmos)) + @test_throws ErrorException CAL.calibrate("surface_fluxes_perfect_model") + cd(dir) +end