Airflow, Kafka and Postgres (target DB) services are spawned using Docker Compose. Airflow orchestrates the data pipeline, including spawning the PDI containers (via DockerOperator). The producer.ktr file uses the built-in Pentaho Kafka Producer step to publish messages to the Kafka container, whereas the consumer.ktr file runs a Python script via the CPython Script Executor plugin. Reasons for not using the built-in PDI Kafka Consumer:
- The built-in PDI consumer cannot be stopped when there are no messages, so the Airflow DAG stays in the running state forever.
- Needed a way to assign a specific topic partition for consumer.ktr to work on.
- Offsets should be committed only after the messages are successfully processed and inserted into the database.
- Added flexibility of Python for future customizations to the consumer.
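To illustrate these points, here is a minimal sketch of what such a Python consumer script could look like, using the kafka-python and psycopg2 packages (the package choice, topic name, partition, connection settings and table are assumptions for illustration, not taken from this repo): it pins the consumer to one partition, exits when no messages arrive within a poll timeout, and commits offsets only after the rows are written to Postgres.

```python
# Minimal consumer sketch (assumed libraries: kafka-python, psycopg2).
# Topic, partition, connection settings and table name are illustrative placeholders.
from kafka import KafkaConsumer, TopicPartition
import psycopg2

consumer = KafkaConsumer(
    bootstrap_servers="kafka:9092",
    group_id="pdi-consumer",
    enable_auto_commit=False,                        # commit manually, after the DB insert
    key_deserializer=lambda k: k.decode() if k else None,
    value_deserializer=lambda v: v.decode(),
)
consumer.assign([TopicPartition("demo-topic", 0)])   # work on one specific partition

conn = psycopg2.connect(host="postgres", dbname="target", user="user", password="pass")

while True:
    # poll() returns an empty dict when no messages arrive within the timeout,
    # letting the script (and hence the Airflow task) finish instead of running forever
    batch = consumer.poll(timeout_ms=30000)
    if not batch:
        break
    with conn, conn.cursor() as cur:                 # commits the transaction on success
        for tp, messages in batch.items():
            for msg in messages:
                cur.execute(
                    "INSERT INTO target_table (msg_key, msg_value) VALUES (%s, %s)",
                    (msg.key, msg.value),
                )
    consumer.commit()                                # commit offsets only after the insert succeeded

consumer.close()
conn.close()
```

Committing after the insert gives at-least-once delivery: if the script fails mid-batch, the uncommitted messages are simply re-read on the next run.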
Process flow:
- Two PDI transformation (.ktr) files are used: one serves as the producer and the other as the consumer.
- The producer transformation reads the input data, builds the key-message pairs and sends them to the Kafka topic.
- The consumer transformation reads the messages (and their offsets) from the Kafka topic and processes them before loading into the Postgres database.
- Both the producer and consumer transformations are triggered from Airflow via the DockerOperator (see the DAG sketch below).
- Source code files such as the KTRs and DAGs are mounted from the host into the containers rather than baked into the images. This removes the need to rebuild the Docker images whenever the code changes.
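As a rough sketch of the orchestration (not the actual DAG shipped in this repo), an Airflow DAG triggering the two transformations via DockerOperator could look like the following; the image name, Pan command, paths and schedule are assumptions:

```python
# Minimal DAG sketch: run the producer and consumer KTRs in PDI containers.
# Image name, command, DAG id and schedule below are illustrative assumptions.
from datetime import datetime
from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator

with DAG(
    dag_id="pdi_kafka_pipeline",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:

    run_producer = DockerOperator(
        task_id="run_producer_ktr",
        image="pdi-ce:latest",                       # assumed PDI image name
        command="./pan.sh -file=/ktr/producer.ktr",  # Pan runs a PDI transformation
        docker_url="unix://var/run/docker.sock",
        network_mode="bridge",
        auto_remove=True,
    )

    run_consumer = DockerOperator(
        task_id="run_consumer_ktr",
        image="pdi-ce:latest",
        command="./pan.sh -file=/ktr/consumer.ktr",
        docker_url="unix://var/run/docker.sock",
        network_mode="bridge",
        auto_remove=True,
    )

    run_producer >> run_consumer
```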
- Docker Engine version: 20.10.6+
- Docker Compose version: 1.29.1+
All the required steps for this demo are handled by the shell script below. If required, update the SETUP PARAMETERS section of the script. Navigate to the git folder and run the two commands below.
bash setup.sh
- Creates a .env file and sets all the environment variables required by the services in docker compose.
- Builds the required docker images.
NOTE: This shell script also creates the jdbc.properties file for the PDI containers. Add any necessary DB connection strings to it if required. It is recommended to add this file to .gitignore.
docker-compose up -d
- Start all services: `docker-compose up`
- Stop all services and remove containers: `docker-compose down`
- List all the running containers: `docker ps` (add `-a` to include stopped/inactive containers as well)
- Get inside a running container: `docker exec -it <container-name> bash`
- Get a system-level summary of Docker objects: `docker system df`
- For more specific commands, please check Reference section.
- Container logs: `docker logs [CONTAINER NAME]`, e.g. `docker logs airflow-webserver`. Add `-f` to follow the log.
- To increase the number of Airflow workers: `docker-compose up --scale airflow-worker=3` (if 3 workers are required). Note: the more workers, the greater the pressure on system resources.
- Airflow logs: click a task in the UI > Logs
- Kafdrop: view topic messages, their offsets and consumer lag
- Docker: the containers can be monitored using the Docker commands above. Third-party container-monitoring services can also be added to the docker-compose file.
- Docker commands:
- Apache Kafka:
- CPython Script Executor plugin