This repo is deprecated. You can find new updates in the main OpenMetadata repo.
This is a plugin for Apache Airflow >= 1.10 and Airflow >=2.x that exposes REST APIs to deploy a workflow definition and manage DAGS and tasks.
Install following packages in your scheduler and webserver python env.
pip install openmetadata-airflow-apis
Add the following section to airflow.cfg
[openmetadata_airflow_apis]
dag_runner_template = {AIRFLOW_HOME}/dag_templates/dag_runner.j2
dag_generated_configs = {AIRFLOW_HOME}/dag_generated_configs
dag_managed_operators = {AIRFLOW_HOME}/dag_managed_operators
substitute AIRFLOW_HOME with your airflow installation home
- Download the latest release
- Create plugins folder in your scheduler and webserver if its not exist already
- cp -r src/plugins/* ${AIRFLOW_HOME}/plugins
- cp -r src/plugins/dag_templates {AIRFLOW_HOME}
- mkdir -p {AIRFLOW_HOME}/dag_generated_configs
- cp -r src/plugins/dag_managed_operators {AIRFLOW_HOME}
- (re)start the airflow webserver and scheduler
airflow webserver
airflow scheduler
Plugin enables JWT Token based authentication for Airflow versions 1.10.4 or higher when RBAC support is enabled.
curl -XPOST http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/security/login -H "Content-Type: application/json" -d '{"username":"username", "password":"password", "refresh":true, "provider": "db"}'
curl -X POST http://localhost:8080/api/v1/security/login -H "Content-Type: application/json" -d '{"username":"admin", "password":"admin", "refresh":true, "provider": "db"}'
{
"access_token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2MDQyMTc4MzgsIm5iZiI6MTYwNDIxNzgzOCwianRpIjoiMTI4ZDE2OGQtMTZiOC00NzU0LWJiY2EtMTEyN2E2ZTNmZWRlIiwiZXhwIjoxNjA0MjE4NzM4LCJpZGVudGl0eSI6MSwiZnJlc2giOnRydWUsInR5cGUiOiJhY2Nlc3MifQ.xSWIE4lR-_0Qcu58OiSy-X0XBxuCd_59ic-9TB7cP9Y",
"refresh_token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2MDQyMTc4MzgsIm5iZiI6MTYwNDIxNzgzOCwianRpIjoiZjA5NTNkODEtNWY4Ni00YjY0LThkMzAtYzg5NTYzMmFkMTkyIiwiZXhwIjoxNjA2ODA5ODM4LCJpZGVudGl0eSI6MSwidHlwZSI6InJlZnJlc2gifQ.VsiRr8_ulCoQ-3eAbcFz4dQm-y6732QR6OmYXsy4HLk"
}
By default, JWT access token is valid for 15 mins and refresh token is valid for 30 days. You can renew the access token with the help of refresh token as shown below.
curl -X POST "http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/security/refresh" -H 'Authorization: Bearer <refresh_token>'
curl -X POST "http://localhost:8080/api/v1/security/refresh" -H 'Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2MDQyMTc4MzgsIm5iZiI6MTYwNDIxNzgzOCwianRpIjoiZjA5NTNkODEtNWY4Ni00YjY0LThkMzAtYzg5NTYzMmFkMTkyIiwiZXhwIjoxNjA2ODA5ODM4LCJpZGVudGl0eSI6MSwidHlwZSI6InJlZnJlc2gifQ.VsiRr8_ulCoQ-3eAbcFz4dQm-y6732QR6OmYXsy4HLk'
{
"access_token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2MDQyODQ2OTksIm5iZiI6MTYwNDI4NDY5OSwianRpIjoiZDhhN2IzMmYtMWE5Zi00Y2E5LWFhM2ItNDEwMmU3ZmMyMzliIiwiZXhwIjoxNjA0Mjg1NTk5LCJpZGVudGl0eSI6MSwiZnJlc2giOmZhbHNlLCJ0eXBlIjoiYWNjZXNzIn0.qY2e-bNSgOY-YboinOoGqLfKX9aQkdRjo025mZwBadA"
}
{"msg":"Missing Authorization Header"}
Examples:
curl -X GET -H 'Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2MDQyODQ2OTksIm5iZiI6MTYwNDI4NDY5OSwianRpIjoiZDhhN2IzMmYtMWE5Zi00Y2E5LWFhM2ItNDEwMmU3ZmMyMzliIiwiZXhwIjoxNjA0Mjg1NTk5LCJpZGVudGl0eSI6MSwiZnJlc2giOmZhbHNlLCJ0eXBlIjoiYWNjZXNzIn0.qY2e-bNSgOY-YboinOoGqLfKX9aQkdRjo025mZwBadA' http://localhost:8080/rest_api/api\?api\=dag_state\&dag_id\=dag_test\&run_id\=manual__2020-10-28T17%3A36%3A28.838356%2B00%3A00
Once you deploy the plugin and restart the webserver, you can start to use the REST API. Bellow you will see the endpoints that are supported.
Note: If enable RBAC, http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/rest_api/
This web page will show the Endpoints supported and provide a form for you to test submitting to them.
- deploy_dag
- refresh_all_dags
- delete_dag
- dag_state
- task_instance_detail
- restart_failed_task
- kill_running_tasks
- run_task_instance
- skip_task_instance
- Deploy a new dag, and refresh dag to session.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/rest_api/api?api=deploy_dag
- POST
{
"workflow": {
"name": "test_ingestion_x_35",
"force": "true",
"pause": "false",
"unpause": "true",
"dag_config": {
"test_ingestion_x_35": {
"default_args": {
"owner": "harsha",
"start_date": "2021-10-29T00:00:00.000Z",
"end_date": "2021-11-05T00:00:00.000Z",
"retries": 1,
"retry_delay_sec": 300
},
"schedule_interval": "0 3 * * *",
"concurrency": 1,
"max_active_runs": 1,
"dagrun_timeout_sec": 60,
"default_view": "tree",
"orientation": "LR",
"description": "this is an example dag!",
"tasks": {
"task_1": {
"operator": "airflow.operators.python_operator.PythonOperator",
"python_callable_name": "metadata_ingestion_workflow",
"python_callable_file": "metadata_ingestion.py",
"op_kwargs": {
"workflow_config": {
"metadata_server": {
"config": {
"api_endpoint": "http://localhost:8585/api",
"auth_provider_type": "no-auth"
},
"type": "metadata-server"
},
"sink": {
"config": {
"es_host": "localhost",
"es_port": 9200,
"index_dashboards": "true",
"index_tables": "true",
"index_topics": "true"
},
"type": "elasticsearch"
},
"source": {
"config": {
"include_dashboards": "true",
"include_tables": "true",
"include_topics": "true",
"limit_records": 10
},
"type": "metadata"
}
}
}
}
}
}
}
}
}
curl -H 'Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2MzU2NTE1MDAsIm5iZiI6MTYzNTY1MTUwMCwianRpIjoiNWQyZTM3ZDYtNjdiYS00NGZmLThjOWYtMDM0ZTQyNGE3MTZiIiwiZXhwIjoxNjM1NjUyNDAwLCJpZGVudGl0eSI6MSwiZnJlc2giOnRydWUsInR5cGUiOiJhY2Nlc3MifQ.DRUYCAiMh5h2pk1MZZJ4asyVFC20pu35DuAANQ5GxGw' -H 'Content-Type: application/json' -d "@test_ingestion_config.json" -X POST http://localhost:8080/rest_api/api\?api\=deploy_dag```
##### response:
```json
{"message": "Workflow [test_ingestion_x_35] has been created", "status": "success"}
- Get all dags from dag_floder, refresh the dags to the session.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/api?api=refresh_all_dags
- GET
- None
curl -X GET http://localhost:8080/admin/rest_api/api?api=refresh_all_dags
{
"message": "All DAGs are now up to date",
"status": "success"
}
- Delete dag based on dag_id.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/api?api=delete_dag&dag_id=value
- GET
- dag_id - string - The id of dag.
curl -X GET http://localhost:8080/admin/rest_api/api?api=delete_dag&dag_id=dag_test
{
"message": "DAG [dag_test] deleted",
"status": "success"
}
- Get the status of a dag run.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/api?api=dag_state&dag_id=value&run_id=value
- GET
- dag_id - string - The id of dag.
- run_id - string - The id of the dagRun.
curl -X GET http://localhost:8080/admin/rest_api/api?api=dag_state&dag_id=dag_test&run_id=manual__2020-10-28T16%3A15%3A19.427214%2B00%3A00
{
"state": "success",
"startDate": "2020-10-28T16:15:19.436693+0000",
"endDate": "2020-10-28T16:21:36.245696+0000",
"status": "success"
}
- Get the detail info of a task instance.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/api?api=task_instance_detail&dag_id=value&run_id=value&task_id=value
- GET
- dag_id - string - The id of dag.
- run_id - string - The id of the dagRun.
- task_id - string - The id of the task.
curl -X GET http://localhost:8080/admin/rest_api/api?api=task_instance_detail&dag_id=dag_test&run_id=manual__2020-10-28T16%3A31%3A17.247035%2B00%3A00&task_id=task_test
{
"taskId": "task_test",
"dagId": "dag_test",
"state": "success",
"tryNumber": null,
"maxTries": null,
"startDate": "2020-10-28T16:31:57.882329+0000",
"endDate": "2020-10-28T16:31:57.882329+0000",
"duration": null,
"status": "success"
}
- Restart failed tasks with downstream.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/api?api=restart_failed_task&dag_id=value&run_id=value
- GET
- dag_id - string - The id of dag.
- run_id - string - The id of the dagRun.
curl -X GET http://localhost:8080/admin/rest_api/api?api=restart_failed_task&dag_id=dag_test&run_id=manual__2020-10-28T16%3A31%3A17.247035%2B00%3A00
{
"failed_task_count": 2,
"clear_task_count": 6,
"status": "success"
}
- Kill running tasks that status in ['none', 'running'].
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/api?api=kill_running_tasks&dag_id=value&run_id=value&task_id=value
- GET
- dag_id - string - The id of dag.
- run_id - string - The id of the dagRun.
- task_id - string - If task_id is none, kill all tasks, else kill one task.
curl -X GET http://localhost:8080/admin/rest_api/api?api=kill_running_tasks&dag_id=dag_test&run_id=manual__2020-10-28T16%3A31%3A17.247035%2B00%3A00&task_id=task_test
{
"status": "success"
}
- Create dagRun, and run some tasks, other task skip.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/api?api=run_task_instance
- POST
- dag_id - string - The id of dag.
- run_id - string - The id of the dagRun.
- tasks - string - The id of the tasks, Multiple tasks are split by comma.
- conf - string - Conf of creating dagRun.
curl -X POST -F 'dag_id=dag_test' -F 'run_id=manual__2020-10-28T17:36:28.838356+00:00' -F 'tasks=task_test_3,task_test_4,task_test_6' http://localhost:8080/admin/rest_api/api?api=run_task_instance
{
"execution_date": "2020-10-28T17:39:14.941060+0000",
"status": "success"
}
- Skip one task instance and downstream task.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/api?api=skip_task_instance&dag_id=value&run_id=value&task_id=value
- GET
- dag_id - string - The id of dag.
- run_id - string - The id of the dagRun.
- task_id - string - The id of the task.
curl -X GET http://localhost:8080/admin/rest_api/api?api=skip_task_instance&dag_id=dag_test&run_id=manual__2020-10-28T17%3A43%3A10.053716%2B00%3A00&task_id=task_test_2
{
"status": "success"
}