From 36389326383b6cd3277c8525ab33004cf17fc186 Mon Sep 17 00:00:00 2001
From: nefrathenrici
Date: Mon, 22 Apr 2024 12:21:08 -0700
Subject: [PATCH] much better error handling

---
Note: handle_ensemble_procs collects integer exit codes (p.exitcode) and
compares them against 0. The "Failed ensemble members" warning therefore
selects failures with `!=(0)`; `!` has no method for integers, so
`findall(!, exit_codes)` would throw a MethodError instead of warning.

 src/slurm_interface.jl | 97 ++++++++++++++++++++++++------------------
 1 file changed, 55 insertions(+), 42 deletions(-)

diff --git a/src/slurm_interface.jl b/src/slurm_interface.jl
index 2cc0f5e1..0552b408 100644
--- a/src/slurm_interface.jl
+++ b/src/slurm_interface.jl
@@ -20,13 +20,13 @@ function slurm_calibration(;
     initialize(config)
 
     eki = nothing
-    for i in 0:(n_iterations - 1)
-        @info "Iteration $i"
-        tasks = asyncmap(1:ensemble_size; ntasks = ensemble_size) do j
+    for iter in 0:(n_iterations - 1)
+        @info "Iteration $iter"
+        procs = asyncmap(1:ensemble_size; ntasks = ensemble_size) do j
             srun_model(;
                 output_dir,
-                iter = i,
                 member = j,
+                iter,
                 time_limit,
                 ntasks,
                 partition,
@@ -37,22 +37,58 @@ function slurm_calibration(;
                 verbose,
             )
         end
+        handle_ensemble_procs(procs, iter, output_dir)
+        @info "Completed iteration $iter, updating ensemble"
+        G_ensemble = observation_map(Val(Symbol(config.id)), iter)
+        save_G_ensemble(config, iter, G_ensemble)
+        eki = update_ensemble(config, iter)
+    end
+    return eki
+end
 
-        ensemble_exit_codes = map(fetch, tasks)
-        if !any(ensemble_exit_codes)
-            error(
-                "Full ensemble for iteration $i has failed. See model logs in $(path_to_iteration(output_dir, i)) for details.",
+function handle_ensemble_procs(procs, iteration, output_dir)
+    # Initial try handles InterruptException
+    try
+        asyncmap(enumerate(procs)) do (member, p)
+            member_log = joinpath(
+                path_to_ensemble_member(output_dir, iteration, member),
+                "model_log.txt",
             )
-        elseif !all(ensemble_exit_codes)
-            @warn "Failed ensemble members: $(findall(!, ensemble_exit_codes))"
+            try
+                wait(p)
+                if p.exitcode != 0
+                    warn_on_member_error(member, member_log)
+                end
+            catch e
+                warn_on_member_error(member, member_log)
+            end
         end
+    catch e
+        e isa InterruptException && map(p -> kill(p), procs)
+    end
+    # Wait for processes to be killed
+    sleep(0.25)
 
-        @info "Completed iteration $i, updating ensemble"
-        G_ensemble = observation_map(Val(Symbol(config.id)), i)
-        save_G_ensemble(config, i, G_ensemble)
-        eki = update_ensemble(config, i)
+    exit_codes = map(x -> getproperty(x, :exitcode), procs)
+    if !any(x -> x == 0, exit_codes)
+        error("Full ensemble for iteration $iteration has failed. See model logs in $(path_to_iteration(output_dir, iteration)) for details.")
+    elseif !all(x -> x == 0, exit_codes)
+        @warn "Failed ensemble members: $(findall(!=(0), exit_codes))"
     end
-    return eki
+end
+
+function warn_on_member_error(member, member_log, verbose = false)
+    warn_str = if verbose
+        """
+        Ensemble member $member raised an error. See model log at $member_log for stacktrace:
+        $(replace(readchomp(member_log), "\\n" => "\n"))
+        """
+    else
+        """
+        Ensemble member $member raised an error. See model log at $member_log for stacktrace.
+        """
+    end
+    @warn warn_str
 end
 
 function srun_model(;
@@ -89,32 +125,9 @@ function srun_model(;
         experiment_config = CAL.ExperimentConfig(experiment_dir)
         experiment_id = experiment_config.id
         physical_model = CAL.get_forward_model(Val(Symbol(experiment_id)))
-        model_config = CAL.get_config(physical_model, member, iteration, experiment_dir)
-        CAL.run_forward_model(physical_model, model_config)
+        CAL.run_forward_model(physical_model, CAL.get_config(physical_model, member, iteration, experiment_dir))
         @info \"Forward Model Run Completed\" experiment_id physical_model iteration member"
-    `
-    try
-        @info "Running ensemble member $member"
-        # verbose &&
-        #     member == 1 &&
-        #     println(
-        #         "Ensemble member $member command: \n" *
-        #         join(fwd_model_cmd.exec, " ") *
-        #         "\n",
-        #     )
-        run(pipeline(fwd_model_cmd; stdout = member_log, stderr = member_log))
-        return true
-    catch e
-        if verbose
-            @warn """
-            Ensemble member $member raised an error. See model log for failed run:
-            $(replace(readchomp(member_log), "\\n" => "\n"))
-            """
-        else
-            @warn """
-            Ensemble member $member raised an error. See $member_log for output log.
-            """
-        end
-        return false
-    end
+    `
+    @info "Running ensemble member $member"
+    return open(pipeline(fwd_model_cmd; stdout = member_log, stderr = member_log))
 end