-
Notifications
You must be signed in to change notification settings - Fork 5
/
dag_status.py
executable file
·37 lines (30 loc) · 921 Bytes
/
dag_status.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#!/usr/bin/python
# -*- coding: utf-8 -*-
import sys
import time
import requests
def get_latest_state():
    """Return the state of the most recent ``lakeFS_workflow`` DAG run.

    Sends an authenticated GET request to the ``dagRuns`` endpoint on
    ``localhost:8080`` (presumably the Airflow REST API -- confirm) and
    selects the entry with the greatest ``execution_date``.

    Returns:
        The ``state`` value of the latest run, or ``None`` when the
        response lists no runs at all.

    Raises:
        requests.HTTPError: if the server answers with an error status.
        requests.Timeout: if the server does not answer in time.
    """
    url = 'http://localhost:8080/api/v1/dags/lakeFS_workflow/dagRuns'
    username = 'admin'
    password = 'admin'
    # Bound the request: without a timeout a hung server would block this
    # call forever and defeat any deadline enforced by the caller.
    response = requests.get(url, auth=(username, password), timeout=10)
    response.raise_for_status()
    dag_runs = response.json()['dag_runs']
    if not dag_runs:
        # No runs reported yet; max() would raise ValueError on an empty list.
        return None
    # NOTE(review): only the runs contained in this single response are
    # compared; if the endpoint paginates, confirm the newest run is included.
    # Assumes execution_date values sort chronologically -- TODO confirm.
    latest = max(dag_runs, key=lambda k: k['execution_date'])
    return latest['state']
def dag_state():
    """Poll the latest DAG run state until it settles or time runs out.

    The state is fetched every five seconds. Polling stops as soon as the
    state is 'success', 'failed' or 'skipped', or when another sleep would
    pass the five-minute deadline. The last observed state is printed.

    Returns:
        0 if the last observed state is 'success', otherwise 1.
    """
    poll_seconds = 5
    deadline = time.time() + 5 * 60
    settled_states = ('success', 'failed', 'skipped')

    current = get_latest_state()
    # Only sleep when the next poll still fits inside the deadline.
    while (current not in settled_states
           and time.time() + poll_seconds <= deadline):
        time.sleep(poll_seconds)
        current = get_latest_state()

    print("dag_state", current)
    return 0 if current == 'success' else 1
# Run the poll only when executed as a script, so that importing this module
# does not start network polling or terminate the importing interpreter.
# The process exit status is dag_state()'s return value (0 on success, else 1).
if __name__ == "__main__":
    sys.exit(dag_state())