CDC streams to consume, analyse and enrich data from (point) a to (point) b to (point) z.
It is based on spring-cloud-stream integration framework to provide an easy-to-use, consistent developer experience to capture, analyze and enrich data processing pipelines acros different systems.
Capture the flight schedule change event in real-time from the source system through CDC. Send it to a processing pipeline that applies specific business rules based on the arrival and departure time delay to send notifications to the appropriate internal servicing units such as ground-ops, crew-ops, etc. Similarly, we can extend it to other source events to build the complete flight-ops real-time notification data processing pipeline using CDC.
We use the following services to build the data streaming pipelines:
- YugabyteDB YSQL is the source database that generates the flight schedule
- flightschedule stream supplier captures the flight schedule change events using debezium source, and streams them through a Kafka binder topic (flight_ops)
- flightops gets the schedule change events from the flight_ops topic, runs the rule processing, populates the notification event, and streams them through a Kafka binder topic (flight_watch)
- flightwatch gets the notification events from the flight_watch topic and persists them in the target database
- YugabyteDB YCQL is the target database to keep the notification events.
To have a better getting started experience, we have added the Gitpod support to try this out yourself with a single click. You don't need to make any of the infrastructure components. All you need is to fork the source repo and launch the Gitpod terminal using the browser plugin.
Run the following command from 1aa-ysql-shell
:
insert into flight_schedule(flight_no, scheduled_date, origin, destination, sta, eta, ata, std, etd, atd, scheduled_bay_Id) values('YB529', current_date, 'SIN', 'IND', (now()+interval '320 minutes')::timestamp, (now()+interval '320 minutes')::timestamp, (now()+interval '320 minutes')::timestamp, (now()+interval '10 minutes')::timestamp, (now()+interval '30 minutes')::timestamp, (now()+interval '30 minutes')::timestamp, 'T4');
Run the following command from 1ab-ycql-shell
to verify the changes:
SELECT * FROM flight_watch;
Experience it from the cdc-stream
shell