Skip to content

Commit

Permalink
support set ttl on first sink
Browse files Browse the repository at this point in the history
  • Loading branch information
jeff-zou committed Dec 4, 2023
1 parent 816b655 commit de50a76
Show file tree
Hide file tree
Showing 13 changed files with 384 additions and 263 deletions.
16 changes: 12 additions & 4 deletions README-en.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ The operation commands corresponding to the supported functions of redis are:

### Instructions:

After executing mvn package -DskipTests on the command line, import the generated package flink-connector-redis-1.3.0.jar into flink lib, no other settings are required.
After executing mvn package -DskipTests on the command line, import the generated package flink-connector-redis-1.3.1.jar into flink lib, no other settings are required.

Development environment engineering direct reference:

```
<dependency>
<groupId>io.github.jeff-zou</groupId>
<artifactId>flink-connector-redis</artifactId>
<version>1.3.0</version>
<version>1.3.1</version>
<classifier>jar-with-dependencies</classifier>
</dependency>
```
Expand Down Expand Up @@ -76,7 +76,6 @@ key: name, field:subject, value: name\01subject\01score.
| port | 6379 | Integer | Redis port |
| password | null | String | null if not set |
| database | 0 | Integer | db0 is used by default |
| ttl | (none) | Integer | key expiration time as sink(seconds) |
| timeout | 2000 | Integer | Connection timeout, in ms, default 1s |
| cluster-nodes | (none) | String | Cluster ip and port, not empty when redis-mode is cluster, such as:10.11.80.147:7000,10.11.80.147:7001,10.11.80.147:8000 |
| command | (none) | String | Corresponds to the redis command above |
Expand All @@ -87,9 +86,18 @@ key: name, field:subject, value: name\01subject\01score.
| lookup.cache.load-all | false | Boolean | when command is hget, query all elements from redis map to cache,help to resolve cache penetration issues |
| sink.max-retries | 1 | Integer | Number of retries for write failures |
| value.data.structure | column | String | column: The value will come from a field (for example, set: key is the first field defined by DDL, and value is the second field)<br/> row: value is taken from the entire row, separated by '\01' |
| expire.on.time | (none) | String | Specify the time at which the key expires. The format is LocalTime, eg: 10:00 12:12:01. The ttl field will invalid |
| set.if.absent | false | Boolean | set/hset only when the key absent |


##### sink with ttl parameters

| Field | Default | Type | Description |
|--------------------|---------|---------|------------------------------------------------------------------------------------------------------|
| ttl | (none) | Integer | key expiration time (seconds), each time sink will set the ttl |
| ttl.on.time | (none) | String | The expiration time of the key in LocalTime.toString(), eg: 10:00 12:12:01, if ttl is not configured |
| ttl.key.not.absent | false | boolean | Used with ttl, which is set when the key doesn't exist |


##### Additional sink parameters when u debugging sql online which need to limit the resource usage:

| Field | Default | Type | Description |
Expand Down
17 changes: 12 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
2.将生成的包放入flink lib中即可,无需其它设置。

<br/>
项目依赖Lettuce(6.2.1)及netty-transport-native-epoll(4.1.82.Final),如flink环境有这两个包,则使用flink-connector-redis-1.3.0.jar,
否则使用flink-connector-redis-1.3.0-jar-with-dependencies.jar。
项目依赖Lettuce(6.2.1)及netty-transport-native-epoll(4.1.82.Final),如flink环境有这两个包,则使用flink-connector-redis-1.3.1.jar,
否则使用flink-connector-redis-1.3.1-jar-with-dependencies.jar。
<br/>

开发环境工程直接引用:
Expand All @@ -45,7 +45,7 @@
<artifactId>flink-connector-redis</artifactId>
<!-- 没有单独引入项目依赖Lettuce netty-transport-native-epoll依赖时 -->
<!-- <classifier>jar-with-dependencies</classifier>-->
<version>1.3.0</version>
<version>1.3.1</version>
</dependency>
```

Expand Down Expand Up @@ -83,7 +83,6 @@ create table sink_redis(name VARCHAR, subject VARCHAR, score VARCHAR) with ('co
| port | 6379 | Integer | Redis 端口 |
| password | null | String | 如果没有设置,则为 null |
| database | 0 | Integer | 默认使用 db0 |
| ttl | (none) | Integer | sink时key过期时间(秒) |
| timeout | 2000 | Integer | 连接超时时间,单位 ms,默认 1s |
| cluster-nodes | (none) | String | 集群ip与端口,当redis-mode为cluster时不为空,如:10.11.80.147:7000,10.11.80.147:7001,10.11.80.147:8000 |
| command | (none) | String | 对应上文中的redis命令 |
Expand All @@ -94,9 +93,17 @@ create table sink_redis(name VARCHAR, subject VARCHAR, score VARCHAR) with ('co
| lookup.cache.load-all | false | Boolean | 开启全量缓存,当命令为hget时,将从redis map查询出所有元素并保存到cache中,用于解决缓存穿透问题 |
| sink.max-retries | 1 | Integer | 写入失败重试次数 |
| value.data.structure | column | String | column: value值来自某一字段 (如, set: key值取自DDL定义的第一个字段, value值取自第二个字段)<br/> row: 将整行内容保存至value并以'\01'分割 |
| expire.on.time | (none) | String | 指定key的过期时间点,格式为LocalTime, eg: 10:00 12:12:01,ttl字段将无效 |
| set.if.absent | false | Boolean | 在key不存在时才写入,只对set hset有效 |

##### sink时ttl相关参数

| Field | Default | Type | Description |
|--------------------|---------|---------|-------------------------------------------------------------------|
| ttl | (none) | Integer | key过期时间(秒),每次sink时会设置ttl |
| ttl.on.time | (none) | String | key的过期时间点,格式为LocalTime.toString(), eg: 10:00 12:12:01,当ttl未配置时才生效 |
| ttl.key.not.absent | false | boolean | 与ttl一起使用,当key不存在时才设置ttl |


##### 在线调试SQL时,用于限制sink资源使用的参数:

| Field | Default | Type | Description |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ private RedisOptions() {}
.withDescription("Optional redis value data structure.");

public static final ConfigOption<String> EXPIRE_ON_TIME =
ConfigOptions.key("expire.on.time")
ConfigOptions.key("ttl.on.time")
.stringType()
.noDefaultValue()
.withDescription("Optional redis key expire on time, eg: 10:00 12:12:01");
Expand All @@ -176,4 +176,10 @@ private RedisOptions() {}
.booleanType()
.defaultValue(false)
.withDescription("Optional setIfAbsent for insert(set/hset) to redis");

public static final ConfigOption<Boolean> TTL_KEY_NOT_ABSENT =
ConfigOptions.key("ttl.key.not.absent")
.booleanType()
.defaultValue(false)
.withDescription("Optional set ttl when key not absent");
}
Loading

0 comments on commit de50a76

Please sign in to comment.