This repository contains the starter code for the ksqlDB workshop.
This workshop shows how to build a driver location tracking and monitoring app with ksqlDB, the streaming SQL engine for Apache Kafka, and how to integrate it with an external data system (Elasticsearch).
Docker Compose (docker-compose.yml) sets up and runs the infrastructure, which includes ksqlDB with Kafka Connect running in embedded mode.
The following tools are available once the whole infrastructure is running:
- Apache Kafka
- Zookeeper
- ksqlDB
- Kafka Connect
- Schema Registry
- Kowl UI
- Kafka Connect UI
- Elasticsearch
- Kibana
Prerequisites:
- Docker (with Docker Compose v2, which provides the docker compose command)
cd docker
docker compose up -d
docker compose ps
# NAME SERVICE STATUS PORTS
# elasticsearch elasticsearch running 0.0.0.0:9300->9300/tcp, :::9300->9300/tcp, 0.0.0.0:9200->9200/tcp, :::9200->9200/tcp
# kafka kafka running 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp, 0.0.0.0:9101->9101/tcp, :::9101->9101/tcp
# kafka-connect-ui kafka-connect-ui running 0.0.0.0:8000->8000/tcp, :::8000->8000/tcp
# kibana kibana running 0.0.0.0:5601->5601/tcp, :::5601->5601/tcp
# kowl kowl running 0.0.0.0:8080->8080/tcp, :::8080->8080/tcp
# ksqldb-cli ksqldb-cli running
# ksqldb-server ksqldb-server running 0.0.0.0:8083->8083/tcp, :::8083->8083/tcp, 0.0.0.0:8088->8088/tcp, :::8088->8088/tcp
# schema-registry schema-registry running 0.0.0.0:8081->8081/tcp, :::8081->8081/tcp
# zookeeper zookeeper running 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp, 2888/tcp, 3888/tcp
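A container listed as running is not necessarily ready: ksqlDB and Elasticsearch in particular can need a while before they accept requests. Below is a minimal sketch of a readiness check, assuming curl is installed on the host; the function name `wait_for_http` and its default timeout are hypothetical choices, not part of the workshop.

```shell
#!/usr/bin/env bash
# Hypothetical helper: poll URL until it answers with a 2xx status,
# giving up after TIMEOUT seconds (default 120).
wait_for_http() {
  local url="$1" timeout="${2:-120}"
  local deadline=$(( $(date +%s) + timeout ))
  # --fail makes curl return non-zero on HTTP 4xx/5xx as well as on
  # connection errors; --max-time bounds each individual attempt.
  until curl --silent --fail --max-time 5 --output /dev/null "$url"; do
    if [ "$(date +%s)" -ge "$deadline" ]; then
      echo "timed out waiting for $url" >&2
      return 1
    fi
    sleep 2
  done
  echo "ready: $url"
}
```

For example, `wait_for_http http://localhost:8088/info && wait_for_http http://localhost:9200/` waits for both ksqlDB and Elasticsearch before you open the CLI.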
docker compose exec ksqldb-cli ksql http://ksqldb-server:8088
CREATE TABLE driver_table (driver_id BIGINT PRIMARY KEY, name VARCHAR)
WITH (kafka_topic='driver.info.snapshots', value_format='AVRO', partitions=6);
CREATE STREAM driver_location_stream (driver_id BIGINT KEY, location STRUCT<lat DOUBLE, lon DOUBLE>)
WITH (kafka_topic='driver.location.events', value_format='AVRO', partitions=6);
CREATE STREAM enriched_driver_location_stream WITH (
kafka_topic='driver.location.enriched.events',
value_format='AVRO') AS
SELECT
driver.driver_id AS driver_id,
driver.name AS driver_name,
CAST(location->lat AS STRING) + ',' + CAST(location->lon AS STRING) AS location
FROM driver_location_stream driver_location JOIN driver_table driver
ON driver_location.driver_id = driver.driver_id
EMIT CHANGES;
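You can also submit the statements above without the interactive CLI, through the ksqlDB server's REST API (POST /ksql on port 8088, with the application/vnd.ksql.v1+json content type). This is a minimal sketch, assuming curl, tr and sed are available on the host. The helper names `ksql_payload` and `ksql_exec` and the `KSQL_SERVER` variable are hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: wrap one ksqlDB statement in the JSON body expected by
# the /ksql endpoint. Newlines and tabs are flattened to spaces, then
# backslashes and double quotes are escaped, so the result is a valid JSON string.
ksql_payload() {
  local stmt
  stmt=$(printf '%s' "$1" | tr '\r\n\t' '   ' | sed 's/\\/\\\\/g; s/"/\\"/g')
  printf '{"ksql": "%s", "streamsProperties": {}}' "$stmt"
}

# Hypothetical helper: send one statement to ksqlDB
# (defaults to the workshop's http://localhost:8088) and print the JSON reply.
ksql_exec() {
  local server="${KSQL_SERVER:-http://localhost:8088}"
  curl --silent --show-error --fail \
    -X POST "$server/ksql" \
    -H 'Content-Type: application/vnd.ksql.v1+json' \
    -d "$(ksql_payload "$1")"
}
```

For example, `ksql_exec "SHOW STREAMS;"` returns the stream list as JSON rather than as the formatted table the CLI prints.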
show streams;
# Stream Name | Kafka Topic | Key Format | Value Format | Windowed
# -------------------------------------------------------------------------------------------------------------
# DRIVER_LOCATION_STREAM | driver.location.events | KAFKA | AVRO | false
# ENRICHED_DRIVER_LOCATION_STREAM | driver.location.enriched.events | KAFKA | AVRO | false
# KSQL_PROCESSING_LOG | ksql_connect_01ksql_processing_log | KAFKA | JSON | false
# -------------------------------------------------------------------------------------------------------------
show tables;
# Table Name | Kafka Topic | Key Format | Value Format | Windowed
# -------------------------------------------------------------------------------------
# DRIVER_TABLE | driver.info.snapshots | KAFKA | AVRO | false
# -------------------------------------------------------------------------------------
INSERT INTO driver_table (driver_id, name) VALUES (1, 'Driver 1');
INSERT INTO driver_table (driver_id, name) VALUES (2, 'Driver 2');
INSERT INTO driver_table (driver_id, name) VALUES (3, 'Driver 3');
INSERT INTO driver_table (driver_id, name) VALUES (4, 'Driver 4');
INSERT INTO driver_table (driver_id, name) VALUES (5, 'Driver 5');
INSERT INTO driver_table (driver_id, name) VALUES (6, 'Driver 6');
INSERT INTO driver_location_stream (driver_id, location) VALUES (1, STRUCT(lat := 50.055355899352136, lon := 19.935877982646776));
INSERT INTO driver_location_stream (driver_id, location) VALUES (2, STRUCT(lat := 50.061660189205384, lon := 19.92373795331377));
INSERT INTO driver_location_stream (driver_id, location) VALUES (3, STRUCT(lat := 50.06767789514652, lon := 19.913912678521136));
INSERT INTO driver_location_stream (driver_id, location) VALUES (4, STRUCT(lat := 49.984300646662135, lon := 20.053662685583486));
INSERT INTO driver_location_stream (driver_id, location) VALUES (5, STRUCT(lat := 50.05768283673512, lon := 19.85473522517211));
INSERT INTO driver_location_stream (driver_id, location) VALUES (6, STRUCT(lat := 50.06721095767255, lon := 19.87215179908983));
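With more drivers, typing each row by hand gets tedious. The small generator below prints the driver_table INSERT statements for drivers 1 to N in exactly the form used above, ready to paste into the ksqlDB CLI. The function name `driver_rows` is hypothetical, and the "Driver <id>" naming just follows the sample rows.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print one driver_table INSERT statement per driver,
# for ids 1..COUNT, matching the sample rows above.
driver_rows() {
  local count="$1" id=1
  while [ "$id" -le "$count" ]; do
    printf "INSERT INTO driver_table (driver_id, name) VALUES (%d, 'Driver %d');\n" "$id" "$id"
    id=$((id + 1))
  done
}
```

For example, `driver_rows 6` reproduces the six driver_table statements above.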
PRINT 'driver.info.snapshots' FROM BEGINNING;
SET 'auto.offset.reset' = 'earliest';
SELECT * from enriched_driver_location_stream EMIT CHANGES;
exit
# ksql> exit
# Exiting ksqlDB.
curl localhost:9200
# {
# "name" : "74c66e90824c",
# "cluster_name" : "docker-cluster",
# "cluster_uuid" : "CPyD4K3QT4KOqs-ut9HPCA",
# "version" : {
# "number" : "7.8.0",
# "build_flavor" : "default",
# "build_type" : "docker",
# "build_hash" : "757314695644ea9a1dc2fecd26d1a43856725e65",
# "build_date" : "2020-06-14T19:35:50.234439Z",
# "build_snapshot" : false,
# "lucene_version" : "8.5.1",
# "minimum_wire_compatibility_version" : "6.8.0",
# "minimum_index_compatibility_version" : "6.0.0-beta1"
# },
# "tagline" : "You Know, for Search"
# }
curl -X PUT "localhost:9200/driver.location.enriched.events/?pretty" -H 'Content-Type: application/json' -d'
{
"mappings": {
"dynamic_templates": [{
"locations": {
"mapping": {
"type": "geo_point"
},
"match": "*LOCATION"
}}]}}
'
# {
# "acknowledged" : true,
# "shards_acknowledged" : true,
# "index" : "driver.location.enriched.events"
# }
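If you send the PUT request a second time, Elasticsearch rejects it because the index already exists. Below is a minimal sketch of an idempotent variant. It first checks for the index with a HEAD request, which returns 200 when the index exists and 404 when it does not. The names `ensure_index`, `ES_URL` and `mapping.json` are hypothetical, and the sketch assumes the mapping body above has been saved to that file.

```shell
#!/usr/bin/env bash
# Hypothetical helper: create INDEX from MAPPING_FILE only if it does not exist yet.
ensure_index() {
  local index="$1" mapping_file="$2"
  local es="${ES_URL:-http://localhost:9200}"
  local status
  # HEAD /<index>: 200 = exists, 404 = missing; curl prints 000 if unreachable.
  status=$(curl --silent --max-time 5 --output /dev/null --write-out '%{http_code}' --head "$es/$index")
  case "$status" in
    200) echo "index $index already exists" ;;
    404) curl --silent --show-error --fail -X PUT "$es/$index" \
           -H 'Content-Type: application/json' --data-binary "@$mapping_file" ;;
    *)   echo "unexpected status '$status' from $es" >&2; return 1 ;;
  esac
}
```

For example: `ensure_index driver.location.enriched.events mapping.json`.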
Open Kibana (http://localhost:5601/) in your web browser and go to the Saved Objects page (Stack Management > Saved Objects).
Import the driver_live_location.ndjson file.
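Kibana's saved objects import API (POST /api/saved_objects/_import) can also perform the import from a script. The API requires the kbn-xsrf header and expects the file as a multipart field named file. This is a minimal sketch, assuming Kibana on localhost:5601 and driver_live_location.ndjson in the current directory. The names `kibana_import` and `KIBANA_URL` are hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: upload an .ndjson export to Kibana's saved objects import API.
kibana_import() {
  local file="$1"
  local kibana="${KIBANA_URL:-http://localhost:5601}"
  if [ ! -f "$file" ]; then
    echo "no such file: $file" >&2
    return 1
  fi
  # overwrite=true replaces saved objects with the same ids, so re-running is safe.
  curl --silent --show-error --fail \
    -X POST "$kibana/api/saved_objects/_import?overwrite=true" \
    -H 'kbn-xsrf: true' \
    --form "file=@$file"
}
```

For example: `kibana_import driver_live_location.ndjson`.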
Start the ksqlDB interactive CLI:
docker compose exec ksqldb-cli ksql http://ksqldb-server:8088
In the ksqlDB CLI, run the following command to create the Elasticsearch sink connector:
CREATE SINK CONNECTOR elasticsearch_sink_demo WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://elasticsearch:9200',
'topics' = 'driver.location.enriched.events',
'type.name' = '_doc',
'errors.log.enable' = 'true',
'schema.registry.url' = 'http://schema-registry:8081',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081'
);
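Kafka Connect runs embedded in the ksqlDB server, so the same connector could instead be created through the Kafka Connect REST API on port 8083. PUT /connectors/<name>/config creates the connector if it does not exist and otherwise updates its configuration. This is a sketch, assuming the embedded worker is reachable at localhost:8083. The helper `put_connector` and the `CONNECT_URL` variable are hypothetical. Use either this or the CREATE SINK CONNECTOR statement, not both, or you end up with two connectors writing to the same index.

```shell
#!/usr/bin/env bash
# Hypothetical helper: create or update the Elasticsearch sink via the Connect REST API.
# The properties mirror the CREATE SINK CONNECTOR statement above.
put_connector() {
  local name="${1:-elasticsearch_sink_demo}"
  local connect="${CONNECT_URL:-http://localhost:8083}"
  curl --silent --show-error --fail --max-time 10 \
    -X PUT "$connect/connectors/$name/config" \
    -H 'Content-Type: application/json' \
    --data-binary @- <<'EOF'
{
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "connection.url": "http://elasticsearch:9200",
  "topics": "driver.location.enriched.events",
  "type.name": "_doc",
  "errors.log.enable": "true",
  "schema.registry.url": "http://schema-registry:8081",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}
EOF
}
```

The connection.url uses the Docker network hostname elasticsearch because the connector runs inside the ksqldb-server container, not on the host.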
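Once created, the sink connector writes every record from the topics listed in its topics property to Elasticsearch. By default, each topic is written to an index with the same name, which is why the driver.location.enriched.events index was created with the geo_point mapping beforehand. Check the connector's status with:
DESCRIBE CONNECTOR elasticsearch_sink_demo;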
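To confirm that records are actually reaching Elasticsearch, count the documents in the target index with the _count API. This is a minimal sketch that uses only curl and sed, so no JSON tool is assumed. The names `es_count`, `parse_count` and `ES_URL` are hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: extract the top-level "count" value from a _count response.
parse_count() {
  sed -n 's/.*"count" *: *\([0-9][0-9]*\).*/\1/p' | head -n 1
}

# Hypothetical helper: print the number of documents in INDEX.
es_count() {
  local index="${1:-driver.location.enriched.events}"
  local es="${ES_URL:-http://localhost:9200}"
  curl --silent --show-error --fail --max-time 5 "$es/$index/_count" | parse_count
}
```

Once the connector is running, `es_count` should print a non-zero number.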
In Kibana, go to the Maps page and open the Driver live-location map.
Update the position of Driver 1 by running the following command in the ksqlDB CLI:
INSERT INTO driver_location_stream (driver_id, location) VALUES (1, STRUCT(lat := 50.078682625477754, lon := 19.7877404489687));
To move the Kibana map to the driver's current location, use Layer -> Fit to Data on the Driver live location layer.
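A single INSERT makes the marker jump. To watch it move instead, you can send a series of positions. This is a sketch, assuming the ksqlDB REST endpoint on localhost:8088. The helpers `route_inserts` and `drive` are hypothetical. The path is a straight line interpolated between two points, not a road route, and the one-second pause is an arbitrary choice.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print STEPS INSERT statements that move DRIVER_ID in a
# straight line from (LAT1, LON1) to (LAT2, LON2); the last one is the end point.
route_inserts() {
  awk -v id="$1" -v lat1="$2" -v lon1="$3" -v lat2="$4" -v lon2="$5" -v n="$6" 'BEGIN {
    for (i = 1; i <= n; i++) {
      f = i / n
      printf "INSERT INTO driver_location_stream (driver_id, location) VALUES (%d, STRUCT(lat := %.6f, lon := %.6f));\n", id, lat1 + (lat2 - lat1) * f, lon1 + (lon2 - lon1) * f
    }
  }'
}

# Hypothetical helper: post each statement to ksqlDB's /ksql endpoint, one per second.
# The generated statements contain no double quotes or backslashes, so they can be
# embedded in the JSON body without escaping.
drive() {
  local server="${KSQL_SERVER:-http://localhost:8088}" stmt
  route_inserts "$@" | while IFS= read -r stmt; do
    curl --silent --show-error --fail --output /dev/null \
      -X POST "$server/ksql" \
      -H 'Content-Type: application/vnd.ksql.v1+json' \
      -d "{\"ksql\": \"$stmt\", \"streamsProperties\": {}}"
    sleep 1
  done
}
```

For example, `drive 1 50.078682625477754 19.7877404489687 50.055355899352136 19.935877982646776 20` moves Driver 1 back toward its first position in 20 steps. Refresh the Kibana map, or enable auto-refresh, to follow it.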
This workshop was only a starting point: it touched on a few basic concepts and commands that are useful in ksqlDB development.
The links below can help you learn more about ksqlDB:
- Introducing ksqlDB
- An introduction to ksqlDB
- ksqlDB HOWTO: Joins
- ksqlDB HOWTO: Handling Time
- ksqlDB HOWTO: Split and Merge Kafka Topics
- 4 Incredible ksqlDB Techniques (#2 Will Make You Cry)
When you're done, stop the Docker containers and remove their volumes by running:
docker compose down -v
Name | Endpoint |
---|---|
ksqlDB | http://localhost:8088/ |
ksqlDB - Embedded Kafka Connect | http://localhost:8083/ |
Kafka Connect UI | http://localhost:8000/ |
Kowl UI | http://localhost:8080/ |
Schema Registry | http://localhost:8081/ |
Elasticsearch | http://localhost:9200/ |
Kibana | http://localhost:5601/ |
- ksqlDB
- ksqlDB Headless Server Settings
- Kafka Docker Images
- Elasticsearch Sink Connector
- Confluent Hub
- Kafka Connect UI
- cloudhut/kowl
- Elasticsearch
- Kibana
- How to Visualize Geo Data on a Map with Kibana - Version 7.10
Distributed under the MIT License. See LICENSE for more information.