forked from SomajitDey/runman
-
Notifications
You must be signed in to change notification settings - Fork 0
/
runman
executable file
·327 lines (278 loc) · 11.8 KB
/
runman
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
#!/usr/bin/env bash
export TMPDIR="${TMPDIR:-/tmp}"
export TS_SOCKET="${TMPDIR}/runman.ts.socket"
export TS_SLOTS="$(nproc)"
export TS_MAXFINISHED="$((10*TS_SLOTS))" # Making this 0 won't show status of finished jobs
export TS_ENV='echo "User: ${USER}\nComment: ${comment}\nArrayID: ${RUNMAN_ARRAY_INDEX}@${jobid}\nWork Dir:${PWD}\nStdout: ${stdout}\nStderr: ${stderr}" '
export TS_ONFINISH="${BASH_SOURCE}"
_log(){
# Log the given message at stderr
# Usage: _log <message string>
echo -e "${1}"
}>&2
# Create system wide task-spooler socket, if non-existing
if ! [[ -S "${TS_SOCKET}" ]]; then
(tsp && chmod a+rw "${TS_SOCKET}")>/dev/null || { _log "Problem with task-spooler (command: tsp)"; exit 1;}
fi
_about(){
# Shows about info
_log "Run Man(ager)"
_log "About: A wrapper of task-spooler for scheduling jobs/runs in a multicore compute node."
_log "Repo: https://github.com/SomajitDey/runman"
_log "Usage: runman help"
}
_tg_check(){
# Checks if telegram.sh is configured for notification
if telegram -n 'dry-run' &>/dev/null; then
_log "Telegram is all set to notify you when job finishes, provided internet connectivity is there."
else
_log "Cannot notify via Telegram when job finishes. Setup Telegram feed with: runman tgfeed"
return 1
fi
}
_read_job_file(){
# Usage: _read_job_file <path.job>
jobfile="${1:-"${jobfile}"}"
# .job file contains the following key-value pairs
# Defaults are provided wherever applicable
cmd= # Command with arguments | Path to executable/script
ncpu=1 # Number of cores required to run the job
comment= # Comment/Remarks
stdout= # Path to stdout
stderr= # Path to stderr
wclock= # Max. wall clock time job can run
cwd="${PWD}" # Current working directory
after= # ID of a job the current job is dependent on
array= # Job array specs. Format:: start-end:stride
# Source given .job file
. "${jobfile}" || { _log "Problem reading ${jobfile}. Exiting."; exit 1;}
# Export those env vars that need to be accessed using TS_ENV (tsp -i <jobid>) later
export comment stdout stderr
}
_create_job(){
# Uses variables read/set by _read_job_file
# Operates in a sub-shell so as not to corrupt global environment
# Outputs job id (Only the leader's job id for job arrays)
(
cd "${cwd}"
export OMP_NUM_THREADS="${ncpu}"
local dep_opt="${after:+"-D ${after}"}"
local err_opt="${stderr:+"-E"}"
local timeout="${wclock:+"timeout --kill-after=0.5s ${wclock}"}"
IFS='-:' read array_start array_end array_stride <<< "${array}"
local array_index
local jobid="$(RUNMAN_ARRAY_INDEX="${array_start}" tsp -N "${ncpu}" -L "${ncpu}@${USER}" ${dep_opt} ${err_opt} ${timeout} ${cmd})" \
|| { _log "Failed to submit ${jobfile}. Exiting."; exit 1;}
export jobid
for array_index in $(seq ${array_start} ${array_stride} ${array_end} 2>/dev/null | tail -n+2 ); do
RUNMAN_ARRAY_INDEX="${array_index}" tsp -N "${ncpu}" -L "${ncpu}@${USER}@${jobid}(${array_index})" ${dep_opt} ${err_opt} ${timeout} ${cmd} >/dev/null \
|| { _log "Failed to submit array_job_index=${array_index}. Exiting."; exit 1;}
done
echo "${jobid}"
)
}
_ts_onfinish(){
# Effectively called by task-spooler after job finishes
# Usage: _ts_onfinish <jobid> <errorlevel> <output_filename> <command>
local jobid="${1}" errlvl="${2}" tmp_stdout="${3}" cmd="${4}"
if [[ "${cmd}" =~ ^timeout\ ]] && (( errlvl == 124 )); then
errlvl="${errlvl} (timed out)"
fi
local status="$(echo "Job ID: ${jobid}"; echo "Exit Code: ${errlvl}"; tsp -i "${jobid}")"
local stdout="$(grep ^Stdout: <<< "${status}" | cut -d ' ' -f 2)"
local stderr="$(grep ^Stderr: <<< "${status}" | cut -d ' ' -f 2)"
local array_index="$(grep ^ArrayID: <<< "${status}" | cut -d ' ' -f 2 | cut -d '@' -f 1)"
[[ -n "${stdout}" ]] && mv -f "${tmp_stdout}" "${stdout}${array_index}" || rm -f "${tmp_stdout}"
[[ -n "${stderr}" ]] && mv -f "${tmp_stdout}.e" "${stderr}${array_index}" || rm -f "${tmp_stdout}.e"
telegram -T "RunMan@$(hostname)" -M "${status}"
# TODO: Kill children, if still running, once the process leader exits
}
_kill_proc_tree(){
# Kills given process and all its descendants (with given signal)
# If there's a - infront of the given process id, doesn't kill its descendants
# Usage: _kill_proc_tree [-]<ppid> [<SIGspec>]
local pid ppid="${1#-}" sig="${2:-KILL}"
# Note that following loop is not entered if $1 is negative
for pid in $(pgrep -P "${1}"); do
_kill_proc_tree "${pid}" "${sig}" &
done
kill "-${sig}" "${ppid}"
}
_runman_sub(){
# Reads given job files and submits/enqueues them
# Usage: _runman_sub <path.job>...
_tg_check
local jobfile
for jobfile in "${@}"; do
_read_job_file
echo "Job ID for ${jobfile}: $(_create_job)"
done
}
_runman_stat(){
# Shows status of given job
# Usage: _runman_stat <jobid>
# <jobid> may be of the form <jobid> or <jobid>(<array_index>)
local jobid="${1}"
if [[ "${jobid}" =~ [[:digit:]]+\([[:digit:]]+\) ]]; then
# Escaping ( and ) in $jobid expansion in `grep` argument below
jobid="${jobid//\(/\\(}"; jobid="${jobid//\)/\\)}"
jobid="$(tsp | grep -E "[[[:digit:]]+@${USER}@${jobid}]" | awk '{print $1}')"
fi
_log "Actual jobid=${jobid}"
[[ "${jobid}" =~ [[:digit:]]+ ]] && tsp -i "${jobid}" 2>/dev/null || _log "Job has either finished or is non-existent"
}
_runman_array(){
# List all job IDs for a job array represented by the first element (leader)'s job ID.
# Useful for running _runman_abort , _runman_del etc. on all the elemental jobs of an array using a `for` loop or `xargs`.
# Usage: _runman_array <leader_jobID>
local jobid="${1}"
echo "${jobid}" # Job ID for leader
tsp | grep -E "[[[:digit:]]+@${USER}@${jobid}\([[:digit:]]+\)]" | awk '{print $1}' # Job ID for the remaining elements
}
_runman_abort(){
# Aborts given job, i.e. its entire process tree (with given signal)
# If there's a - infront of jobid, kills process leader only
# Usage: _runman_abort [-]<jobid> [<SIGspec>]
local jobid="${1#-}"
local ppid="$(tsp -p "${jobid}" 2>/dev/null)" sig="${2}"
# ppid doesn't exist when job is queued (i.e. yet to run)
if [[ -n "${ppid}" ]]; then
_kill_proc_tree "${1%${jobid}}${ppid}" "${sig}"
else
_runman_del "${jobid}"
fi
}
_runman_del(){
# Deletes given job
# Usage: _runman_del <jobid>
tsp -r "${1}" || _log "Try aborting."
}
_runman_list(){
# Lists all unfinished jobs by the USER
_log "ID\tState\tComment"
local jobid buffer1="$(mktemp -u)" buffer2="$(mktemp -u)"
trap "rm -f ${buffer1} ${buffer2}" return
tsp | grep -E "[[[:digit:]]+@${USER}]" | awk '{print $1, $2}' | grep -vw 'finished' > "${buffer1}"
for jobid in $(awk '{print $1}' "${buffer1}"); do
tsp -i "${jobid}" | grep ^Comment: | cut -d ' ' -f 2-
done > "${buffer2}"
paste "${buffer1}" "${buffer2}"
}
_runman_swap(){
# Swaps queue positions of the scheduled jobs
# If the second jobid is not provided, make the first job run as soon as possible
# Usage: _runman_swap <jobid> [<jobid>]
local jobid="${1}" swap_with="${2}"
if [[ -z "${swap_with}" ]]; then
swap_with="$(_runman_list 2>/dev/null | head -n1 | awk '{print $1}')"
fi
tsp -U "${jobid}"-"${swap_with}"
}
_runman_tgfeed(){
# Sets up Telegram feed for you
local tg_local_conf="${HOME}/.telegram.sh.conf"
rm -f "${tg_local_conf}" # Delete earlier config for resetting purpose
export TELEGRAM_TOKEN TELEGRAM_CHAT
# Intro
cat <<- EOF
This will help you setup your Telegram feed in no time. Just follow the steps below (skipping unnecessary steps).
Note: Proceed only after making sure we are connected to the internet.
EOF
# Install telegram, if not installed
which telegram &>/dev/null || cat <<- EOF
Step 0: Install telegram from https://github.com/fabianonline/telegram.sh
- Download with: curl -LO https://raw.githubusercontent.com/fabianonline/telegram.sh/master/telegram
- Set permission with: chmod +rx telegram
- Install at PATH: sudo install telegram /usr/local/bin
- Or install locally and put the installation directory in PATH in ~/.bashrc
EOF
# Creating bot and token, if non-existent. Not needed if admin provides token in /etc/telegram.sh.conf.
if ! telegram -m &>/dev/null; then
cat <<-EOF
Step 1: On your smartphone's Telegram app, search for @botfather and start a chat.
Step 2: Use the /newbot command to create a new bot. BotFather will give you a token. Provide it below.
EOF
read -p "Token: " TELEGRAM_TOKEN
while ! telegram -m &>/dev/null; do
_log "Wrong Token provided. Please re-enter."
read -p "Token: " TELEGRAM_TOKEN
done
echo "TELEGRAM_TOKEN='${TELEGRAM_TOKEN}'" > "${tg_local_conf}"
fi
# Getting chat ID
cat <<- EOF
Step 3: Start a chat with your bot. Send it your user ID, i.e.: ${USER}.
EOF
read -p "Press any key after completing step 3:"$'\n' -s -n1
local buffer="$(mktemp -u)"
trap "rm -f ${buffer}" return exit
while !(telegram -m | grep -iw "${USER}")>"${buffer}"; do
read -p "Press any key after completing step 3 AGAIN:"$'\n' -s -n1
done
TELEGRAM_CHAT="$(awk '{print $2}' "${buffer}")"
_tg_check || { _log "Something went wrong. Try again."; exit 1;}
echo "TELEGRAM_CHAT='${TELEGRAM_CHAT}'" > "${tg_local_conf}"
} >&2
_runman_help(){
# Shows usage info
cat <<- EOF
Syntax: runman <subcmd> <args>
List of subcommands with their arguments:
sub <path to .job file> ...
Reads provided .job file(s) and enqueues the corresponding jobs. Provides job ID at stdout.
For job arrays, only the job ID of the first elemental job (leader) is shown, which may represent the entire array.
stat <job ID> | <array_leader_ID>(<array_index>)
Shows status of given job (standalone or elemental job of a job array). Example: stat 176(4)
abort [-]<jobid> [<SIGspec>]
Aborts given job, i.e. its entire process tree, with SIGKILL.
If signal specification is provided (kill -l), use it instead.
If there's a - infront of jobid, kills process leader only.
del <jobid>
Deletes given job
list
Lists all unfinished jobs by the USER
array <leader_jobID>
Lists the job IDs of all elemental jobs for the given job array.
One may use this list with xargs for del, abort etc.
swap <jobid> [<jobid>]
Swaps queue positions of the scheduled jobs.
If the second jobid is not provided, make the first job run ASAP.
tgfeed
Guides in setting up Telegram feed.
tsp <argument> ...
Executes tsp in runman's environment. Mainly for admin use and
going beyond runman's capabilities. Tasks are labelled as [#cpu@USER].
help
Shows this help.
.job file syntax:
# This is a comment line
# .job files contain some or all of the following key-value pairs to list USER's requirements
# Default values are provided for reference
# Values with spaces etc. must be provided within quotes for safety
cmd= # (Mandatory) Command or Path to executable/script, with or without arguments
ncpu=1 # Number of cores required to run the job
comment= # Comment/Remarks
stdout= # Path to stdout
stderr= # Path to stderr
wclock= # Max. wall clock time job can run
cwd="\${PWD}" # Current working directory. Default is the directory from where runman is called
after= # ID of a job the current job is dependent on
array= # Specify job arrays with array index in the range... m-n:s, where m is the starting integer, n the end one, s the stride.
# For job arrays, each elemental job gets its corresponding array index through the environment variable RUNMAN_ARRAY_INDEX.
EOF
}
_main(){
# The main driver
if [[ -z "${@}" ]]; then
_about
elif [[ "${1}" =~ ^[[:digit:]]+$ ]]; then
_ts_onfinish "${@}"
elif [[ "${1}" == 'tsp' ]]; then
"${@}"
elif (declare -F | grep -w "_runman_${1}")&>/dev/null; then
"_runman_${@}"
else
_log "Bad usage. See: runman help"
fi
}
_main "${@}"