Skip to content
This repository has been archived by the owner on May 8, 2022. It is now read-only.

Commit

Permalink
workflow/bot operations API- Add list runs api
Browse files Browse the repository at this point in the history
  • Loading branch information
harshach committed Nov 9, 2021
1 parent dfc191b commit 82a9426
Showing 1 changed file with 44 additions and 9 deletions.
53 changes: 44 additions & 9 deletions src/plugins/openmetadata_airflow_apis_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ def import_path(path):
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True}
]
},
{
"name": "list_run",
"description": "Get a list of a dag runs",
"http_method": "GET",
"arguments": [
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True}
]
},
{
"name": "dag_state",
"description": "Get the status of a dag run",
Expand Down Expand Up @@ -372,6 +380,8 @@ def api(self):
# Some functions are custom and need to be manually routed to.
if api == "deploy_dag":
final_response = self.deploy_dag()
elif api == 'list_run':
final_response = self.list_run()
elif api == 'trigger_dag':
final_response = self.trigger_dag()
elif api == "refresh_all_dags":
Expand Down Expand Up @@ -402,8 +412,6 @@ def deploy_dag(self):
args:
workflow_config: the workflow config that defines the dag
"""

logging.info("Executing custom 'deploy_dag' function")
request_json = request.get_json()
workflow_config = WorkflowConfig(**request_json)
workflow_config.pythonOperatorLocation = dag_managed_operators
Expand Down Expand Up @@ -457,6 +465,7 @@ def deploy_dag(self):
warning = "Failed to get dag from dag_file"
logging.warning(warning)
return ApiResponse.server_error("Failed to get dag from DAG File [{}]".format(dag_file))

dag_id = dag_name
logging.info("dag_id from file: " + dag_id)
# Refresh dag into session
Expand All @@ -472,13 +481,14 @@ def deploy_dag(self):
dag_model = session.query(DagModel).filter(DagModel.dag_id == dag_id).first()
logging.info("dag_model:" + str(dag_model))
dag_model.set_is_paused(is_paused=pause)
return ApiResponse.success({
"message": "Workflow [{}] has been created".format(dag_name)
})
except Exception as e:
logging.info(f'Failed to serialize the dag {e}')


return ApiResponse.success({
"message": "Workflow [{}] has been created".format(dag_name)
})
return ApiResponse.server_error({
"message": "Workflow [{}] failed to deploy due to [{}]".format(dag_name, e)
})

@staticmethod
def trigger_dag():
Expand All @@ -504,8 +514,6 @@ def trigger_dag():
})




@staticmethod
def refresh_all_dags():
"""Custom Function for the refresh_all_dags API.
Expand Down Expand Up @@ -560,6 +568,33 @@ def delete_dag(self):
"message": "DAG [{}] deleted".format(dag_id)
})

def list_run(self):
"""
List dag runs
"""
try:
logging.info("Running list_run")
dag_id = self.get_argument(request, 'dag_id')
logging.info("dag_id {}".format(dag_id))
session = settings.Session()
query = session.query(DagRun)
dag_runs = query.filter(
DagRun.dag_id == dag_id
).all()

if dag_runs is None:
return ApiResponse.not_found("dag run is not found")

res_dag_runs = []
for dag_run in dag_runs:
res_dag_runs.append(ResponseFormat.format_dag_run_state(dag_run))
session.close()
response_dict = {'dag_runs': res_dag_runs}
return ApiResponse.success(response_dict)
except Exception as e:
logging.error("Failed to list dag runs {}".format(e))
return ApiResponse.server_error("Failed to list dag runs for {}".format(dag_id))


def dag_state(self):
"""Get dag_run from session according to dag_id and run_id,
Expand Down

0 comments on commit 82a9426

Please sign in to comment.