-
Notifications
You must be signed in to change notification settings - Fork 22
/
watch-terra
executable file
·208 lines (167 loc) · 6.43 KB
/
watch-terra
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
#!/usr/bin/env python
import subprocess
from typing import Dict, Tuple
from dataclasses import dataclass
import firecloud.api
import time
import requests
import json
import click
import re
#####################
# CMD = '''
# on run argv
# display notification (item 2 of argv) with title (item 1 of argv)
# end run
# '''
# def notify_mac(title, text):
# subprocess.call(['osascript', '-e', CMD, title, text])
######################
RunningStatus = "Running"
FailedStatus = "Failed"
SucceededStatus = "Succeeded"
AbortedStatus = "Aborted"
KNOWN_STATUSES = {RunningStatus, FailedStatus, SucceededStatus, AbortedStatus}
@dataclass
class NewSubmission:
submission_id: str
@dataclass
class SubmissionComplete:
submission_id: str
statuses: Dict[str, int]
@dataclass
class SubmissionStatusChange:
submission_id: str
prev_workflow_status: Dict[str, int]
new_workflow_status: Dict[str, int]
status_change: Dict[str, Tuple[int, int]]
def __str__(self):
buf = []
for change in self.status_change.keys():
prev, now = self.status_change[change]
buf.append(f"{change}: {prev} -> {now}")
return "\n".join(buf)
# @dataclass
# class Submission:
# id: str
# method: str
# user_comment: str
# submission_date: str
# def add_submissions(submissions_by_id, json_submissions):
# for s in json_submissions:
# submissions_by_id["submissionId"] = Submission(
# id=s["submissionId"],
# method=f"{s['methodConfigurationNamespace']}/{s['methodConfigurationName']}",
# user_comment=s.get('userComment'),
# submission_date=s['submissionDate'])
def diff(orig_submissions, latest_submissions):
by_id = {}
for s in orig_submissions:
by_id[s["submissionId"]] = s["workflowStatuses"]
for s in latest_submissions:
if s["submissionId"] in by_id:
b = s["workflowStatuses"]
a = by_id[s["submissionId"]]
changes = {}
for status in set(a.keys()).union(b.keys()):
assert status in KNOWN_STATUSES
prev_value = a.get(status, 0)
value = b.get(status, 0)
if prev_value != value:
changes[status] = (prev_value, value)
if len(changes) > 0:
if RunningStatus not in s["workflowStatuses"]:
yield SubmissionComplete(
submission_id=s["submissionId"], statuses=s["workflowStatuses"]
)
else:
yield SubmissionStatusChange(
submission_id=s["workflowStatuses"],
prev_workflow_status=a,
new_workflow_status=b,
status_change=changes,
)
else:
yield NewSubmission(s["submissionId"])
def get_slack_webhook_url():
# fetch the URL from google secrets management. The URL has a key in it so we cannot add this to the repo.
as_bytes = subprocess.check_output(
[
"gcloud",
"secrets",
"versions",
"access",
"latest",
"--secret",
"terra-notifier-webhook-url",
"--project",
"depmap-omics",
]
)
return as_bytes.decode("utf8")
def poll_workspace(workspaces, notifier):
# submissions_by_id = {}
prev_submissions_by_name = {}
while True:
for workspace in workspaces:
prev_submissions = prev_submissions_by_name.get(workspace)
namespace, name = _split_workspace_name(workspace)
submissions = firecloud.api.list_submissions(namespace, name)
assert (
submissions.status_code == 200
), f"expected 200 but got status={submissions.status_code}, body={submissions.content}"
submissions = submissions.json()
# add_submissions(submissions_by_id, submissions)
if prev_submissions is not None:
for x in diff(prev_submissions, submissions):
if (
isinstance(x, SubmissionStatusChange)
and len(x.status_change) == 2
and "Running" in x.status_change
and "Succeeded" in x.status_change
):
# reporting that only the running count and the succeeded counts changed -- which implies
# the very common case of just more jobs successfully completing. We don't need notification
# of that.
print("c", end="", flush=True)
else:
notifier(f"Changes in {workspace}", str(x))
print(".", end="", flush=True)
prev_submissions_by_name[workspace] = submissions
time.sleep(30)
def _split_workspace_name(workspace):
m = re.match("([^/]+)/(.+)", workspace)
assert (
m
), f"Expected workspace name to be of the format 'namespace/name' but got: {workspace}"
namespace, workspace = m.groups()
return namespace, workspace
@click.command()
@click.argument("workspaces", nargs=-1)
def main(workspaces):
"""
Polls the supplied workspace for changes in submission status. When changes
such as a new submission or a submission completes, or part of a submission fails, a
notification will be sent out on a slack channel.
The slack channel notifications will be written is controlled by a secret stored in
google secrets management (depmap-omics/terra-notifier-webhook-url). At this
time this is hardcoded to "#terra-notification-test".
Workspace should be of the form <namespace>/<name> (ie: broad-firecloud-ccle/DepMap_WES_CN_hg38)
When run, expect to see a "." for each time it checks Terra and occasional a "c" when jobs are completed.
The command will run indefinitely until killed.
"""
webhook_url = get_slack_webhook_url()
def notify_slack(title, text):
body = f"{title}\n{text}"
print(f"Sending notification: {body}")
resp = requests.post(
webhook_url,
data=json.dumps({"text": body}),
headers={"Content-type": "application/json"},
)
assert (
resp.status_code == 200
), f"status: {resp.status_code}, body={resp.content}"
poll_workspace(workspaces, notify_slack)
if __name__ == "__main__":
main()