Skip to content

Commit

Permalink
Add SlurmJobArray
Browse files Browse the repository at this point in the history
  • Loading branch information
gmatteo committed Nov 9, 2023
1 parent d7591d4 commit 5c5687a
Showing 1 changed file with 110 additions and 0 deletions.
110 changes: 110 additions & 0 deletions abipy/flowtk/qutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from monty.string import is_string
from pymatgen.core.units import Time, Memory
from abipy.tools.typing import PathLike
from abipy.tools import duck


Expand Down Expand Up @@ -164,3 +165,112 @@ def slurm_get_jobs(username=None) -> dict[int, dict]:
entries.append(d)

return {e["JOBID"]: e for e in entries}



class SlurmJobArray:
"""
Example:
header = '''\
#!/bin/bash
#SBATCH --account=battab
#SBATCH --job-name=abiml_md
#SBATCH --time=0-16:0:0
#SBATCH --partition=batch
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1 # 1 node has 128 cores
#SBATCH --cpus-per-task=1
conda activate env3.10
export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK
ulimit -s unlimited
'''
command = "abiml.py md"
arr_options = ["--help", "--version"]
job_array = SlurmJobArray(header, command, arr_options)
print(job_array)
queue_id = job_array.sbatch("job.sh")
"""

def __init__(self, header, command, arr_options):
self.command = str(command)
self.header = str(header)
self.arr_options = arr_options
self.arr_options_str = "\n".join(arr_options)

def __str__(self):
# Add slurm array section.
lines = self.header.splitlines()
for il, line in enumerate(lines):
if line.startswith("#SBATCH"):
break
else:
raise ValueError("Cannot find line starting with #SBATCH")
lines.insert(il, f"#SBATCH --array=0:{len(self.arr_options)-1}")
header = "\n".join(lines)

select_opts = r"""
# multiline string
OPTS_STRING="
%s
"
# Convert the multiline string to an array
IFS=$'\n' read -rd '' -a OPTS_LIST <<< "$OPTS_STRING"
# Index of the entry you want (0-based)
index=0
#index=${SLURM_ARRAY_TASK_ID}
# Check if the index is within the range of the array
if (( index >= 0 && index < ${#OPTS_LIST[@]} )); then
OPTS="${OPTS_LIST[index]}"
echo "Selected entry at index $index: $OPTS"
else
echo "Index $index is out of range"
exit 1
fi
""" % (self.arr_options_str)

end = f"{self.command} ${{OPTS}}"
return header + select_opts + end

def sbatch(self, slurm_filepath: PathLike, dry_run=False) -> int:
with open(slurm_filepath, "wt") as fh:
fh.write(str(self))

queue_id = slurm_submit(slurm_filepath)
with open(slurm_filepath + ".qid", "wt") as fh:
fh.write("# Slurm job id")
fh.write(str(queue_id))
return queue_id



def slurm_submit(script_file) -> int:
"""
Submit a job script to the queue with sbatch
"""
from subprocess import Popen, PIPE
# need string not bytes so must use universal_newlines
process = Popen(['sbatch', script_file], stdout=PIPE, stderr=PIPE, universal_newlines=True)
out, err = process.communicate()

# grab the returncode. SLURM returns 0 if the job was successful
if process.returncode == 0:
try:
# output should of the form '2561553.sdb' or '352353.jessup' - just grab the first part for job id
queue_id = int(out.split()[3])
print('Job submission was successful and queue_id is {}'.format(queue_id))
return queue_id
except Exception as exc:
# probably error parsing job code
print('Could not parse job id following slurm...')
raise exc
else:
raise RuntimeError(f"Error while submitting {script_file=}")

0 comments on commit 5c5687a

Please sign in to comment.