ETL Pipeline Built on Airflow
The goal is to build an ETL pipeline on Airflow that automatically pulls data from Google BigQuery, processes it, and stores the result in a local MySQL database.
Here's why:
- The Airflow scheduler can start the ETL pipeline at a fixed time
- Failed runs are retried a fixed number of times when operational errors occur
- Run results and errors can be checked quickly on the Airflow DAGs panel
- Run records can be searched easily through the log file system
- The required data is obtained and stored after the ETL process
The architecture consists of two independent systems: Airflow-ETL and a real-time dashboard.
The ETL program runs regularly through Airflow, uses a JSON key to fetch event-tracking data from Google BigQuery, transforms the data,
and stores it in a local MySQL database for the dashboard system to read.
Every 10 minutes, the current day's transaction data is requested from BigQuery, classified by transaction result and error code, and then written to MySQL.
A JSON service-account key is used to access the tables on BigQuery, and the daily data is loaded through SQL.
Since the BigQuery database time zone differs from the local one (UTC+8), the time-zone offset must be handled before querying the data:
# bigquery_eventtracking_regular_report.py
from datetime import datetime, timedelta

# 16:00 UTC is midnight in UTC+8, so this window covers one full local day
startTime = (datetime.now() + timedelta(days=-n-1)).strftime('%Y-%m-%d') + 'T16:00:00'
endTime = (datetime.now() + timedelta(days=-n)).strftime('%Y-%m-%d') + 'T16:00:00'
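For reference, a minimal sketch of how the daily pull might look with the google-cloud-bigquery client, using the time window computed above; the dataset/table name and the selected columns are illustrative assumptions, not the project's actual schema.
# _query.py (sketch) - table name and columns are hypothetical
from google.cloud import bigquery

credential_path = "/path/to/your_bigquery_key.json"  # replace with your own JSON key
client = bigquery.Client.from_service_account_json(credential_path)

sql = f"""
    SELECT event_timestamp, sport_code, error_code
    FROM `your-project.your_dataset.event_tracking`
    WHERE event_timestamp >= TIMESTAMP('{startTime}')
      AND event_timestamp < TIMESTAMP('{endTime}')
"""
df = client.query(sql).to_dataframe()  # requires pandas to be installed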
The retrieved data is stored as a DataFrame, and new columns are added during processing.
# parse.py
df['event_date'] = eventTime                                     # date of the queried local day
df['error_rate'] = df['sum_of_error'] / sum(df['sum_of_error'])  # each error's share of the total
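If the raw rows come back one per event, the aggregation that produces sum_of_error might look like the sketch below; raw_df and the grouping columns are assumptions based on the snippets in this README.
# parse.py (sketch) - assumes raw_df has one row per error event
df = (raw_df.groupby(['sport_code', 'error_code'])
            .size()
            .reset_index(name='sum_of_error'))
df['event_date'] = eventTime
df['error_rate'] = df['sum_of_error'] / df['sum_of_error'].sum()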
Insert or update the transformed data in MySQL through SQL:
- If the row already exists in the database -> update the value.
- Otherwise -> insert the row.
# storage.py
logging.info('checking whether the row exists in db.. ' + error_code)
sql = f"""
    SELECT * FROM {table}
    WHERE event_date="{eventTime}" AND error_code="{error_code}" AND sport_code="{sport_code}";
"""
cur.execute(sql)

if cur.fetchone():
    # row already exists -> update the error count
    logging.info('updating data..')
    sql = f"""
        UPDATE {table} SET sum_of_error = {sum_of_error}
        WHERE event_date="{eventTime}" AND error_code="{error_code}" AND sport_code="{sport_code}";
    """
else:
    # row not found -> insert a new one
    logging.info('inserting data..')
    sql = f"""
        INSERT INTO {table} (event_date, sport_code, error_code, sum_of_error)
        VALUES ("{eventTime}", "{sport_code}", "{error_code}", {sum_of_error});
    """
cur.execute(sql)
conn.commit()
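As a design note, the check-then-write logic above could be collapsed into a single statement with MySQL's INSERT ... ON DUPLICATE KEY UPDATE, assuming a unique key exists on (event_date, sport_code, error_code); a sketch:
# storage.py (alternative sketch) - assumes a UNIQUE key on (event_date, sport_code, error_code)
sql = f"""
    INSERT INTO {table} (event_date, sport_code, error_code, sum_of_error)
    VALUES (%s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE sum_of_error = VALUES(sum_of_error);
"""
cur.execute(sql, (eventTime, sport_code, error_code, sum_of_error))
conn.commit()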
- Check whether the DAG runs every 10 minutes on the Airflow DAG panel (see the schedule sketch below).
- Verify that the data is written to the database correctly:
  - Table: sport_transaction_error
  - Table: sport_transaction_result
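The DAG file itself is not shown in this README; a minimal sketch of a 10-minute schedule is below. The dag_id, task_id, and the callable are hypothetical placeholders.
# dag definition (sketch) - dag_id, task_id, and the callable are hypothetical
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def run_etl():
    # placeholder for the extract / transform / load steps described above
    pass

with DAG(
    dag_id='bigquery_eventtracking_regular_report',
    start_date=datetime(2021, 1, 1),
    schedule_interval='*/10 * * * *',   # run every 10 minutes
    catchup=False,
) as dag:
    PythonOperator(task_id='run_etl', python_callable=run_etl)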
Download the whole project (except this README file and the pic folder) and move it under the path /airflow/dags/.
- Install Airflow and set up the panel
- Replace the BigQuery JSON key and table name with your own.
# _query.py
credential_path = "/home/albert/airflow/dags/bigquery_eventtracking_regular_report_module/sg-prod-readonly-303206-cb8365379fd6.json"
- Install a local MySQL database and create the tables
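The table schemas are not spelled out here; a possible layout for the error table, inferred from the columns used in storage.py, is sketched below (column types, the unique key, and the connection parameters are assumptions). The result table would be analogous.
# create_tables.py (sketch) - column types, unique key, and connection parameters are assumptions
import pymysql

conn = pymysql.connect(host='localhost', user='your_user', password='your_password', database='your_db')
cur = conn.cursor()
cur.execute("""
    CREATE TABLE IF NOT EXISTS sport_transaction_error (
        event_date   DATE        NOT NULL,
        sport_code   VARCHAR(32) NOT NULL,
        error_code   VARCHAR(32) NOT NULL,
        sum_of_error INT         NOT NULL,
        UNIQUE KEY uq_event (event_date, sport_code, error_code)
    );
""")
conn.commit()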
- Create and activate a virtual environment for Airflow test tasks
python3 -m venv airflow_venv
source airflow_venv/bin/activate
- Clone the project into the environment and check that it runs
- Check whether the DAG file appears in the Airflow DAG list
airflow dags list
- Check whether the DAG's tasks are detected in the task list
airflow tasks list <dag_id>
- Check whether the tasks in the DAG are runnable
airflow tasks test <dag_id> <task_id> <start_time>
(start_time format: yyyy-mm-dd)
- Deactivate the Airflow virtual environment
deactivate
- Clone the project again into the production environment under /airflow/dags
- Check whether the DAG shows up on the Airflow DAG panel
The structure works for any ETL process,
and Airflow makes it possible to manage all tasks centrally and detect operational errors immediately.
See the open issues for a list of proposed features (and known issues).
Yu-Chieh Wang - LinkedIn
email: angelxd84130@gmail.com