A sample program that uses Pravega and Flink SQL to analyze data (NY Taxi Records), inspired by the Pravega Samples and @wuchong's excellent work flink-sql-training.
本文假定您已具备基础的 SQL 知识。
- Pravega 0.10.1
- Flink 1.13.1
- Zeppelin 0.10.1
本示例程序基于 Docker 运行，因此运行各组件只需安装 Docker，无需 Java、Scala 环境或 IDE（注意：构建数据生成程序 `datagen` 时执行的 `mvn install` 需要本地安装 Java 和 Maven）。
注意：使用 Docker 启动上述组件时，请确保分配给 Docker 的内存大于 4G（推荐 6G）。
本示例所使用的数据来自 NY Taxi Records，可以从以下链接下载：
https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2018-01.csv
https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv
# Clone the sample repo, then start the Docker Compose environment from inside the clone (compose file assumed at the repo root).
git clone https://github.com/fyang86/pravega-flink-sql-examples.git
cd pravega-flink-sql-examples
docker-compose up -d
以启动 docker-compose 环境。
- 打开 `localhost:18080`，进入 Zeppelin 界面。
- 点击右上角的 `Interpreter`，进入 Interpreter 设置页面。
- 搜索 `flink`，将 `FLINK_HOME` 配置为 `/opt/flink-1.13.1` 并保存。
- 回到 Zeppelin 首页，导入（Import）笔记 `flink_sql_demo_notebook`。
- 执行 `%flink.ssql show tables;`，并在 Flink UI（`localhost:18081`）中确认 Flink 是否正常工作。
- 进入 `datagen` 目录并执行 `mvn install`，然后在 Flink UI 中提交生成的 jar 包 `datagen-0.1-jar-with-dependencies.jar`，将数据注入 Pravega。
查询里程大于 60 英里（mile）的行程：
-- Long-distance rides.
-- Source table over the Pravega stream 'trip' in scope 'taxi', decoded as JSON
-- and read in streaming mode through the reader group 'long-distance'.
CREATE TABLE TaxiRide1 (
    rideId                   INT,
    vendorId                 INT,
    pickupTime               TIMESTAMP(3),
    dropOffTime              TIMESTAMP(3),
    passengerCount           INT,
    tripDistance             FLOAT,   -- assumed to be miles (see the 60-mile filter below)
    startLocationId          INT,
    destLocationId           INT,
    startLocationBorough     STRING,
    startLocationZone        STRING,
    startLocationServiceZone STRING,
    destLocationBorough      STRING,
    destLocationZone         STRING,
    destLocationServiceZone  STRING
) WITH (
    'connector'              = 'pravega',
    'controller-uri'         = 'tcp://pravega:9090',
    'scope'                  = 'taxi',
    'scan.execution.type'    = 'streaming',
    'scan.reader-group.name' = 'long-distance',
    'scan.streams'           = 'trip',
    'format'                 = 'json'
);

-- Every column of the table (same order as the table definition),
-- restricted to rides longer than 60.
SELECT
    rideId,
    vendorId,
    pickupTime,
    dropOffTime,
    passengerCount,
    tripDistance,
    startLocationId,
    destLocationId,
    startLocationBorough,
    startLocationZone,
    startLocationServiceZone,
    destLocationBorough,
    destLocationZone,
    destLocationServiceZone
FROM TaxiRide1
WHERE tripDistance > 60;
按乘客数量分组，统计每组的行程数：
-- Ride count per passenger count.
-- Source table over the Pravega stream 'trip' in scope 'taxi', decoded as JSON
-- and read in streaming mode through the reader group 'passenger-count'.
CREATE TABLE TaxiRide2 (
    rideId                   INT,
    vendorId                 INT,
    pickupTime               TIMESTAMP(3),
    dropOffTime              TIMESTAMP(3),
    passengerCount           INT,
    tripDistance             FLOAT,
    startLocationId          INT,
    destLocationId           INT,
    startLocationBorough     STRING,
    startLocationZone        STRING,
    startLocationServiceZone STRING,
    destLocationBorough      STRING,
    destLocationZone         STRING,
    destLocationServiceZone  STRING
) WITH (
    'connector'              = 'pravega',
    'controller-uri'         = 'tcp://pravega:9090',
    'scope'                  = 'taxi',
    'scan.execution.type'    = 'streaming',
    'scan.reader-group.name' = 'passenger-count',
    'scan.streams'           = 'trip',
    'format'                 = 'json'
);

-- Non-windowed GROUP BY over a stream: the count for each passengerCount
-- value keeps changing as new rides arrive.
SELECT
    passengerCount,
    COUNT(*) AS cnt
FROM TaxiRide2
GROUP BY
    passengerCount;
查询每个 1 小时滚动窗口内前往每个目的地的乘客数：
-- Passengers arriving in each destination zone per 1-hour window.
-- Source table over the Pravega stream 'trip' in scope 'taxi', decoded as JSON
-- and read in streaming mode through the reader group 'max-traveller'.
-- dropOffTime is the event-time attribute; the watermark trails it by 30 seconds,
-- so rows up to 30 seconds out of order are still assigned to their window.
CREATE TABLE TaxiRide3 (
    rideId                   INT,
    vendorId                 INT,
    pickupTime               TIMESTAMP(3),
    dropOffTime              TIMESTAMP(3),
    passengerCount           INT,
    tripDistance             FLOAT,
    startLocationId          INT,
    destLocationId           INT,
    startLocationBorough     STRING,
    startLocationZone        STRING,
    startLocationServiceZone STRING,
    destLocationBorough      STRING,
    destLocationZone         STRING,
    destLocationServiceZone  STRING,
    WATERMARK FOR dropOffTime AS dropOffTime - INTERVAL '30' SECOND
) WITH (
    'connector'              = 'pravega',
    'controller-uri'         = 'tcp://pravega:9090',
    'scope'                  = 'taxi',
    'scan.execution.type'    = 'streaming',
    'scan.reader-group.name' = 'max-traveller',
    'scan.streams'           = 'trip',
    'format'                 = 'json'
);

-- Tumbling (non-overlapping) 1-hour windows on drop-off time, per destination zone.
-- SUM(passengerCount) gives the number of passengers. COUNT(passengerCount) would
-- only count rides with a non-NULL passenger count.
SELECT
    destLocationZone,
    TUMBLE_START(dropOffTime, INTERVAL '1' HOUR) AS window_start,
    TUMBLE_END(dropOffTime, INTERVAL '1' HOUR)   AS window_end,
    SUM(passengerCount)                          AS cnt
FROM TaxiRide3
GROUP BY
    destLocationZone,
    TUMBLE(dropOffTime, INTERVAL '1' HOUR);
统计每个滑动窗口（窗口长度 15 分钟，每 5 分钟滑动一次）内各出租车供应商的行程数，以找出最受欢迎的供应商：
-- Rides per taxi vendor in sliding windows on pickup time.
-- Source table over the Pravega stream 'trip' in scope 'taxi', decoded as JSON
-- and read in streaming mode through the reader group 'popular-vendor'.
-- pickupTime is the event-time attribute, with the watermark 30 seconds behind it.
CREATE TABLE TaxiRide4 (
    rideId                   INT,
    vendorId                 INT,
    pickupTime               TIMESTAMP(3),
    dropOffTime              TIMESTAMP(3),
    passengerCount           INT,
    tripDistance             FLOAT,
    startLocationId          INT,
    destLocationId           INT,
    startLocationBorough     STRING,
    startLocationZone        STRING,
    startLocationServiceZone STRING,
    destLocationBorough      STRING,
    destLocationZone         STRING,
    destLocationServiceZone  STRING,
    WATERMARK FOR pickupTime AS pickupTime - INTERVAL '30' SECOND
) WITH (
    'connector'              = 'pravega',
    'controller-uri'         = 'tcp://pravega:9090',
    'scope'                  = 'taxi',
    'scan.execution.type'    = 'streaming',
    'scan.reader-group.name' = 'popular-vendor',
    'scan.streams'           = 'trip',
    'format'                 = 'json'
);

-- HOP(time, slide, size): 15-minute windows that advance every 5 minutes,
-- so each ride is counted in three overlapping windows.
SELECT
    vendorId,
    HOP_START(pickupTime, INTERVAL '5' MINUTE, INTERVAL '15' MINUTE) AS window_start,
    HOP_END(pickupTime, INTERVAL '5' MINUTE, INTERVAL '15' MINUTE)   AS window_end,
    COUNT(vendorId)                                                  AS cnt
FROM TaxiRide4
GROUP BY
    vendorId,
    HOP(pickupTime, INTERVAL '5' MINUTE, INTERVAL '15' MINUTE);
查询每个滑动窗口（窗口长度 15 分钟，每 5 分钟滑动一次）内的热门目的地（行程数大于 8），并将结果写入 Pravega：
-- Popular destinations per sliding window, written back to Pravega.
-- Source table over the Pravega stream 'trip' in scope 'taxi', decoded as JSON
-- and read in streaming mode through the reader group 'popular-dest'.
-- pickupTime is the event-time attribute, with the watermark 30 seconds behind it.
CREATE TABLE TaxiRide5 (
    rideId                   INT,
    vendorId                 INT,
    pickupTime               TIMESTAMP(3),
    dropOffTime              TIMESTAMP(3),
    passengerCount           INT,
    tripDistance             FLOAT,
    startLocationId          INT,
    destLocationId           INT,
    startLocationBorough     STRING,
    startLocationZone        STRING,
    startLocationServiceZone STRING,
    destLocationBorough      STRING,
    destLocationZone         STRING,
    destLocationServiceZone  STRING,
    WATERMARK FOR pickupTime AS pickupTime - INTERVAL '30' SECOND
) WITH (
    'connector'              = 'pravega',
    'controller-uri'         = 'tcp://pravega:9090',
    'scope'                  = 'taxi',
    'scan.execution.type'    = 'streaming',
    'scan.reader-group.name' = 'popular-dest',
    'scan.streams'           = 'trip',
    'format'                 = 'json'
);

-- Sink table: JSON records written to the Pravega stream 'popDest' in scope 'taxi'.
-- cnt is BIGINT because COUNT(...) returns BIGINT. With an INT column, Flink's
-- INSERT type check rejects the query (BIGINT -> INT is a narrowing cast).
CREATE TABLE PopularDest (
    destLocationId INT,
    window_start   TIMESTAMP(3),
    window_end     TIMESTAMP(3),
    cnt            BIGINT
) WITH (
    'connector'      = 'pravega',
    'controller-uri' = 'tcp://pravega:9090',
    'scope'          = 'taxi',
    'sink.stream'    = 'popDest',
    'format'         = 'json'
);

-- 15-minute windows sliding every 5 minutes (HOP(time, slide, size)) on pickup time.
-- Only destinations with more than 8 rides in a window are written.
INSERT INTO PopularDest
SELECT
    destLocationId,
    HOP_START(pickupTime, INTERVAL '5' MINUTE, INTERVAL '15' MINUTE) AS window_start,
    HOP_END(pickupTime, INTERVAL '5' MINUTE, INTERVAL '15' MINUTE)   AS window_end,
    COUNT(destLocationId)                                            AS cnt
FROM TaxiRide5
GROUP BY
    destLocationId,
    HOP(pickupTime, INTERVAL '5' MINUTE, INTERVAL '15' MINUTE)
HAVING COUNT(destLocationId) > 8;