support flink cdc
jeff-zou committed Dec 5, 2023
1 parent de50a76 commit a2609d9
Showing 6 changed files with 178 additions and 15 deletions.
22 changes: 21 additions & 1 deletion README-en.md
@@ -27,19 +27,39 @@ The operation commands corresponding to the supported functions of redis are:
| zrem decrby srem | |
| del hdel | |

### Commands for CDC
| CDC INSERT/UPDATE | CDC DELETE |
|---------------------------------------------------------|--------------------------------------------------------|
| set | del |
| hset | hdel |
| rpush lpush | no action |
| incrBy incrByFloat decrBy hincrBy hincrByFloat zincrby | writes the relative value, e.g. incrby 2 -> incrby -2 |
| sadd zadd | srem zrem |
| pfadd(hyperloglog) | no action |
| publish | no action |
| zrem srem | no action |
| del hdel | no action |
Note: A CDC update has the same effect as a CDC insert.
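
For an end-to-end sketch of the delete mapping, here is a minimal example — hedged: it assumes a local Redis, connector option names ('connector'='redis', 'host', 'port', 'redis-mode', 'command') that may differ by connector version, and a small inline changelog standing in for a real CDC source:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class RedisCdcDeleteSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // A tiny changelog: one insert followed by its delete.
        DataStream<Row> changelog = env.fromElements(
                        Row.ofKind(RowKind.INSERT, "Tom", "level1", "20"),
                        Row.ofKind(RowKind.DELETE, "Tom", "level1", "20"))
                .returns(Types.ROW(Types.STRING, Types.STRING, Types.STRING));
        tEnv.createTemporaryView("orders", tEnv.fromChangelogStream(changelog));

        // Redis sink in hset mode: +I/+U rows run HSET, -D rows run HDEL.
        // Option names are assumptions based on this connector's README; adjust to your setup.
        tEnv.executeSql(
                "create table sink_redis(name varchar, level varchar, age varchar) with ("
                        + "'connector'='redis', 'host'='127.0.0.1', 'port'='6379',"
                        + "'redis-mode'='single', 'command'='hset')");

        tEnv.executeSql("insert into sink_redis select * from orders").await();
    }
}
```

Here the DELETE row triggers hdel on the same key and field that the INSERT wrote with hset, removing the entry again.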

### Instructions:

Run `mvn package -DskipTests` on the command line, then copy the generated flink-connector-redis-1.3.1.jar into Flink's lib directory; no other settings are required.


<br/>
The project depends on Lettuce (6.2.1) and netty-transport-native-epoll (4.1.82.Final). Use flink-connector-redis-1.3.1.jar if those packages are already available.
Otherwise, use flink-connector-redis-1.3.1-jar-with-dependencies.jar.
<br/>

To reference the project directly in a development environment:

```
<dependency>
    <groupId>io.github.jeff-zou</groupId>
    <artifactId>flink-connector-redis</artifactId>
    <version>1.3.1</version>
    <!-- Add the classifier only when Lettuce and netty-transport-native-epoll are not imported separately -->
    <classifier>jar-with-dependencies</classifier>
</dependency>
```

26 changes: 20 additions & 6 deletions README.md
@@ -4,21 +4,22 @@
A fork of [bahir-flink](https://github.com/apache/bahir-flink.git); compared with bahir, the changes are:
```
1. Replaced Jedis with Lettuce and made reads/writes asynchronous, greatly improving performance
2. Added the Table/SQL API and dimension-table join query support
3. Added query caching (incremental and full)
4. Added whole-row persistence, for dimension-table joins on multiple fields
5. Added rate limiting, for online debugging of Flink SQL
6. Added support for recent Flink versions (including 1.12, 1.13, 1.14+)
7. Unified expiration policies, etc.
8. Support for Flink CDC deletes and other RowKind.DELETE
```

Because bahir was built against an older version of the Flink API, the changes are substantial. The stream-computing products of Tencent Cloud and Alibaba Cloud were both used as references during development, combining the strengths of each and adding richer functionality.
### Redis commands corresponding to the supported features:

| Insert / CDC insert & update | Dimension table query | CDC delete |
|---------------------------------------------------------|------|--------------|
| set | get | set -> del |
| hset | hget | hset -> hdel |
| rpush lpush | | |
| incrBy incrByFloat decrBy hincrBy hincrByFloat zincrby | | |
| sadd zadd pfadd(hyperloglog) | | |
@@ -27,6 +28,19 @@
| del hdel | | |


### Supported commands for CDC
| CDC insert & update | Action on CDC delete |
|---------------------------------------------------------|--------------------------------------------------------|
| set | del |
| hset | hdel |
| rpush lpush | no action |
| incrBy incrByFloat decrBy hincrBy hincrByFloat zincrby | writes the relative value, e.g. incrby 2 -> incrby -2 |
| sadd zadd | srem zrem |
| pfadd(hyperloglog) | no action |
| publish | no action |
| zrem srem | no action |
| del hdel | no action |
Note: a CDC update has the same effect as a CDC insert.
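
To make the counter mapping concrete, here is a minimal sketch of the equivalent Redis commands issued for an insert/delete pair, using Lettuce directly against an assumed local Redis (this bypasses the connector entirely; it only illustrates the effect):

```java
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.sync.RedisCommands;

public class IncrByDeleteEffect {
    public static void main(String[] args) {
        RedisClient client = RedisClient.create("redis://127.0.0.1:6379");
        RedisCommands<String, String> cmd = client.connect().sync();

        cmd.incrby("order_count", 2);  // what the sink runs for the CDC insert
        cmd.incrby("order_count", -2); // what the sink runs for the matching CDC delete
        System.out.println(cmd.get("order_count")); // counter is back to its prior value

        client.shutdown();
    }
}
```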

### Usage:
1. Package: mvn package -DskipTests<br/>
19 changes: 18 additions & 1 deletion pom.xml
@@ -176,7 +176,24 @@ under the License.
        <version>4.1.82.Final</version>
    </dependency>


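    <!-- Provided scope: used by the CDC/Kafka tests but not bundled into the connector jar. -->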
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>2.4.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
        <scope>provided</scope>
    </dependency>
</dependencies>

<build>
@@ -145,8 +145,7 @@ public void eval(CompletableFuture<Collection<GenericRowData>> resultFuture, Obj
* @param keys
*/
-private void query(CompletableFuture<Collection<GenericRowData>> resultFuture, Object... keys)
-        throws Exception {
+private void query(CompletableFuture<Collection<GenericRowData>> resultFuture, Object... keys) {
switch (redisCommand) {
case GET:
this.redisCommandsContainer
@@ -106,7 +106,7 @@ public RedisSinkFunction(
public void invoke(IN input, Context context) throws Exception {
RowData rowData = (RowData) input;
RowKind kind = rowData.getRowKind();
-if (kind != RowKind.INSERT && kind != RowKind.UPDATE_AFTER) {
+if (kind == RowKind.UPDATE_BEFORE) {
return;
}

@@ -123,7 +123,7 @@ public void invoke(IN input, Context context) throws Exception {
params[params.length - 1] = serializeWholeRow(rowData);
}

-startSink(params);
+startSink(params, kind);
}

/**
@@ -132,11 +132,20 @@ public void invoke(IN input, Context context) throws Exception {
* @param params
* @throws Exception
*/
-private void startSink(String[] params) throws Exception {
+private void startSink(String[] params, RowKind kind) throws Exception {
for (int i = 0; i <= maxRetryTimes; i++) {
try {
-RedisFuture redisFuture = sink(params);
-redisFuture.whenComplete((r, t) -> setTtl(params[0]));
+RedisFuture redisFuture = null;
+if (kind == RowKind.DELETE) {
+    redisFuture = rowKindDelete(params);
+} else {
+    redisFuture = sink(params);
+}
+
+if (redisFuture != null) {
+    redisFuture.whenComplete((r, t) -> setTtl(params[0]));
+}

break;
} catch (UnsupportedOperationException e) {
throw e;
@@ -253,6 +262,60 @@ private RedisFuture sink(String[] params) {
return redisFuture;
}

/**
* process redis command when RowKind == DELETE.
*
* @param params
*/
private RedisFuture rowKindDelete(String[] params) {
RedisFuture redisFuture = null;
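// Inverse mapping of sink(): SET/HSET delete the key/field, SADD/ZADD remove
// the member, and counter commands write the negated delta; commands with no
// inverse fall through and leave redisFuture null, making the delete a no-op.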
switch (redisCommand) {
case SADD:
redisFuture = this.redisCommandsContainer.srem(params[0], params[1]);
break;
case SET:
redisFuture = this.redisCommandsContainer.del(params[0]);
break;
case ZADD:
redisFuture = this.redisCommandsContainer.zrem(params[0], params[2]);
break;
case ZINCRBY:
Double d = -Double.valueOf(params[1]);
redisFuture =
this.redisCommandsContainer.zincrBy(params[0], d.toString(), params[2]);
break;
case HSET:
    redisFuture = this.redisCommandsContainer.hdel(params[0], params[1]);
    break;
case HINCRBY:
redisFuture =
this.redisCommandsContainer.hincrBy(
params[0], params[1], -Long.valueOf(params[2]));
break;
case HINCRBYFLOAT:
redisFuture =
this.redisCommandsContainer.hincrByFloat(
params[0], params[1], -Double.valueOf(params[2]));
break;
case INCRBY:
redisFuture =
this.redisCommandsContainer.incrBy(params[0], -Long.valueOf(params[1]));
break;
case INCRBYFLOAT:
redisFuture =
this.redisCommandsContainer.incrByFloat(
params[0], -Double.valueOf(params[1]));
break;
case DECRBY:
redisFuture =
this.redisCommandsContainer.decrBy(params[0], -Long.valueOf(params[1]));
break;
}
return redisFuture;
}

/**
* set ttl for key.
*
@@ -0,0 +1,50 @@
package org.apache.flink.streaming.connectors.redis.table;

import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_COMMAND;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.TestRedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.junit.jupiter.api.Test;

public class FlinkCDCTest extends TestRedisConfigBase {

@Test
public void testCdc() throws Exception {
String ddl =
"CREATE TABLE orders (\n"
+ " order_id INT,\n"
+ " customer_name STRING,\n"
+ " price DECIMAL(10, 5),\n"
+ " product_id INT,\n"
+ " PRIMARY KEY(order_id) NOT ENFORCED\n"
+ " ) WITH (\n"
+ " 'connector' = 'mysql-cdc',\n"
+ " 'hostname' = '10.11.69.176',\n"
+ " 'port' = '3306',\n"
+ " 'username' = 'test',\n"
+ " 'password' = '123456',\n"
+ " 'database-name' = 'cdc',\n"
+ " 'table-name' = 'orders');";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
tEnv.executeSql(ddl);

String sink =
"create table sink_redis(name varchar, level varchar, age varchar) with ( "
+ SQLWithUtil.sigleWith()
+ " '"
+ REDIS_COMMAND
+ "'='"
+ RedisCommand.HSET
+ "' )";
tEnv.executeSql(sink);
TableResult tableResult =
tEnv.executeSql(
"insert into sink_redis select cast(order_id as string), customer_name, cast(product_id as string) from orders /*+ OPTIONS('server-id'='5401-5404') */");
tableResult.getJobClient().get().getJobExecutionResult().get();
}
}
