diff --git a/envs/api-s3-dataset/.gitkeep b/envs/api-s3-dataset/.gitkeep
new file mode 100644
index 0000000..e69de29
diff --git a/envs/api-s3-dataset/.whirl.env b/envs/api-s3-dataset/.whirl.env
new file mode 100644
index 0000000..b9d973e
--- /dev/null
+++ b/envs/api-s3-dataset/.whirl.env
@@ -0,0 +1,21 @@
+AWS_ACCESS_KEY_ID=bar
+AWS_SECRET_ACCESS_KEY=foo
+DEMO_BUCKET=demo-s3-output
+AWS_SERVER=s3server
+AWS_PORT=4566
+
+# postgres env vars
+POSTGRES_HOST=postgresdb
+POSTGRES_PORT=5432
+POSTGRES_PASSWORD=pAssw0rd
+POSTGRES_USER=airflow
+POSTGRES_DB=airflow
+
+# Airflow variables
+AIRFLOW__CORE__EXPOSE_CONFIG=True
+AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True
+AIRFLOW__WEBSERVER__SECRET_KEY=foobar
+AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False
+AIRFLOW__CORE__LOAD_EXAMPLES=False
+AIRFLOW__CORE__FERNET_KEY=W5gmA+dp84hkZEzpxPw4LTmhbXA1uVxKZsgIfay8wno=
+AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}
diff --git a/envs/api-s3-dataset/docker-compose.yml b/envs/api-s3-dataset/docker-compose.yml
new file mode 100644
index 0000000..a47f36f
--- /dev/null
+++ b/envs/api-s3-dataset/docker-compose.yml
@@ -0,0 +1,83 @@
+version: '3'
+
+services:
+  webserver:
+    image: docker-whirl-airflow:py-${PYTHON_VERSION}-local
+    command: ["webserver", "-p", "5000"]
+    ports:
+      - '5000:5000' # HTTP (Airflow Web UI)
+    env_file:
+      - .whirl.env
+    environment:
+      - WHIRL_SETUP_FOLDER
+      - AIRFLOW__API__AUTH_BACKEND
+    volumes:
+      - ${DAG_FOLDER}:/opt/airflow/dags
+      - ${ENVIRONMENT_FOLDER}/whirl.setup.d:${WHIRL_SETUP_FOLDER}/env.d/
+      - ${DAG_FOLDER}/whirl.setup.d:${WHIRL_SETUP_FOLDER}/dag.d/
+    depends_on:
+      - mockserver
+      - postgresdb
+      - s3server
+    links:
+      - s3server:${DEMO_BUCKET}.s3server
+
+  scheduler:
+    image: docker-whirl-airflow:py-${PYTHON_VERSION}-local
+    command: ["scheduler"]
+    env_file:
+      - .whirl.env
+    environment:
+      - WHIRL_SETUP_FOLDER
+      - AIRFLOW__API__AUTH_BACKEND
+    volumes:
+      - ${DAG_FOLDER}:/opt/airflow/dags
+      - ${ENVIRONMENT_FOLDER}/whirl.setup.d:${WHIRL_SETUP_FOLDER}/env.d/
+      - ${DAG_FOLDER}/whirl.setup.d:${WHIRL_SETUP_FOLDER}/dag.d/
+    depends_on:
+      - mockserver
+      - postgresdb
+      - s3server
+    links:
+      - s3server:${DEMO_BUCKET}.s3server
+
+  triggerer:
+    image: docker-whirl-airflow:py-${PYTHON_VERSION}-local
+    command: ["triggerer"]
+    env_file:
+      - .whirl.env
+    environment:
+      - WHIRL_SETUP_FOLDER
+      - AIRFLOW__API__AUTH_BACKEND
+    volumes:
+      - ${DAG_FOLDER}:/opt/airflow/dags
+      - ${ENVIRONMENT_FOLDER}/whirl.setup.d:${WHIRL_SETUP_FOLDER}/env.d/
+      - ${DAG_FOLDER}/whirl.setup.d:${WHIRL_SETUP_FOLDER}/dag.d/
+    depends_on:
+      - postgresdb
+
+  mockserver:
+    image: mockserver/mockserver:5.15.0
+    ports:
+      - 1080:1080
+      - 1081:1081
+    environment:
+      - LOG_LEVEL=ERROR
+      - SERVER_PORT=1080,1081
+
+  postgresdb:
+    image: postgres:16
+    ports:
+      - 5432:5432
+    env_file:
+      - .whirl.env
+
+  s3server:
+    image: localstack/localstack:3.0.2
+    ports:
+      - "4566:4566"
+    environment:
+      - SERVICES=s3
+    env_file:
+      - .whirl.env
+
diff --git a/envs/api-s3-dataset/whirl.setup.d/01_add_connection_api.sh b/envs/api-s3-dataset/whirl.setup.d/01_add_connection_api.sh
new file mode 100644
index 0000000..c754ca3
--- /dev/null
+++ b/envs/api-s3-dataset/whirl.setup.d/01_add_connection_api.sh
@@ -0,0 +1,31 @@
+#!/usr/bin/env bash
+
+echo "============================="
+echo "== Configure S3 Connection =="
+echo "============================="
+airflow connections add \
+    local_s3 \
+    --conn-type s3 \
+    --conn-extra "{\"endpoint_url\": \"http://${AWS_SERVER}:${AWS_PORT}\",
\"aws_secret_access_key\": \"${AWS_SECRET_ACCESS_KEY}\", + \"aws_access_key_id\": \"${AWS_ACCESS_KEY_ID}\", + \"host\": \"http://${AWS_SERVER}:${AWS_PORT}\" + }" + +pip install awscli awscli-plugin-endpoint + +echo -e "$AWS_ACCESS_KEY_ID\n$AWS_SECRET_ACCESS_KEY\n\n" | aws configure +aws configure set plugins.endpoint awscli_plugin_endpoint +aws configure set default.s3.endpoint_url http://${AWS_SERVER}:${AWS_PORT} +aws configure set default.s3api.endpoint_url http://${AWS_SERVER}:${AWS_PORT} + +echo "================================" +echo "== Create S3 Bucket ===========" +echo "================================" +while [[ "$(curl -s -o /dev/null -w ''%{http_code}'' http://${AWS_SERVER}:${AWS_PORT})" != "200" ]]; do + echo "Waiting for ${AWS_SERVER} to come up..." + sleep 2; +done + +echo "creating bucket" +aws s3api create-bucket --bucket ${DEMO_BUCKET} diff --git a/examples/airflow-datasets/.airflowignore b/examples/airflow-datasets/.airflowignore new file mode 100644 index 0000000..482804b --- /dev/null +++ b/examples/airflow-datasets/.airflowignore @@ -0,0 +1,3 @@ +whirl.setup.d +.whirl.env +README.md diff --git a/examples/airflow-datasets/.whirl.env b/examples/airflow-datasets/.whirl.env new file mode 100644 index 0000000..393accb --- /dev/null +++ b/examples/airflow-datasets/.whirl.env @@ -0,0 +1 @@ +WHIRL_ENVIRONMENT=api-s3-dataset diff --git a/examples/airflow-datasets/README.md b/examples/airflow-datasets/README.md new file mode 100644 index 0000000..d01a3b4 --- /dev/null +++ b/examples/airflow-datasets/README.md @@ -0,0 +1,40 @@ +#### Dataset Aware Scheduling Example + +In this example we are going to have multiple dags which depend on each other through the dataset aware scheduling. + +The default environment (`api-s3-dataset`) includes containers for: + + - A S3 server; + - A MockServer instance + - The core Airflow components (webserver, scheduler, triggerer). + - The airflow database (postgres) + +The environment contains a setup script in the `whirl.setup.d/` folder: + + - `01_add_connection_api.sh` which: + + - Adds a S3 connection to Airflow; + - Installs the `awscli` Python libraries and configures them to connect to the S3 server; + - Creates a bucket (with a `/etc/hosts` entry to support the [virtual host style method](https://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html)). + +To run this example with the default environment: + +```bash +$ cd ./examples/airflow-datasets +$ whirl +``` + +Open your browser to [http://localhost:5000](http://localhost:5000) to access the Airflow UI. Manually enable the DAG and watch the pipeline run to successful completion. + +This example includes a `.whirl.env` configuration file in the DAG directory. In the environment folder there is also a `.whirl.env` which specifies S3-specific variables. The example folder also contains a `whirl.setup.d/` directory which contains an initialization script (`01_add_connection_api_and_mockdata.sh`). This script is executed in the airflow containers after the environment-specific scripts have run and will: + + - Add a connection to the API endpoint; + - Add an [expectation](http://www.mock-server.com/mock_server/creating_expectations.html) for the MockServer to know which response needs to be sent for which requested path; + - Install Pandas and PyArrow to support transforming the JSON into a Parquet file; + - Create a local directory where the intermediate file is stored before being uploaded to S3. 
+
+For the DAGs, we wanted to test several different scenarios:
+
+##### Simple single dataset dependency
+
+The `single-dataset-producer` DAG runs hourly, fetches JSON from the mocked API and writes it as a Parquet file to the `DEMO_API_DS` location, which updates that dataset. The `single-dataset-consumer` DAG is scheduled on `[DEMO_API_DS]` and uploads the produced file to the S3 bucket.
+
diff --git a/examples/airflow-datasets/dataset-A-producer.py b/examples/airflow-datasets/dataset-A-producer.py
new file mode 100644
index 0000000..24936e4
--- /dev/null
+++ b/examples/airflow-datasets/dataset-A-producer.py
@@ -0,0 +1,29 @@
+from datetime import timedelta, datetime
+from airflow import DAG
+
+from airflow.decorators import task
+
+from include.datasets import DEMO_A_DS
+
+
+default_args = {
+    'owner': 'whirl',
+    'depends_on_past': False,
+    'start_date': datetime.now() - timedelta(minutes=20),
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+}
+
+with DAG(
+    dag_id='dataset-A-producer',
+    default_args=default_args,
+    schedule=None,
+    dagrun_timeout=timedelta(seconds=120)
+):
+
+    @task(outlets=[DEMO_A_DS])
+    def trigger_dataset(**context):
+        print(f"Triggering dataset: {DEMO_A_DS.uri}")
+        pass
+
+    trigger_dataset()
diff --git a/examples/airflow-datasets/dataset-ABC-consumer.py b/examples/airflow-datasets/dataset-ABC-consumer.py
new file mode 100644
index 0000000..e644b27
--- /dev/null
+++ b/examples/airflow-datasets/dataset-ABC-consumer.py
@@ -0,0 +1,31 @@
+from datetime import timedelta, datetime
+from airflow import DAG
+
+from airflow.decorators import task
+
+from include.datasets import DEMO_A_DS, DEMO_B_DS, DEMO_C_DS
+
+
+default_args = {
+    'owner': 'whirl',
+    'depends_on_past': False,
+    'start_date': datetime.now() - timedelta(minutes=20),
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+}
+
+with DAG(
+    dag_id='dataset-ABC-consumer',
+    default_args=default_args,
+    schedule=[DEMO_A_DS, DEMO_B_DS, DEMO_C_DS],
+    dagrun_timeout=timedelta(seconds=120)
+):
+
+    @task
+    def echo_trigger(triggering_dataset_events=None):
+        # Print the dataset events (and their source DAG) that triggered this run
+        for dataset, dataset_list in triggering_dataset_events.items():
+            print(dataset, dataset_list)
+            print(dataset_list[0].source_dag_run.dag_id)
+
+    echo_trigger()
diff --git a/examples/airflow-datasets/dataset-ABCD-consumer.py b/examples/airflow-datasets/dataset-ABCD-consumer.py
new file mode 100644
index 0000000..e8652b4
--- /dev/null
+++ b/examples/airflow-datasets/dataset-ABCD-consumer.py
@@ -0,0 +1,31 @@
+from datetime import timedelta, datetime
+from airflow import DAG
+
+from airflow.decorators import task
+
+from include.datasets import DEMO_A_DS, DEMO_B_DS, DEMO_C_DS, DEMO_D_DS
+
+
+default_args = {
+    'owner': 'whirl',
+    'depends_on_past': False,
+    'start_date': datetime.now() - timedelta(minutes=20),
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+}
+
+with DAG(
+    dag_id='dataset-ABCD-consumer',
+    default_args=default_args,
+    schedule=[DEMO_A_DS, DEMO_B_DS, DEMO_C_DS, DEMO_D_DS],
+    dagrun_timeout=timedelta(seconds=120)
+):
+
+    @task
+    def echo_trigger(triggering_dataset_events=None):
+        # Print the dataset events (and their source DAG) that triggered this run
+        for dataset, dataset_list in triggering_dataset_events.items():
+            print(dataset, dataset_list)
+            print(dataset_list[0].source_dag_run.dag_id)
+
+    echo_trigger()
diff --git a/examples/airflow-datasets/dataset-B-producer.py b/examples/airflow-datasets/dataset-B-producer.py
new file mode 100644
index 0000000..dcf48f6
--- /dev/null
+++ b/examples/airflow-datasets/dataset-B-producer.py
@@ -0,0 +1,29 @@
+from datetime import timedelta, datetime
+from airflow import DAG
+
+from airflow.decorators import task
+
+from include.datasets import DEMO_B_DS
+
+
+default_args = {
+    'owner': 'whirl',
+    'depends_on_past': False,
+    'start_date': datetime.now() - timedelta(minutes=20),
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+}
+
+with DAG(
+    dag_id='dataset-B-producer',
+    default_args=default_args,
+    schedule=None,
+    dagrun_timeout=timedelta(seconds=120)
+):
+
+    @task(outlets=[DEMO_B_DS])
+    def trigger_dataset(**context):
+        print(f"Triggering dataset: {DEMO_B_DS.uri}")
+        pass
+
+    trigger_dataset()
diff --git a/examples/airflow-datasets/dataset-C-producer.py b/examples/airflow-datasets/dataset-C-producer.py
new file mode 100644
index 0000000..921d538
--- /dev/null
+++ b/examples/airflow-datasets/dataset-C-producer.py
@@ -0,0 +1,29 @@
+from datetime import timedelta, datetime
+from airflow import DAG
+
+from airflow.decorators import task
+
+from include.datasets import DEMO_C_DS
+
+
+default_args = {
+    'owner': 'whirl',
+    'depends_on_past': False,
+    'start_date': datetime.now() - timedelta(minutes=20),
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+}
+
+with DAG(
+    dag_id='dataset-C-producer',
+    default_args=default_args,
+    schedule=None,
+    dagrun_timeout=timedelta(seconds=120)
+):
+
+    @task(outlets=[DEMO_C_DS])
+    def trigger_dataset(**context):
+        print(f"Triggering dataset: {DEMO_C_DS.uri}")
+        pass
+
+    trigger_dataset()
diff --git a/examples/airflow-datasets/dataset-D-producer.py b/examples/airflow-datasets/dataset-D-producer.py
new file mode 100644
index 0000000..09f6a04
--- /dev/null
+++ b/examples/airflow-datasets/dataset-D-producer.py
@@ -0,0 +1,29 @@
+from datetime import timedelta, datetime
+from airflow import DAG
+
+from airflow.decorators import task
+
+from include.datasets import DEMO_D_DS
+
+
+default_args = {
+    'owner': 'whirl',
+    'depends_on_past': False,
+    'start_date': datetime.now() - timedelta(minutes=20),
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+}
+
+with DAG(
+    dag_id='dataset-D-producer',
+    default_args=default_args,
+    schedule=None,
+    dagrun_timeout=timedelta(seconds=120)
+):
+
+    @task(outlets=[DEMO_D_DS])
+    def trigger_dataset(**context):
+        print(f"Triggering dataset: {DEMO_D_DS.uri}")
+        pass
+
+    trigger_dataset()
diff --git a/examples/airflow-datasets/dataset-E-producer.py b/examples/airflow-datasets/dataset-E-producer.py
new file mode 100644
index 0000000..fbaee63
--- /dev/null
+++ b/examples/airflow-datasets/dataset-E-producer.py
@@ -0,0 +1,29 @@
+from datetime import timedelta, datetime
+from airflow import DAG
+
+from airflow.decorators import task
+
+from include.datasets import DEMO_E_DS
+
+
+default_args = {
+    'owner': 'whirl',
+    'depends_on_past': False,
+    'start_date': datetime.now() - timedelta(minutes=20),
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+}
+
+with DAG(
+    dag_id='dataset-E-producer',
+    default_args=default_args,
+    schedule=None,
+    dagrun_timeout=timedelta(seconds=120)
+):
+
+    @task(outlets=[DEMO_E_DS])
+    def trigger_dataset(**context):
+        print(f"Triggering dataset: {DEMO_E_DS.uri}")
+        pass
+
+    trigger_dataset()
diff --git a/examples/airflow-datasets/include/datasets.py b/examples/airflow-datasets/include/datasets.py
new file mode 100644
index 0000000..c20628b
--- /dev/null
+++ b/examples/airflow-datasets/include/datasets.py
@@ -0,0 +1,8 @@
+from airflow import Dataset
+
+DEMO_API_DS = Dataset("/tmp/datasets/demo-api.parquet")
+DEMO_A_DS = Dataset("/tmp/datasets/a")
+DEMO_B_DS = Dataset("/tmp/datasets/b")
+DEMO_C_DS = Dataset("/tmp/datasets/c")
+DEMO_D_DS = Dataset("/tmp/datasets/d")
+DEMO_E_DS = Dataset("/tmp/datasets/e")
diff --git a/examples/airflow-datasets/single-dataset-consumer.py b/examples/airflow-datasets/single-dataset-consumer.py
new file mode 100644
index 0000000..fff077b
--- /dev/null
+++ b/examples/airflow-datasets/single-dataset-consumer.py
@@ -0,0 +1,38 @@
+from datetime import timedelta, datetime
+from airflow import DAG
+
+from airflow.decorators import task
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+
+from include.datasets import DEMO_API_DS
+
+
+default_args = {
+    'owner': 'whirl',
+    'depends_on_past': False,
+    'start_date': datetime.now() - timedelta(minutes=20),
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+}
+
+with DAG(
+    dag_id='single-dataset-consumer',
+    default_args=default_args,
+    schedule=[DEMO_API_DS],
+    dagrun_timeout=timedelta(seconds=120)
+):
+
+    @task()
+    def s3_store(conn_id, localfile, bucket, path, **context):
+        # Upload the locally stored dataset file to the target S3 path
+        s3hook = S3Hook(conn_id)
+        s3hook.load_file(
+            localfile,
+            path,
+            bucket_name=bucket,
+            replace=True
+        )
+
+    s3bucket = "demo-s3-output"
+    output_path = "api-store/{{ ds_nodash }}/demo-api.parquet"
+
+    s3_store(conn_id="local_s3", localfile=DEMO_API_DS.uri, bucket=s3bucket, path=output_path)
diff --git a/examples/airflow-datasets/single-dataset-producer.py b/examples/airflow-datasets/single-dataset-producer.py
new file mode 100644
index 0000000..eccd3eb
--- /dev/null
+++ b/examples/airflow-datasets/single-dataset-producer.py
@@ -0,0 +1,40 @@
+from datetime import timedelta, datetime
+from airflow import DAG
+
+from airflow.decorators import task
+from airflow.providers.http.hooks.http import HttpHook
+import pandas as pd
+
+from include.datasets import DEMO_API_DS
+
+
+default_args = {
+    'owner': 'whirl',
+    'depends_on_past': False,
+    'start_date': datetime.now() - timedelta(minutes=20),
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+}
+
+with DAG(
+    dag_id='single-dataset-producer',
+    default_args=default_args,
+    schedule='@hourly',
+    dagrun_timeout=timedelta(seconds=120)
+):
+
+    @task(outlets=[DEMO_API_DS])
+    def api_get(conn_id, localfile, **context):
+        """
+        Fetch data in JSON format from the API and persist it locally as Parquet.
+
+        :param str conn_id: Airflow connection id for the API
+        :param str localfile: local path where the Parquet file is written
+        :param context: Airflow context
+        :return:
+        """
+        http_hook = HttpHook(http_conn_id=conn_id, method="GET")
+        response = http_hook.run('')
+        df = pd.DataFrame(response.json())
+        df.to_parquet(localfile)
+
+    api_get(conn_id="local_api", localfile=DEMO_API_DS.uri)
diff --git a/examples/airflow-datasets/whirl.setup.d/01_add_connection_api_and_mockdata.sh b/examples/airflow-datasets/whirl.setup.d/01_add_connection_api_and_mockdata.sh
new file mode 100644
index 0000000..3145fb6
--- /dev/null
+++ b/examples/airflow-datasets/whirl.setup.d/01_add_connection_api_and_mockdata.sh
@@ -0,0 +1,39 @@
+#!/usr/bin/env bash
+
+echo "=============================="
+echo "== Configure API Connection =="
+echo "=============================="
+
+airflow connections add local_api \
+  --conn-type http \
+  --conn-host "http://mockserver:1080/testapi" \
+  --conn-login apitest \
+  --conn-password testapi
+
+
+# Create an expectation for our MockServer to respond to a specific API REST call with a fixed set of JSON data.
+# For docs on creating expectations see: http://www.mock-server.com/mock_server/creating_expectations.html
+curl -v -X PUT "http://mockserver:1080/mockserver/expectation" -d '{
+  "httpRequest": {
+    "path": "/testapi",
+    "headers": {
+      "Authorization": ["Basic .*"]
+    }
+  },
+  "httpResponse": {
+    "statusCode": 200,
+    "headers": {
+      "content-type": [
+        "application/json"
+      ]
+    },
+    "body": {
+      "type": "JSON",
+      "json": "[{\"country\": \"Oman\", \"city\": \"Timaru\", \"lat\": 10.3398, \"lon\": -172.55031, \"dt\": 1578207382, \"type\": \"Acura\", \"color\": \"violet\"}, {\"country\": \"Denmark\", \"city\": \"La Cruz\", \"lat\": -44.10906, \"lon\": 2.4756, \"dt\": 1546852920, \"type\": \"Audi\", \"color\": \"orange\"}, {\"country\": \"Burundi\", \"city\": \"Pessac\", \"lat\": -68.89785, \"lon\": 4.89982, \"dt\": 1543912959, \"type\": \"BMW\", \"color\": \"red\"}, {\"country\": \"Aruba\", \"city\": \"Jemeppe-sur-Meuse\", \"lat\": -43.89882, \"lon\": -63.38649, \"dt\": 1520321854, \"type\": \"Peugeot\", \"color\": \"green\"}, {\"country\": \"Gambia\", \"city\": \"Pak Pattan\", \"lat\": -37.21457, \"lon\": -94.19453, \"dt\": 1570869730, \"type\": \"Daimler\", \"color\": \"yellow\"}]"
+    }
+  }
+}'
+
+pip install pandas pyarrow
+
+mkdir -p /tmp/datasets/
\ No newline at end of file