-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathuqstat
executable file
·189 lines (149 loc) · 6.2 KB
/
uqstat
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
#!/g/data3/hh5/public/apps/nci_scripts/python-analysis3
import pandas
import subprocess
import json
import argparse
import re
import sys
import pymunge
import requests
from qtools import *
# Base charge rates in SU/hour for 1 cpu with minimal memory
charge_rates = {
'express': 6,
'normal': 2,
'copyq': 2,
'hugemem': 3,
'gpuvolta': 3,
'hugemembw': 1.25,
'megamembw': 1.25,
'expresswb': 3.75,
'normalbw': 1.25,
'normalsl': 1.5,
}
# Expected memory, in GB per CPU
base_mem = {
'express': 4,
'normal': 4,
'copyq': 4,
'hugemem': 32,
'gpuvolta': 8,
'hugemembw': 1024/28,
'megamembw': 3072/32,
'expressbw': 256/28,
'normalbw': 256/28,
'normalsl': 192/32,
}
def maybe_get(x, key):
if pandas.isnull(x):
return x
else:
return x[key]
def get_list(s, lst, key):
if f'{lst}.{key}' in s:
return s.get(f'{lst}.{key}', None)
try:
return s.get(lst, {}).get(key, None)
except AttributeError:
return None
def qstat_df(historical=False):
command= ['/opt/pbs/default/bin/qstat','-f','-F','json']
if historical:
command.append('-x')
r = subprocess.run(command, stdout=subprocess.PIPE, universal_newlines=True)
r.check_returncode()
j = clean_qstat_json(r.stdout)
if 'Jobs' not in j:
return None
df = pandas.DataFrame.from_dict(j['Jobs'], orient='index')
df['stime'] = df.get('stime', pandas.Series(index=df.index, dtype='datetime64[ns]'))
df['stime'] = pandas.to_datetime(df['stime'])
df['qtime'] = pandas.to_datetime(df['qtime'])
return df
def nqstat_df(project):
url = 'http://gadi-pbs-01.gadi.nci.org.au:8812/qstat'
token = pymunge.encode().decode('utf-8')
headers = { 'Authorization': "MUNGE %s" % (token) }
params = {'project': project}
response = requests.get(url, params=params, headers=headers, timeout=120.0)
response.raise_for_status()
j = response.json()
df = pandas.DataFrame(j['qstat'])
if len(df) == 0:
return None
df = df.set_index('Job_ID')
df.index.name = None
df['stime'] = df.get('stime', pandas.Series(index=df.index, dtype='datetime64[ns]'))
df['stime'] = pandas.to_datetime(df['stime'], unit='s')
df['qtime'] = pandas.to_datetime(df['qtime'], unit='s')
return df
description = """
Print more detailed information from qstat
Returns the following columns for each job:
project: NCI project the job was submitted under
job_name: Name of the job
queue: Queue the job was submitted to
state: Current state - 'Q' in queue, 'R' running, 'H' held, 'E' finished
ncpus: Number of cpus requested
walltime: Walltime the job has run for so far
su: SU cost of the job so far
mem_pct: Percent of the memory request used
cpu_pct: Percent of time CPUs have been active
qtime: Time the job spent in the queue before starting
If 'mem_pct' is below 80% make sure you're not requesting too much memory (4GB
per CPU or less is fine)
If 'cpu_pct' is below 80% and you're requesting more than one CPU make sure
your job is making proper use of parallelisation
"""
def main():
parser = argparse.ArgumentParser(description=description, formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('--historical','-x', help='Show historical info', action='store_true')
parser.add_argument('--format','-f', help='Output format', choices=['table','csv','json'], default='table')
parser.add_argument('--project','-P', help='Show all jobs in a project')
parser.add_argument('--comment','-c', help='Show PBS queue comment', action='store_true')
args = parser.parse_args()
if args.project is None:
df = qstat_df(args.historical)
else:
df = nqstat_df(args.project)
if df is None:
return
df['queue'] = df['queue'].apply(lambda x: x[:-5] if x.endswith('-exec') else x)
df['ncpus'] = df.apply(get_list, lst='Resource_List', key='ncpus', axis=1).astype('i')
df['mem_request'] = df.apply(get_list, lst='Resource_List', key='mem', axis=1).apply(decode_bytes)
df['cputime'] = pandas.to_timedelta(df.apply(get_list, lst='resources_used', key='cput', axis=1))
df['walltime'] = pandas.to_timedelta(df.apply(get_list, lst='resources_used', key='walltime', axis=1))
df['mem_used'] = df.apply(get_list, lst='resources_used', key='mem', axis=1).apply(decode_bytes)
df['mem_pct'] = df['mem_used'] / df['mem_request'] * 100
df['cpu_pct'] = df['cputime'] / df['walltime'] / df['ncpus'] * 100
df['state'] = df['job_state']
df['stime'] = df['stime'].fillna(pandas.Timestamp.now())
df['qtime'] = (df['stime'] - df['qtime']).apply(lambda x: x.round('min'))
df['wall_hours'] = pandas.to_timedelta(df['walltime']).apply(lambda x: x.total_seconds()) / (60*60)
# The number of cpus this would use based on the memory request
df['ncpus_by_mem'] = df['mem_request'] / (df['queue'].apply(lambda x: base_mem.get(x, 4)) * 1024**3)
# Multiply the larger of ncpus, ncpus_by_mem by the walltime
df['charge_rate'] = df['ncpus'].where(df['ncpus'] > df['ncpus_by_mem'], df['ncpus_by_mem'])
df['charge_rate'] = df['charge_rate'] * df['queue'].apply(lambda x: charge_rates.get(x, 2))
df['su'] = df['charge_rate'] * df['wall_hours']
df['user'] = df['Job_Owner'].apply(lambda x: x.split('@')[0])
df['name'] = df['Job_Name']
if 'comment' not in df:
df['comment'] = None
table = df[['project','user','name','queue','state','ncpus','walltime','su','mem_pct', 'cpu_pct', 'qtime']]
if args.comment:
table = table.join(df['comment'])
adv_table = table.join(df[['charge_rate','ncpus_by_mem','mem_request','mem_used']])
if args.format == 'table':
pct_format = lambda x: f'{x:.0f}%'
round_format = lambda x: f'{x:.0f}'
table.to_string(sys.stdout, formatters={'su': round_format, 'mem_pct': pct_format, 'cpu_pct': pct_format})
print()
elif args.format == 'csv':
adv_table.to_csv(sys.stdout, index_label='jobid')
elif args.format == 'json':
adv_table.to_json(sys.stdout, orient='index', indent=4, date_unit='s')
else:
raise NotImplementedError
if __name__ == '__main__':
main()