Add trigger api
harshach committed Nov 8, 2021
1 parent 54e515b commit dfc191b
Showing 3 changed files with 87 additions and 0 deletions.
47 changes: 47 additions & 0 deletions examples/sample-data.json
@@ -0,0 +1,47 @@
{
"name": "om_metadata_sample_data",
"forceDeploy": "true",
"pauseWorkflow": "false",
"description": "this is an example dag!",
"concurrency": 1,
"maxActiveRuns": 1,
"workflowTimeout": 60,
"workflowDefaultView": "tree",
"orientation": "LR",
"owner": "harsha",
"startDate": "2021-10-31T15:00:00.000Z",
"endDate": "2021-11-05T00:00:00.000Z",
"retries": 1,
"retryDelay": 300,
"schedule_interval": "0 3 * * *",
"tasks": [{
"name": "task_1",
"operator": "airflow.operators.python_operator.PythonOperator",
"config": {
"python_callable_name": "metadata_ingestion_workflow",
"python_callable_file": "metadata_ingestion.py",
"op_kwargs": {
"workflow_config": {
"source": {
"type": "sample-data",
"config": {
"sample_data_folder": "/Users/harsha/Code/openmetadata/ingestion/examples/sample_data"
}
},
"sink": {
"type": "metadata-rest",
"config": {}
},
"metadata_server": {
"type": "metadata-server",
"config": {
"api_endpoint": "http://localhost:8585/api",
"auth_provider_type": "no-auth"
}
}
}
}
},
"dependencies": []
}]
}
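
For reference, a workflow definition like examples/sample-data.json above is what the plugin's deploy_dag endpoint consumes. Below is a minimal sketch of posting it from Python; the host, port, and the /admin/rest_api/api?api=&lt;name&gt; URL layout are assumptions based on how REST API plugins of this style are commonly mounted, not something this commit confirms:

import json

import requests

AIRFLOW_HOST = "http://localhost:8080"  # assumed local Airflow webserver

# Load the sample workflow definition shown above
with open("examples/sample-data.json") as f:
    workflow = json.load(f)

# POST it to the deploy_dag API; the ?api= routing mirrors the
# `if api == "deploy_dag"` dispatch in the plugin code below
resp = requests.post(
    f"{AIRFLOW_HOST}/admin/rest_api/api",
    params={"api": "deploy_dag"},
    json=workflow,
)
print(resp.status_code, resp.json())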
1 change: 1 addition & 0 deletions setup.py
@@ -28,6 +28,7 @@ def get_long_description():
"setuptools~=58.3.0",
"apache-airflow[http,kubernetes]>=1.10.2",
"Flask~=1.1.4",
"Flask-Admin",
"pydantic>=1.7.4",
}

39 changes: 39 additions & 0 deletions src/plugins/openmetadata_airflow_apis_plugin.py
@@ -13,6 +13,7 @@
from airflow.utils.state import State
from airflow.utils import timezone
from airflow.exceptions import TaskNotFound
from airflow.api.common.experimental.trigger_dag import trigger_dag

from flask import Blueprint, request, jsonify, Response
from flask_admin import BaseView as AdminBaseview, expose as admin_expose
@@ -70,6 +71,16 @@ def import_path(path):
"required": True},
]
},
{
"name": "trigger_dag",
"description": "Trigger a DAG",
"http_method": "POST",
"arguments": [],
"post_arguments": [
{"name": "workflow_name", "description": "Workflow name to run",
"required": True},
]
},
{
"name": "refresh_all_dags",
"description": "Refresh all DAGs in the Web Server",
@@ -361,6 +372,8 @@ def api(self):
# Some functions are custom and need to be manually routed to.
if api == "deploy_dag":
final_response = self.deploy_dag()
elif api == "trigger_dag":
final_response = self.trigger_dag()
elif api == "refresh_all_dags":
final_response = self.refresh_all_dags()
elif api == "delete_dag":
@@ -467,6 +480,32 @@ def deploy_dag(self):
"message": "Workflow [{}] has been created".format(dag_name)
})

@staticmethod
def trigger_dag():
    """
    Trigger a dag run
    """
    logging.info("Running trigger_dag method")
    dag_id = None
    try:
        request_json = request.get_json()
        dag_id = request_json["workflow_name"]
        # run_id is optional; Airflow generates one when None is passed
        run_id = request_json.get("run_id")
        dag_run = trigger_dag(
            dag_id=dag_id,
            run_id=run_id,
            conf=None,
            execution_date=timezone.utcnow(),
        )
        return ApiResponse.success({
            "message": "Workflow [{}] has been triggered: {}".format(dag_id, dag_run)
        })
    except Exception as e:
        # dag_id may still be None if the request body was missing or invalid
        logging.error(f"Failed to trigger dag {dag_id}")
        return ApiResponse.error({
            "message": "Workflow {} has failed to trigger due to {}".format(dag_id, e)
        })

@staticmethod
def refresh_all_dags():
"""Custom Function for the refresh_all_dags API.
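Once a workflow is deployed, the new trigger_dag endpoint can be exercised the same way. Per the handler above, workflow_name is required and run_id is optional (Airflow generates one when it is omitted). A hedged usage sketch, with the same assumed URL layout as the deploy_dag example:

import requests

AIRFLOW_HOST = "http://localhost:8080"  # assumed local Airflow webserver

# Trigger the workflow deployed from examples/sample-data.json;
# "run_id" could be added to the payload to name the run explicitly
resp = requests.post(
    f"{AIRFLOW_HOST}/admin/rest_api/api",
    params={"api": "trigger_dag"},
    json={"workflow_name": "om_metadata_sample_data"},
)
print(resp.status_code, resp.json())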
