First draft of a dataset aware scheduling example
krisgeus committed Jan 24, 2024
1 parent cddeb66 commit 23fddbb
Showing 18 changed files with 511 additions and 0 deletions.
Empty file added envs/api-s3-dataset/.gitkeep
Empty file.
21 changes: 21 additions & 0 deletions envs/api-s3-dataset/.whirl.env
@@ -0,0 +1,21 @@
AWS_ACCESS_KEY_ID=bar
AWS_SECRET_ACCESS_KEY=foo
DEMO_BUCKET=demo-s3-output
AWS_SERVER=s3server
AWS_PORT=4566

# postgres env vars
POSTGRES_HOST=postgresdb
POSTGRES_PORT=5432
POSTGRES_PASSWORD=pAssw0rd
POSTGRES_USER=airflow
POSTGRES_DB=airflow

# Airflow variables
AIRFLOW__CORE__EXPOSE_CONFIG=True
AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True
AIRFLOW__WEBSERVER__SECRET_KEY=foobar
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False
AIRFLOW__CORE__LOAD_EXAMPLES=False
AIRFLOW__CORE__FERNET_KEY=W5gmA+dp84hkZEzpxPw4LTmhbXA1uVxKZsgIfay8wno=
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}
83 changes: 83 additions & 0 deletions envs/api-s3-dataset/docker-compose.yml
@@ -0,0 +1,83 @@
version: '3'

services:
  webserver:
    image: docker-whirl-airflow:py-${PYTHON_VERSION}-local
    command: ["webserver", "-p", "5000"]
    ports:
      - '5000:5000'  # HTTP (Airflow Web UI)
    env_file:
      - .whirl.env
    environment:
      - WHIRL_SETUP_FOLDER
      - AIRFLOW__API__AUTH_BACKEND
    volumes:
      - ${DAG_FOLDER}:/opt/airflow/dags
      - ${ENVIRONMENT_FOLDER}/whirl.setup.d:${WHIRL_SETUP_FOLDER}/env.d/
      - ${DAG_FOLDER}/whirl.setup.d:${WHIRL_SETUP_FOLDER}/dag.d/
    depends_on:
      - mockserver
      - postgresdb
      - s3server
    links:
      - s3server:${DEMO_BUCKET}.s3server

  scheduler:
    image: docker-whirl-airflow:py-${PYTHON_VERSION}-local
    command: ["scheduler"]
    env_file:
      - .whirl.env
    environment:
      - WHIRL_SETUP_FOLDER
      - AIRFLOW__API__AUTH_BACKEND
    volumes:
      - ${DAG_FOLDER}:/opt/airflow/dags
      - ${ENVIRONMENT_FOLDER}/whirl.setup.d:${WHIRL_SETUP_FOLDER}/env.d/
      - ${DAG_FOLDER}/whirl.setup.d:${WHIRL_SETUP_FOLDER}/dag.d/
    depends_on:
      - mockserver
      - postgresdb
      - s3server
    links:
      - s3server:${DEMO_BUCKET}.s3server

  triggerer:
    image: docker-whirl-airflow:py-${PYTHON_VERSION}-local
    command: ["triggerer"]
    env_file:
      - .whirl.env
    environment:
      - WHIRL_SETUP_FOLDER
      - AIRFLOW__API__AUTH_BACKEND
    volumes:
      - ${DAG_FOLDER}:/opt/airflow/dags
      - ${ENVIRONMENT_FOLDER}/whirl.setup.d:${WHIRL_SETUP_FOLDER}/env.d/
      - ${DAG_FOLDER}/whirl.setup.d:${WHIRL_SETUP_FOLDER}/dag.d/
    depends_on:
      - postgresdb

  mockserver:
    image: mockserver/mockserver:5.15.0
    ports:
      - 1080:1080
      - 1081:1081
    environment:
      - LOG_LEVEL=ERROR
      - SERVER_PORT=1080,1081

  postgresdb:
    image: postgres:16
    ports:
      - 5432:5432
    env_file:
      - .whirl.env

  s3server:
    image: localstack/localstack:3.0.2
    ports:
      - "4566:4566"
    environment:
      - SERVICES=s3
    env_file:
      - .whirl.env

31 changes: 31 additions & 0 deletions envs/api-s3-dataset/whirl.setup.d/01_add_connection_api.sh
@@ -0,0 +1,31 @@
#!/usr/bin/env bash

echo "============================="
echo "== Configure S3 Connection =="
echo "============================="
airflow connections add \
    local_s3 \
    --conn-type s3 \
    --conn-extra "{\"endpoint_url\": \"http://${AWS_SERVER}:${AWS_PORT}\",
                   \"aws_secret_access_key\": \"${AWS_SECRET_ACCESS_KEY}\",
                   \"aws_access_key_id\": \"${AWS_ACCESS_KEY_ID}\",
                   \"host\": \"http://${AWS_SERVER}:${AWS_PORT}\"
                  }"

pip install awscli awscli-plugin-endpoint

echo -e "$AWS_ACCESS_KEY_ID\n$AWS_SECRET_ACCESS_KEY\n\n" | aws configure
aws configure set plugins.endpoint awscli_plugin_endpoint
aws configure set default.s3.endpoint_url http://${AWS_SERVER}:${AWS_PORT}
aws configure set default.s3api.endpoint_url http://${AWS_SERVER}:${AWS_PORT}

echo "================================"
echo "== Create S3 Bucket ==========="
echo "================================"
while [[ "$(curl -s -o /dev/null -w '%{http_code}' http://${AWS_SERVER}:${AWS_PORT})" != "200" ]]; do
    echo "Waiting for ${AWS_SERVER} to come up..."
    sleep 2;
done

echo "creating bucket"
aws s3api create-bucket --bucket ${DEMO_BUCKET}
3 changes: 3 additions & 0 deletions examples/airflow-datasets/.airflowignore
@@ -0,0 +1,3 @@
whirl.setup.d
.whirl.env
README.md
1 change: 1 addition & 0 deletions examples/airflow-datasets/.whirl.env
@@ -0,0 +1 @@
WHIRL_ENVIRONMENT=api-s3-dataset
40 changes: 40 additions & 0 deletions examples/airflow-datasets/README.md
@@ -0,0 +1,40 @@
#### Dataset Aware Scheduling Example

In this example we have multiple DAGs that depend on each other through dataset-aware scheduling.
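
The producer and consumer DAGs in this example import their dataset objects from a shared `include/datasets.py` module, which is not among the files shown in this commit. A minimal sketch of what such a module could look like (the URIs are illustrative placeholders, not the actual values):

```python
# include/datasets.py -- sketch only; the URIs are hypothetical placeholders
from airflow.datasets import Dataset

# Each Dataset is identified by a URI. Producer tasks declare it as an
# outlet, and consumer DAGs list it in their `schedule` argument.
DEMO_A_DS = Dataset("s3://demo-s3-output/dataset-a")
DEMO_B_DS = Dataset("s3://demo-s3-output/dataset-b")
DEMO_C_DS = Dataset("s3://demo-s3-output/dataset-c")
DEMO_D_DS = Dataset("s3://demo-s3-output/dataset-d")
DEMO_E_DS = Dataset("s3://demo-s3-output/dataset-e")
```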

The default environment (`api-s3-dataset`) includes containers for:

- An S3 server (localstack);
- A MockServer instance;
- The core Airflow components (webserver, scheduler, triggerer);
- The Airflow database (PostgreSQL).

The environment contains a setup script in the `whirl.setup.d/` folder:

- `01_add_connection_api.sh` which:

  - Adds an S3 connection to Airflow;
  - Installs the `awscli` Python libraries and configures them to connect to the S3 server;
  - Creates a bucket (with a `/etc/hosts` entry to support the [virtual host style method](https://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html)).

To run this example with the default environment:

```bash
$ cd ./examples/airflow-datasets
$ whirl
```

Open your browser to [http://localhost:5000](http://localhost:5000) to access the Airflow UI. Manually enable the DAGs, trigger the producer DAGs, and watch the consumer DAGs run once all of their datasets have been updated.

This example includes a `.whirl.env` configuration file in the DAG directory. The environment folder also contains a `.whirl.env`, which specifies the S3-specific variables. The example folder additionally contains a `whirl.setup.d/` directory with an initialization script (`01_add_connection_api_and_mockdata.sh`). This script is executed in the Airflow containers after the environment-specific scripts have run and will:

- Add a connection to the API endpoint;
- Add an [expectation](http://www.mock-server.com/mock_server/creating_expectations.html) for the MockServer to know which response needs to be sent for which requested path;
- Install Pandas and PyArrow to support transforming the JSON into a Parquet file (see the sketch after this list);
- Create a local directory where the intermediate file is stored before being uploaded to S3.
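
The JSON-to-Parquet step mentioned above boils down to a small Pandas/PyArrow conversion. A sketch, assuming hypothetical local file paths (the real paths are defined by the init script, which is not shown here):

```python
# Sketch of the JSON -> Parquet step; the file paths are hypothetical.
import pandas as pd  # pyarrow must be installed for .to_parquet()


def json_to_parquet(json_path: str, parquet_path: str) -> None:
    # Read the JSON payload fetched from the mocked API...
    df = pd.read_json(json_path)
    # ...and write it to the local intermediate directory as Parquet
    # (pandas uses the pyarrow engine when it is available).
    df.to_parquet(parquet_path, index=False)


json_to_parquet("/tmp/api-data/response.json", "/tmp/api-data/response.parquet")
```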

For the DAGs, we wanted to test different scenarios:

##### Simple single dataset dependency
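
The simplest scenario is a consumer DAG scheduled on a single dataset. This commit only shows producers and multi-dataset consumers, so the following is a sketch of what such a consumer could look like, following the same pattern as `dataset-ABC-consumer.py` (the `dataset-A-consumer` dag_id is hypothetical):

```python
# Sketch only -- mirrors the multi-dataset consumers in this commit.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

from include.datasets import DEMO_A_DS

with DAG(
    dag_id='dataset-A-consumer',  # hypothetical dag_id
    start_date=datetime.now() - timedelta(minutes=20),
    schedule=[DEMO_A_DS],  # runs whenever DEMO_A_DS is updated by a producer
    dagrun_timeout=timedelta(seconds=120),
):

    @task
    def echo_trigger(triggering_dataset_events=None):
        # The scheduler injects the dataset events that caused this run.
        for dataset, event_list in triggering_dataset_events.items():
            print(dataset, event_list)

    echo_trigger()
```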


29 changes: 29 additions & 0 deletions examples/airflow-datasets/dataset-A-producer.py
@@ -0,0 +1,29 @@
from datetime import timedelta, datetime
from airflow import DAG

from airflow.decorators import task

from include.datasets import DEMO_A_DS


default_args = {
    'owner': 'whirl',
    'depends_on_past': False,
    'start_date': datetime.now() - timedelta(minutes=20),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='dataset-A-producer',
    default_args=default_args,
    schedule=None,
    dagrun_timeout=timedelta(seconds=120)
):

    @task(outlets=[DEMO_A_DS])
    def trigger_dataset(**context):
        print(f"Triggering dataset: {DEMO_A_DS.uri}")

    trigger_dataset()
31 changes: 31 additions & 0 deletions examples/airflow-datasets/dataset-ABC-consumer.py
@@ -0,0 +1,31 @@
from datetime import timedelta, datetime
from airflow import DAG

from airflow.decorators import task

from include.datasets import DEMO_A_DS, DEMO_B_DS, DEMO_C_DS


default_args = {
    'owner': 'whirl',
    'depends_on_past': False,
    'start_date': datetime.now() - timedelta(minutes=20),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='dataset-ABC-consumer',
    default_args=default_args,
    schedule=[DEMO_A_DS, DEMO_B_DS, DEMO_C_DS],
    dagrun_timeout=timedelta(seconds=120)
):

    @task
    def echo_trigger(triggering_dataset_events=None):
        for dataset, dataset_list in triggering_dataset_events.items():
            print(dataset, dataset_list)
            print(dataset_list[0].source_dag_run.dag_id)

    echo_trigger()
31 changes: 31 additions & 0 deletions examples/airflow-datasets/dataset-ABCD-consumer.py
@@ -0,0 +1,31 @@
from datetime import timedelta, datetime
from airflow import DAG

from airflow.decorators import task

from include.datasets import DEMO_A_DS, DEMO_B_DS, DEMO_C_DS, DEMO_D_DS


default_args = {
    'owner': 'whirl',
    'depends_on_past': False,
    'start_date': datetime.now() - timedelta(minutes=20),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='dataset-ABCD-consumer',
    default_args=default_args,
    schedule=[DEMO_A_DS, DEMO_B_DS, DEMO_C_DS, DEMO_D_DS],
    dagrun_timeout=timedelta(seconds=120)
):

    @task
    def echo_trigger(triggering_dataset_events=None):
        for dataset, dataset_list in triggering_dataset_events.items():
            print(dataset, dataset_list)
            print(dataset_list[0].source_dag_run.dag_id)

    echo_trigger()
29 changes: 29 additions & 0 deletions examples/airflow-datasets/dataset-B-producer.py
@@ -0,0 +1,29 @@
from datetime import timedelta, datetime
from airflow import DAG

from airflow.decorators import task

from include.datasets import DEMO_B_DS


default_args = {
    'owner': 'whirl',
    'depends_on_past': False,
    'start_date': datetime.now() - timedelta(minutes=20),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='dataset-B-producer',
    default_args=default_args,
    schedule=None,
    dagrun_timeout=timedelta(seconds=120)
):

    @task(outlets=[DEMO_B_DS])
    def trigger_dataset(**context):
        print(f"Triggering dataset: {DEMO_B_DS.uri}")

    trigger_dataset()
29 changes: 29 additions & 0 deletions examples/airflow-datasets/dataset-C-producer.py
@@ -0,0 +1,29 @@
from datetime import timedelta, datetime
from airflow import DAG

from airflow.decorators import task

from include.datasets import DEMO_C_DS


default_args = {
    'owner': 'whirl',
    'depends_on_past': False,
    'start_date': datetime.now() - timedelta(minutes=20),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='dataset-C-producer',
    default_args=default_args,
    schedule=None,
    dagrun_timeout=timedelta(seconds=120)
):

    @task(outlets=[DEMO_C_DS])
    def trigger_dataset(**context):
        print(f"Triggering dataset: {DEMO_C_DS.uri}")

    trigger_dataset()
29 changes: 29 additions & 0 deletions examples/airflow-datasets/dataset-D-producer.py
@@ -0,0 +1,29 @@
from datetime import timedelta, datetime
from airflow import DAG

from airflow.decorators import task

from include.datasets import DEMO_D_DS


default_args = {
    'owner': 'whirl',
    'depends_on_past': False,
    'start_date': datetime.now() - timedelta(minutes=20),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='dataset-D-producer',
    default_args=default_args,
    schedule=None,
    dagrun_timeout=timedelta(seconds=120)
):

    @task(outlets=[DEMO_D_DS])
    def trigger_dataset(**context):
        print(f"Triggering dataset: {DEMO_D_DS.uri}")

    trigger_dataset()
29 changes: 29 additions & 0 deletions examples/airflow-datasets/dataset-E-producer.py
@@ -0,0 +1,29 @@
from datetime import timedelta, datetime
from airflow import DAG

from airflow.decorators import task

from include.datasets import DEMO_E_DS


default_args = {
    'owner': 'whirl',
    'depends_on_past': False,
    'start_date': datetime.now() - timedelta(minutes=20),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='dataset-E-producer',
    default_args=default_args,
    schedule=None,
    dagrun_timeout=timedelta(seconds=120)
):

    @task(outlets=[DEMO_E_DS])
    def trigger_dataset(**context):
        print(f"Triggering dataset: {DEMO_E_DS.uri}")

    trigger_dataset()
