Skip to content

Commit

Permalink
more fixes, debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
nefrathenrici committed Dec 9, 2024
1 parent 7844f9e commit 4836415
Showing 1 changed file with 36 additions and 32 deletions.
68 changes: 36 additions & 32 deletions src/slurm_workers.jl
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using Distributed
import EnsembleKalmanProcesses as EKP
export worker_calibrate, add_slurm_workers
"""
    default_worker_pool()

Return a fresh `WorkerPool` built from the process ids that `workers()` reports at
the time of the call.
"""
function default_worker_pool()
    return WorkerPool(workers())
end

function run_iteration(iter, ensemble_size, output_dir; worker_pool = default_worker_pool(), failure_rate = 0.5)
# Create a channel to collect results
Expand Down Expand Up @@ -75,35 +74,33 @@ function worker_calibrate(ekp::EKP.EnsembleKalmanProcess, ensemble_size,n_iterat
return JLD2.load_object(path_to_iteration(output_dir, n_iterations))
end

# function slurm_worker_pool(nprocs::Int; exeflags = "--project=$(Base.active_project())", slurm_kwargs...)
# return WorkerPool(addprocs(
# SlurmManager(nprocs);
# t = "01:00:00",
# cpus_per_task = 1,
# # TODO: Fix output
# output = "worker_%j_%4t.out",
# exeflags = "--project=$(Base.active_project())",
# slurm_kwargs...,
# ))
# end


"""
    worker_cookie()

Call `Distributed.init_multi()` and then return the value of `cluster_cookie()`.
"""
function worker_cookie()
    # `init_multi` is called first so that `cluster_cookie()` is queried afterwards.
    Distributed.init_multi()
    return cluster_cookie()
end
"""
    worker_arg()

Return the command fragment `--worker=<cookie>`, where `<cookie>` is the value
returned by `worker_cookie()`.
"""
function worker_arg()
    cookie = worker_cookie()
    return `--worker=$cookie`
end

launched = WorkerConfig[]
"""
    SlurmManager(ntasks)

`ClusterManager` subtype that carries the number of tasks (`ntasks`) to request.

The field is stored as a concrete `Int` so that reads of `ntasks` are type-stable.
The default constructor still accepts any `Integer` and converts it.
"""
struct SlurmManager <: ClusterManager
    ntasks::Int
end

function add_slurm_workers(ntasks; launched = launched, c = Condition(), exeflags = "--project=$(Base.active_project())", kwargs...)
Distributed.init_multi()
"""
    Distributed.manage(manager::SlurmManager, id::Integer, config::WorkerConfig, op::Symbol)

No-op implementation of `Distributed.manage` for `SlurmManager`.

The method has to be defined, but no action is currently taken for any `op`.
"""
function Distributed.manage(
    manager::SlurmManager,
    id::Integer,
    config::WorkerConfig,
    op::Symbol
)
    # Deliberately empty: the method must exist, but nothing needs to happen yet.
    return nothing
end

function Distributed.launch(sm::SlurmManager,params::Dict, instances_arr::Array,
c::Condition)
default_params = Distributed.default_addprocs_params()
kwargs = merge(default_params, Dict{Symbol, Any}(kwargs))
dir = kwargs[:dir]
exename = kwargs[:exename]
params = merge(default_params, Dict{Symbol, Any}(params))
exehome = params[:dir]
exename = params[:exename]
exeflags = params[:exeflags]

stdkeys = keys(Distributed.default_addprocs_params())

slurm_params = filter(x->(!(x[1] in stdkeys) && x[1] != :job_file_loc), params)

slurm_kwargs = filter(x->!(x[1] in keys(default_params)), kwargs)
srunargs = []
for k in keys(slurm_kwargs)

for k in keys(slurm_params)
if length(string(k)) == 1
push!(srunargs, "-$k")
val = p[k]
Expand All @@ -120,13 +117,20 @@ function add_slurm_workers(ntasks; launched = launched, c = Condition(), exeflag
end
end
end
# Get job file location from parameter dictionary.
job_file_loc = joinpath(exehome, get(params, :job_file_loc, "."))

# Make directory if not already made.
if !isdir(job_file_loc)
mkdir(job_file_loc)
end

# Check for given output file name
jobname = "julia-$(getpid())"
default_template = ".$jobname-$(trunc(Int, Base.time() * 10))"
default_output(x) = "$default_template-$x.out"

# Set output name
default_template = "$jobname-$(trunc(Int, Base.time() * 10))"
default_output(x) = "$default_template-$x.txt"
has_output_name = ("-o" in srunargs) | ("--output" in srunargs)
job_output_file = if has_output_name
# if has_output_name, ensure there is only one output arg
Expand All @@ -137,11 +141,12 @@ function add_slurm_workers(ntasks; launched = launched, c = Condition(), exeflag
filter!(x -> !occursin(r"^-[oe]", x), srunargs)
job_output
else
".$(default_output("%4t"))"
# Slurm interpolates %4t to the task ID padded with up to four zeros
default_output("%4t")
end
push!(srunargs, "-o", job_output_file)

srun_cmd = `srun -J $jobname -n $ntasks $(srunargs) $exename $(worker_arg())`
ntasks = sm.ntasks
srun_cmd = `srun -J $jobname -n $ntasks -D $exehome $(srunargs) $exename $exeflags $(worker_arg())`

@info "Starting SLURM job $jobname: $srun_cmd"
srun_proc = open(srun_cmd)
Expand All @@ -153,14 +158,14 @@ function add_slurm_workers(ntasks; launched = launched, c = Condition(), exeflag
# Wait for workers to start
t_start = time()
t_waited = round(Int, time() - t_start)
delays = ExponentialBackOff(10, 1.0, 512.0, 2.0, 0.1)
retry_delays = ExponentialBackOff(10, 1.0, 512.0, 2.0, 0.1)
for i in 0:ntasks - 1
slurm_spec_match::Union{RegexMatch,Nothing} = nothing
worker_errors = String[]
if !has_output_name
job_output_file = ".$(default_output(lpad(i, 4, "0")))"
job_output_file = default_output("0000")
end
for retry_delay in push!(collect(delays), 0)
for retry_delay in push!(collect(retry_delays), 0)
t_waited = round(Int, time() - t_start)

# Wait for output log to be created and populated, then parse
Expand Down Expand Up @@ -210,8 +215,7 @@ function add_slurm_workers(ntasks; launched = launched, c = Condition(), exeflag
# Keep a reference to the proc, so it's properly closed once
# the last worker exits.
config.userdata = srun_proc
push!(launched, config)
push!(instances_arr, config)
notify(c)
end
return WorkerPool(workers())
end

0 comments on commit 4836415

Please sign in to comment.