!pip install -q apache-airflow
Airflow uses a relational database as a backend to store metadata, including configuration, DAG runs, and task state. By default, this is a SQLite database stored at ~/airflow/airflow.db. You initialize the database in your environment by running the following command in the terminal:
!airflow db init
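If you want to confirm that the initialization actually created the metadata schema, you can open the SQLite file directly from Python. This is just a quick sanity check and assumes the default database location of ~/airflow/airflow.db:
import os
import sqlite3

# Default SQLite metadata database created by `airflow db init`
db_path = os.path.expanduser("~/airflow/airflow.db")

conn = sqlite3.connect(db_path)
tables = [row[0] for row in conn.execute(
    "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")]
conn.close()

print(f"{len(tables)} tables created, for example: {tables[:5]}")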
Next, you need to create a user that can log into the Airflow UI. Enter the following in your terminal to create a user named admin with admin permissions:
!airflow users create \
--username admin \
--firstname Firstname \
--lastname Lastname \
--role Admin \
--email admin@example.org \
--password password
To confirm that the configuration works correctly, you can run the Airflow web server and scheduler and log into the UI. Execute the following commands in the terminal to start the web server and scheduler:
!airflow webserver --port 8080 -D
!nohup airflow scheduler &
!cat nohup.out
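As an alternative to inspecting nohup.out, you can poll the web server's health endpoint from Python. This is a sketch that assumes the default, unauthenticated /health endpoint on port 8080; the server may need a few seconds to come up:
import time
import requests

# Poll the Airflow webserver's health endpoint until it responds (or we give up).
for attempt in range(10):
    try:
        resp = requests.get("http://localhost:8080/health", timeout=5)
        print(resp.json())  # reports metadatabase and scheduler status
        break
    except requests.exceptions.ConnectionError:
        time.sleep(3)
else:
    print("Webserver did not respond after 10 attempts")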
Next, create a folder to hold the DAG definition:
!mkdir -p airflow_demo/dags
You will now add some Python code to create a minimal definition for a DAG called airflow_demo_dag with two basic Bash tasks:
- t0: Runs echo to print some output.
- t1: Runs sleep for 5 seconds, then prints the date.
%%writefile ./airflow_demo/dags/dag.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'start_date': datetime(2021, 1, 1)
}

# catchup is a DAG-level argument, not a task default, so it is passed to DAG() directly
with DAG(
    dag_id='airflow_demo_dag',
    description='An example Airflow DAG',
    schedule_interval=None,
    catchup=False,
    default_args=default_args
) as dag:
    t0 = BashOperator(
        task_id='bash_task_0',
        bash_command='echo "Hi there, this is the first Airflow task!"'
    )

    t1 = BashOperator(
        task_id='bash_task_1',
        bash_command='echo "Sleeping..." && sleep 5s && date'
    )

    # t0 must finish before t1 starts
    t0 >> t1
!python ./airflow_demo/dags/dag.py
This will check for Python syntax errors in the file. If successful, there will be no output. Once you have confirmed that the file is error-free, proceed to the next step.
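For a check that goes beyond syntax, you can also load the folder with Airflow's DagBag, which parses DAG files the same way the scheduler does and reports any import errors. A minimal sketch:
from airflow.models import DagBag

# Parse the demo DAGs folder and surface any import errors the scheduler would hit.
dag_bag = DagBag(dag_folder="./airflow_demo/dags", include_examples=False)

print("DAGs found:", list(dag_bag.dags))
print("Import errors:", dag_bag.import_errors)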
In order for the Airflow scheduler to find the new DAG file, you need to point the dags_folder setting at the new directory. Setting the AIRFLOW__CORE__DAGS_FOLDER environment variable overrides the value in airflow.cfg:
%env AIRFLOW__CORE__DAGS_FOLDER=/content/airflow_demo/dags
!airflow config get-value core dags_folder
!airflow dags list
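The same check can be done from Python via airflow.configuration. Note that if the airflow package was already imported in this kernel before you set the environment variable, the cached configuration may not reflect the change:
from airflow.configuration import conf

# The AIRFLOW__CORE__DAGS_FOLDER environment variable overrides airflow.cfg,
# so this should print /content/airflow_demo/dags.
print(conf.get("core", "dags_folder"))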
Finally, reinitialize the Airflow database so that it picks up the new DAG. Execute the following command in the terminal:
!airflow db init
In general use, you can skip this step: a running scheduler rescans the DAGs folder at a regular interval and picks up new DAGs on its own. In this example, we reinitialize the database manually.
Then trigger the DAG run:
!airflow dags trigger airflow_demo_dag
This queues a run of your new DAG. The scheduler executes the two tasks, and the output of the echo statements and the date appears in the task logs, which you can view in the Airflow UI.
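If you prefer to watch the run from Python instead of the UI, the following sketch polls the metadata database for the state of the most recent run of airflow_demo_dag. It assumes it is executed in the same environment, so it sees the same metadata database:
import time

from airflow.models import DagRun

# Poll the metadata database until the latest run of the demo DAG finishes.
for attempt in range(12):
    runs = DagRun.find(dag_id="airflow_demo_dag")
    if runs and runs[-1].state in ("success", "failed"):
        print(runs[-1].run_id, runs[-1].state)
        break
    time.sleep(5)
else:
    print("Run did not finish within the polling window")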