diff --git a/src/slurm_workers.jl b/src/slurm_workers.jl index 131da06c..8c0c60a2 100644 --- a/src/slurm_workers.jl +++ b/src/slurm_workers.jl @@ -1,7 +1,6 @@ using Distributed import EnsembleKalmanProcesses as EKP export worker_calibrate, add_slurm_workers -default_worker_pool() = WorkerPool(workers()) function run_iteration(iter, ensemble_size, output_dir; worker_pool = default_worker_pool(), failure_rate = 0.5) # Create a channel to collect results @@ -75,35 +74,33 @@ function worker_calibrate(ekp::EKP.EnsembleKalmanProcess, ensemble_size,n_iterat return JLD2.load_object(path_to_iteration(output_dir, n_iterations)) end -# function slurm_worker_pool(nprocs::Int; exeflags = "--project=$(Base.active_project())", slurm_kwargs...) -# return WorkerPool(addprocs( -# SlurmManager(nprocs); -# t = "01:00:00", -# cpus_per_task = 1, -# # TODO: Fix output -# output = "worker_%j_%4t.out", -# exeflags = "--project=$(Base.active_project())", -# slurm_kwargs..., -# )) -# end - - worker_cookie() = begin Distributed.init_multi(); cluster_cookie() end worker_arg() = `--worker=$(worker_cookie())` -launched = WorkerConfig[] +struct SlurmManager <: ClusterManager + ntasks::Integer +end -function add_slurm_workers(ntasks; launched = launched, c = Condition(), exeflags = "--project=$(Base.active_project())", kwargs...) - Distributed.init_multi() +function Distributed.manage(manager::SlurmManager, id::Integer, config::WorkerConfig, + op::Symbol) +# This function needs to exist, but so far we don't do anything +end +function Distributed.launch(sm::SlurmManager,params::Dict, instances_arr::Array, + c::Condition) default_params = Distributed.default_addprocs_params() - kwargs = merge(default_params, Dict{Symbol, Any}(kwargs)) - dir = kwargs[:dir] - exename = kwargs[:exename] + params = merge(default_params, Dict{Symbol, Any}(params)) + exehome = params[:dir] + exename = params[:exename] + exeflags = params[:exeflags] + + stdkeys = keys(Distributed.default_addprocs_params()) + + slurm_params = filter(x->(!(x[1] in stdkeys) && x[1] != :job_file_loc), params) - slurm_kwargs = filter(x->!(x[1] in keys(default_params)), kwargs) srunargs = [] - for k in keys(slurm_kwargs) + + for k in keys(slurm_params) if length(string(k)) == 1 push!(srunargs, "-$k") val = p[k] @@ -120,13 +117,20 @@ function add_slurm_workers(ntasks; launched = launched, c = Condition(), exeflag end end end + # Get job file location from parameter dictionary. + job_file_loc = joinpath(exehome, get(params, :job_file_loc, ".")) + + # Make directory if not already made. + if !isdir(job_file_loc) + mkdir(job_file_loc) + end # Check for given output file name jobname = "julia-$(getpid())" + default_template = ".$jobname-$(trunc(Int, Base.time() * 10))" + default_output(x) = "$default_template-$x.out" # Set output name - default_template = "$jobname-$(trunc(Int, Base.time() * 10))" - default_output(x) = "$default_template-$x.txt" has_output_name = ("-o" in srunargs) | ("--output" in srunargs) job_output_file = if has_output_name # if has_output_name, ensure there is only one output arg @@ -137,11 +141,12 @@ function add_slurm_workers(ntasks; launched = launched, c = Condition(), exeflag filter!(x -> !occursin(r"^-[oe]", x), srunargs) job_output else - ".$(default_output("%4t"))" + # Slurm interpolates %4t to the task ID padded with up to four zeros + default_output("%4t") end push!(srunargs, "-o", job_output_file) - - srun_cmd = `srun -J $jobname -n $ntasks $(srunargs) $exename $(worker_arg())` + ntasks = sm.ntasks + srun_cmd = `srun -J $jobname -n $ntasks -D $exehome $(srunargs) $exename $exeflags $(worker_arg())` @info "Starting SLURM job $jobname: $srun_cmd" srun_proc = open(srun_cmd) @@ -153,14 +158,14 @@ function add_slurm_workers(ntasks; launched = launched, c = Condition(), exeflag # Wait for workers to start t_start = time() t_waited = round(Int, time() - t_start) - delays = ExponentialBackOff(10, 1.0, 512.0, 2.0, 0.1) + retry_delays = ExponentialBackOff(10, 1.0, 512.0, 2.0, 0.1) for i in 0:ntasks - 1 slurm_spec_match::Union{RegexMatch,Nothing} = nothing worker_errors = String[] if !has_output_name - job_output_file = ".$(default_output(lpad(i, 4, "0")))" + job_output_file = default_output("0000") end - for retry_delay in push!(collect(delays), 0) + for retry_delay in push!(collect(retry_delays), 0) t_waited = round(Int, time() - t_start) # Wait for output log to be created and populated, then parse @@ -210,8 +215,7 @@ function add_slurm_workers(ntasks; launched = launched, c = Condition(), exeflag # Keep a reference to the proc, so it's properly closed once # the last worker exits. config.userdata = srun_proc - push!(launched, config) + push!(instances_arr, config) notify(c) end - return WorkerPool(workers()) end