Skip to content

Commit

Permalink
much better error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
nefrathenrici committed Apr 22, 2024
1 parent 66fd2d9 commit 3638932
Showing 1 changed file with 55 additions and 42 deletions.
97 changes: 55 additions & 42 deletions src/slurm_interface.jl
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ function slurm_calibration(;
initialize(config)

eki = nothing
for i in 0:(n_iterations - 1)
@info "Iteration $i"
tasks = asyncmap(1:ensemble_size; ntasks = ensemble_size) do j
for iter in 0:(n_iterations - 1)
@info "Iteration $iter"
procs = asyncmap(1:ensemble_size; ntasks = ensemble_size) do j
srun_model(;
output_dir,
iter = i,
member = j,
iter,
time_limit,
ntasks,
partition,
Expand All @@ -37,22 +37,58 @@ function slurm_calibration(;
verbose,
)
end
handle_ensemble_procs(procs, iter, output_dir)
@info "Completed iteration $iter, updating ensemble"
G_ensemble = observation_map(Val(Symbol(config.id)), iter)
save_G_ensemble(config, iter, G_ensemble)
eki = update_ensemble(config, iter)
end
return eki
end

ensemble_exit_codes = map(fetch, tasks)
if !any(ensemble_exit_codes)
error(
"Full ensemble for iteration $i has failed. See model logs in $(path_to_iteration(output_dir, i)) for details.",
"""
    handle_ensemble_procs(procs, iteration, output_dir)

Wait for every forward-model process in `procs` to finish and report failures.

Each process is awaited concurrently. A member whose `exitcode` is non-zero, or
whose `wait` raises, triggers `warn_on_member_error` pointing at that member's
`model_log.txt`. If the wait is interrupted, all processes are sent `kill`.

# Arguments
- `procs`: collection of process handles, one per ensemble member; the position
  in the collection is used as the member number.
- `iteration`: iteration number, used to locate logs and in messages.
- `output_dir`: root output directory passed to the path helpers.

# Throws
- `ErrorException` if no process finished with exit code 0.

Returns `nothing`. A warning listing the failed member indices is logged when
only part of the ensemble failed.
"""
function handle_ensemble_procs(procs, iteration, output_dir)
    # Outer try: lets an interrupt raised while waiting tear down the members.
    try
        asyncmap(enumerate(procs)) do (member, p)
            member_log = joinpath(
                path_to_ensemble_member(output_dir, iteration, member),
                "model_log.txt",
            )
            try
                wait(p)
                if p.exitcode != 0
                    warn_on_member_error(member, member_log)
                end
            catch
                warn_on_member_error(member, member_log)
            end
        end
    catch e
        if e isa InterruptException
            foreach(kill, procs)
        else
            # Previously swallowed silently; surface it but keep going so the
            # exit-code summary below still runs.
            @warn "Unexpected error while waiting for ensemble members" exception =
                (e, catch_backtrace())
        end
    end
    # Wait for processes to be killed
    sleep(0.25)

    # NOTE(review): only `exitcode` is inspected here; confirm whether
    # `termsignal` should also be checked for processes ended by a signal.
    exit_codes = map(p -> p.exitcode, procs)
    # `findall(!, exit_codes)` would call `!` on integers and throw a
    # MethodError; select the non-zero exit codes explicitly instead.
    failed_members = findall(!iszero, exit_codes)
    if length(failed_members) == length(exit_codes)
        error("Full ensemble for iteration $iteration has failed. See model logs in $(path_to_iteration(output_dir, iteration)) for details.")
    elseif !isempty(failed_members)
        @warn "Failed ensemble members: $failed_members"
    end
    return nothing
end

"""
    warn_on_member_error(member, member_log, verbose = false)

Log a warning that ensemble member `member` raised an error.

The warning always names `member_log` as the place to look. When `verbose` is
`true` and `member_log` exists, the log contents are appended to the warning,
with literal `\\n` character pairs converted to real newlines. If the log file
does not exist, the short message is used so that reporting a failure never
raises a new exception itself.

Returns `nothing`.
"""
function warn_on_member_error(member, member_log, verbose = false)
    # Guard with `isfile`: a member can fail before its log is ever created,
    # and `readchomp` on a missing path would throw.
    warn_str = if verbose && isfile(member_log)
        """
        Ensemble member $member raised an error. See model log at $member_log for stacktrace:
        $(replace(readchomp(member_log), "\\n" => "\n"))
        """
    else
        """
        Ensemble member $member raised an error. See model log at $member_log for stacktrace.
        """
    end
    @warn warn_str
    return nothing
end

function srun_model(;
Expand Down Expand Up @@ -89,32 +125,9 @@ function srun_model(;
experiment_config = CAL.ExperimentConfig(experiment_dir)
experiment_id = experiment_config.id
physical_model = CAL.get_forward_model(Val(Symbol(experiment_id)))
model_config = CAL.get_config(physical_model, member, iteration, experiment_dir)
CAL.run_forward_model(physical_model, model_config)
CAL.run_forward_model(physical_model, CAL.get_config(physical_model, member, iteration, experiment_dir))
@info \"Forward Model Run Completed\" experiment_id physical_model iteration member"
`
try
@info "Running ensemble member $member"
# verbose &&
# member == 1 &&
# println(
# "Ensemble member $member command: \n" *
# join(fwd_model_cmd.exec, " ") *
# "\n",
# )
run(pipeline(fwd_model_cmd; stdout = member_log, stderr = member_log))
return true
catch e
if verbose
@warn """
Ensemble member $member raised an error. See model log for failed run:
$(replace(readchomp(member_log), "\\n" => "\n"))
"""
else
@warn """
Ensemble member $member raised an error. See $member_log for output log.
"""
end
return false
end
`
@info "Running ensemble member $member"
return open(pipeline(fwd_model_cmd; stdout = member_log, stderr = member_log))
end

0 comments on commit 3638932

Please sign in to comment.