Skip to content

Commit

Permalink
Jobs (#64)
Browse files Browse the repository at this point in the history
* job submission system

* debug

* debug

* debug

* README

* docstr
  • Loading branch information
FaroutYLq authored Dec 10, 2023
1 parent c20c6c7 commit 14f27fd
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 0 deletions.
12 changes: 12 additions & 0 deletions jobs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Jobs
## Scope
Job submission scripts.
## Structure
`config.ini`: Configuration that you want to change every time before submitting.
`submit.py`: wrapper around `utilix.batchq` to submit jobs.
`job.py`: processing script, just a wrapper around `st.make`.
## Usage
Update `config.ini` and run this in a container
```
python submit.py
```
23 changes: 23 additions & 0 deletions jobs/config.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Configuration shared by submit.py (job submission) and job.py (processing).

[utilix]
# Maximum number of jobs allowed in the slurm queue at once (submit.py).
max_num_submit = 200
# Seconds to sleep between submission attempts (submit.py).
t_sleep = 1

[job]
# Container image passed to utilix.batchq.submit_job.
container = xenonnt-development.simg
# Comma-separated run ids; submit.py launches one job per id.
runids = 021952,021967,022175,023747
# strax output folder for processed data (job.py).
output_folder = /scratch/midway2/yuanlq/salt/sr0_rn220
# Passed to saltax.contexts.sxenonnt; 'salt' also triggers the
# data-only and simulation-only chains in job.py.
saltax_mode = salt
faxconf_version = sr0_v4
generator_name = flat
recoil = 7
mode = all

[slurm]
# Used to count this user's queued jobs via squeue (submit.py).
username = yuanlq
account = pi-lgrandi
# Prefix for job names; submit.py appends a zero-padded loop index.
job_title = salt
partition = caslake
qos = caslake
# Memory per CPU in MB, as expected by utilix.batchq.submit_job.
mem_per_cpu = 30000
cpus_per_task = 1
# Directory for per-job log files; created by submit.py if missing.
log_dir = /home/yuanlq/.tmp_job_submission/saltax
92 changes: 92 additions & 0 deletions jobs/job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""Process a single run with saltax, driven by ``config.ini``.

Usage::

    python job.py <runid>

Builds a saltax context in the configured ``saltax_mode`` and computes the
processing chain up to ``event_info``.  When ``saltax_mode`` is ``salt``,
the data-only and simulation-only chains are computed as well.
"""
import saltax
import straxen
import configparser
from datetime import datetime
import sys

straxen.print_versions()

# Data types computed for every context, in strax dependency order.
BASE_TARGETS = ('records', 'peaklets', 'merged_s2s', 'event_basics', 'event_info')


def _load_job_config(path='config.ini'):
    """Return the ``[job]`` options from *path* as a dict.

    ``path`` is resolved relative to the current working directory.
    """
    config = configparser.ConfigParser()
    config.read(path)
    return {
        'output_folder': config.get('job', 'output_folder'),
        'saltax_mode': config.get('job', 'saltax_mode'),
        'faxconf_version': config.get('job', 'faxconf_version'),
        'generator_name': config.get('job', 'generator_name'),
        'recoil': config.getint('job', 'recoil'),
        'mode': config.get('job', 'mode'),
    }


def _build_context(runid, saltax_mode, job):
    """Instantiate a saltax context for *runid* in *saltax_mode*."""
    return saltax.contexts.sxenonnt(runid=runid,
                                    saltax_mode=saltax_mode,
                                    output_folder=job['output_folder'],
                                    faxconf_version=job['faxconf_version'],
                                    generator_name=job['generator_name'],
                                    recoil=job['recoil'],
                                    mode=job['mode'])


def _make_all(st, strrunid, targets):
    """Compute each data type in *targets* for run *strrunid*, in order."""
    for target in targets:
        st.make(strrunid, target)


def main():
    now = datetime.now()
    _, runid = sys.argv
    runid = int(runid)
    strrunid = str(runid).zfill(6)  # zero-padded run id, e.g. '021952'

    job = _load_job_config()
    saltax_mode = job['saltax_mode']

    print("Used time:", datetime.now() - now)
    now = datetime.now()

    print("Finished importing and config loading, now start to load context.")
    print("Now starting %s context for run %d" % (saltax_mode, runid))
    st = _build_context(runid, saltax_mode, job)
    _make_all(st, strrunid, ('raw_records_simu',) + BASE_TARGETS)

    print("Used time:", datetime.now() - now)
    now = datetime.now()

    print("Finished making all the computation for run %d in saltax mode %s. "
          % (runid, saltax_mode))

    if saltax_mode == 'salt':
        print("Since you specified saltax_mode = salt, we will also compute "
              "simulation-only and data-only.")

        # Data-only chain: no simulated raw records; also compute basic cuts.
        st = _build_context(runid, 'data', job)
        st.make(strrunid, 'records', save=True)
        _make_all(st, strrunid,
                  ('peaklets', 'merged_s2s', 'event_basics', 'event_info',
                   'cuts_basic'))

        print("Used time:", datetime.now() - now)
        now = datetime.now()

        print("Finished making all the computation for run %d in saltax mode %s. "
              % (runid, 'data'))

        # Simulation-only chain.
        st = _build_context(runid, 'simu', job)
        _make_all(st, strrunid, ('raw_records_simu',) + BASE_TARGETS)

        print("Used time:", datetime.now() - now)

        print("Finished making all the computation for run %d in saltax mode %s. "
              % (runid, 'simu'))

    print("Finished all. Exiting.")


if __name__ == '__main__':
    main()
88 changes: 88 additions & 0 deletions jobs/submit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import numpy as np
import time
import os
import utilix
from utilix.batchq import *
import configparser


# Load submission settings shared with job.py; config.ini is resolved
# relative to the current working directory.
config = configparser.ConfigParser()
config.read('config.ini')

# [utilix] section: throttling of the submission loop.
MAX_NUM_SUBMIT = config.getint('utilix', 'max_num_submit')  # cap on queued jobs
T_SLEEP = config.getfloat('utilix', 't_sleep')  # seconds between submissions
# [slurm] / [job] sections: scheduler identity, resources, and payload.
USER = config.get('slurm', 'username')  # used to count jobs via squeue
ACCOUNT = config.get('slurm', 'account')
LOG_DIR = config.get('slurm', 'log_dir')  # one log file per job goes here
CONTAINER = config.get('job', 'container')
RUNIDS = config.get('job', 'runids')  # raw comma-separated string at this point
JOB_TITLE = config.get('slurm', 'job_title')  # job name prefix
PARTITION = config.get('slurm', 'partition')
QOS = config.get('slurm', 'qos')
MEM_PER_CPU = config.getint('slurm', 'mem_per_cpu')  # MB, per utilix convention
CPUS_PER_TASK = config.getint('slurm', 'cpus_per_task')

# Parse '021952,021967,...' into a list of ints; one job per run id.
RUNIDS = [int(runid) for runid in RUNIDS.split(',')]
os.makedirs(LOG_DIR, exist_ok=True)


class Submit(object):
    """Throttled slurm job submission.

    ``execute`` dispatches to the method named after the (lower-cased)
    class name — ``submit`` here — which loops over run ids and submits
    one job per run while keeping the number of queued jobs under
    ``max_num_submit``.
    """

    def name(self):
        """Return the class name, used by execute() for dispatch."""
        return self.__class__.__name__

    def execute(self, *args, **kwargs):
        """Dispatch to the method named after the class (lower-cased).

        Uses getattr instead of eval: same dispatch, without evaluating
        a constructed source string.
        """
        getattr(self, self.name().lower())(*args, **kwargs)

    def submit(self, loop_over=None, max_num_submit=10, nmax=10000):
        """Submit jobs to slurm.

        Parameters
        ----------
        loop_over : list, optional
            Items (run ids) to process, one job each. Defaults to empty.
        max_num_submit : int
            Maximum number of this user's jobs allowed in the queue.
        nmax : int
            Hard cap on submissions performed by this call.
        """
        # Avoid a shared mutable default argument.
        self.loop_over = [] if loop_over is None else loop_over
        self.max_num_submit = max_num_submit
        self.p = True

        index = 0
        while index < len(self.loop_over) and index < nmax:
            # Only advance past an item once it has actually been
            # submitted; a full queue previously skipped items silently.
            if self.working_job() < self.max_num_submit:
                self._submit_single(loop_index=index,
                                    loop_item=self.loop_over[index])
                index += 1

            time.sleep(T_SLEEP)

    # check my jobs
    def working_job(self):
        """Return how many jobs this user currently has in the queue."""
        cmd = 'squeue --user={user} | wc -l'.format(user=USER)
        jobNum = int(os.popen(cmd).read())
        return jobNum - 1  # squeue prints a header line

    def _submit_single(self, loop_index, loop_item):
        """Submit a single job for *loop_item* via utilix.batchq."""
        jobname = JOB_TITLE + '_{:03}'.format(loop_index)
        # Modify here for the script to run
        jobstring = "python job.py %s" % (loop_item)
        print(jobstring)

        # Modify here for the log name
        utilix.batchq.submit_job(
            jobstring, log='%s/%s.log' % (LOG_DIR, jobname),
            partition=PARTITION, qos=QOS,
            account=ACCOUNT, jobname=jobname,
            dry_run=False, mem_per_cpu=MEM_PER_CPU,
            container=CONTAINER,
            cpus_per_task=CPUS_PER_TASK)

# Entry point: submit one job per configured run id.
p = Submit()

# Modify here for the runs to process
runs_to_process = RUNIDS
print("Going to process these runs:", runs_to_process)
print('Number of runs to process: ', len(runs_to_process))
print('Your log files are in: ', LOG_DIR)
p.execute(loop_over=runs_to_process, max_num_submit=MAX_NUM_SUBMIT)

0 comments on commit 14f27fd

Please sign in to comment.