From 0bcba63fab352f7fc68f464c78bb2a585e7a3b55 Mon Sep 17 00:00:00 2001 From: zou Date: Thu, 2 Mar 2023 14:22:30 +0800 Subject: [PATCH] add support for expire key on time, the type of time is LocalTime, eg: 10:00 12:12:01 --- .../redis/common/config/RedisOptions.java | 6 ++ .../container/RedisClusterContainer.java | 17 ++++ .../container/RedisCommandsContainer.java | 8 ++ .../common/container/RedisContainer.java | 17 ++++ .../mapper/RedisCommandDescription.java | 11 ++- .../mapper/row/sink/RowRedisSinkMapper.java | 10 ++- .../redis/table/RedisDynamicTableFactory.java | 1 + .../redis/table/RedisLookupFunction.java | 5 +- .../redis/table/RedisSinkFunction.java | 35 ++++++-- .../connectors/redis/table/SQLTest.java | 80 +++++++++++++++++++ 10 files changed, 178 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/RedisOptions.java b/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/RedisOptions.java index 5516df7..7b8d2b2 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/RedisOptions.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/RedisOptions.java @@ -164,4 +164,10 @@ private RedisOptions() {} .enumType(RedisValueDataStructure.class) .defaultValue(RedisValueDataStructure.column) .withDescription("Optional redis value data structure."); + + public static final ConfigOption EXPIRE_ON_TIME = + ConfigOptions.key("expire.on.time") + .stringType() + .noDefaultValue() + .withDescription("Optional redis key expire on time, eg: 10:00 12:12:01"); } diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java index 446a6ef..838d4bd 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java @@ -411,4 +411,21 @@ public RedisFuture> hgetAll(String key) { public RedisClusterAsyncCommands getAsyncCommands() { return clusterAsyncCommands; } + + @Override + public RedisFuture getTTL(String key) { + RedisFuture result = null; + try { + result = redisFuture = clusterAsyncCommands.ttl(key); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error( + "Cannot send Redis message with command ttl to key {} error message {}", + key, + e.getMessage()); + } + throw e; + } + return result; + } } diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java b/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java index fd95331..63aa39c 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java @@ -191,6 +191,14 @@ public interface RedisCommandsContainer extends Serializable { */ void expire(String key, int seconds); + /** + * get ttl of key. + * + * @param key + * @return + */ + RedisFuture getTTL(String key); + /** * delete key in map. * diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java index 1d79d8e..dc32f4a 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java @@ -427,4 +427,21 @@ public void srem(String setName, String value) { public RedisClusterAsyncCommands getAsyncCommands() { return asyncCommands; } + + @Override + public RedisFuture getTTL(String key) { + RedisFuture result = null; + try { + result = redisFuture = asyncCommands.ttl(key); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error( + "Cannot send Redis message with command ttl to key {} error message {}", + key, + e.getMessage()); + } + throw e; + } + return result; + } } diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java b/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java index d0fa2c1..57c6b94 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java @@ -1,6 +1,7 @@ package org.apache.flink.streaming.connectors.redis.common.mapper; import java.io.Serializable; +import java.time.LocalTime; /** */ public class RedisCommandDescription extends RedisCommandBaseDescription implements Serializable { @@ -9,13 +10,19 @@ public class RedisCommandDescription extends RedisCommandBaseDescription impleme private Integer ttl; - public RedisCommandDescription(RedisCommand redisCommand, Integer ttl) { - super(redisCommand); + private LocalTime expireTime; + public RedisCommandDescription(RedisCommand redisCommand, Integer ttl, LocalTime expireTime) { + super(redisCommand); + this.expireTime = expireTime; this.ttl = ttl; } public Integer getTTL() { return ttl; } + + public LocalTime getExpireTime() { + return expireTime; + } } diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/sink/RowRedisSinkMapper.java b/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/sink/RowRedisSinkMapper.java index 388b62c..91eb15b 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/sink/RowRedisSinkMapper.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/sink/RowRedisSinkMapper.java @@ -10,7 +10,9 @@ import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.util.StringUtils; +import java.time.LocalTime; import java.util.HashMap; import java.util.Map; @@ -22,6 +24,8 @@ public abstract class RowRedisSinkMapper private Integer ttl; + private LocalTime expireTime; + private RedisCommand redisCommand; public RowRedisSinkMapper(int ttl, RedisCommand redisCommand) { @@ -45,11 +49,15 @@ public RowRedisSinkMapper(RedisCommand redisCommand, Map config) public RowRedisSinkMapper(RedisCommand redisCommand, ReadableConfig config) { this.redisCommand = redisCommand; this.ttl = config.get(RedisOptions.TTL); + String expireOnTime = config.get(RedisOptions.EXPIRE_ON_TIME); + if (!StringUtils.isNullOrWhitespaceOnly(expireOnTime)) { + this.expireTime = LocalTime.parse(expireOnTime); + } } @Override public RedisCommandDescription getCommandDescription() { - return new RedisCommandDescription(redisCommand, ttl); + return new RedisCommandDescription(redisCommand, ttl, expireTime); } @Override diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisDynamicTableFactory.java b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisDynamicTableFactory.java index 7965bb2..0c4a71a 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisDynamicTableFactory.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisDynamicTableFactory.java @@ -105,6 +105,7 @@ public Set> optionalOptions() { options.add(RedisOptions.VALUE_DATA_STRUCTURE); options.add(RedisOptions.REDIS_MASTER_NAME); options.add(RedisOptions.SENTINELS_INFO); + options.add(RedisOptions.EXPIRE_ON_TIME); return options; } diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisLookupFunction.java b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisLookupFunction.java index caa4229..f3f8eb6 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisLookupFunction.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisLookupFunction.java @@ -56,9 +56,6 @@ public RedisLookupFunction( Preconditions.checkNotNull( flinkConfigBase, "Redis connection pool config should not be null"); Preconditions.checkNotNull(redisMapper, "Redis Mapper can not be null"); - Preconditions.checkNotNull( - redisMapper.getCommandDescription(), - "Redis Mapper data type description can not be null"); this.flinkConfigBase = flinkConfigBase; this.cacheTtl = redisLookupOptions.getCacheTtl(); @@ -77,6 +74,8 @@ public RedisLookupFunction( } RedisCommandBaseDescription redisCommandDescription = redisMapper.getCommandDescription(); + Preconditions.checkNotNull( + redisCommandDescription, "Redis Mapper data type description can not be null"); this.redisCommand = redisCommandDescription.getRedisCommand(); Preconditions.checkArgument( redisCommand == RedisCommand.HGET || redisCommand == RedisCommand.GET, diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisSinkFunction.java b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisSinkFunction.java index 8ff4af6..e8cc2de 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisSinkFunction.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisSinkFunction.java @@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.time.LocalTime; import java.util.List; import java.util.Objects; @@ -34,6 +35,7 @@ public class RedisSinkFunction extends RichSinkFunction { private static final Logger LOG = LoggerFactory.getLogger(RedisSinkFunction.class); protected Integer ttl; + protected int expireTimeSeconds = -1; private RedisSinkMapper redisSinkMapper; private RedisCommand redisCommand; @@ -60,18 +62,21 @@ public RedisSinkFunction( ResolvedSchema resolvedSchema) { Objects.requireNonNull(flinkConfigBase, "Redis connection pool config should not be null"); Objects.requireNonNull(redisSinkMapper, "Redis Mapper can not be null"); - Objects.requireNonNull( - redisSinkMapper.getCommandDescription(), - "Redis Mapper data type description can not be null"); this.flinkConfigBase = flinkConfigBase; this.maxRetryTimes = redisSinkOptions.getMaxRetryTimes(); this.redisSinkMapper = redisSinkMapper; RedisCommandDescription redisCommandDescription = (RedisCommandDescription) redisSinkMapper.getCommandDescription(); + Preconditions.checkNotNull( + redisCommandDescription, "Redis Mapper data type description can not be null"); this.redisCommand = redisCommandDescription.getRedisCommand(); this.ttl = redisCommandDescription.getTTL(); + if (redisCommandDescription.getExpireTime() != null) { + this.expireTimeSeconds = redisCommandDescription.getExpireTime().toSecondOfDay(); + } + this.columnDataTypes = resolvedSchema.getColumnDataTypes(); this.redisValueDataStructure = redisSinkOptions.getRedisValueDataStructure(); if (redisValueDataStructure == RedisValueDataStructure.row) { @@ -123,9 +128,7 @@ private void startSink(String[] params) throws Exception { for (int i = 0; i <= maxRetryTimes; i++) { try { sink(params); - if (ttl != null) { - this.redisCommandsContainer.expire(params[0], ttl); - } + setTtl(params[0]); break; } catch (UnsupportedOperationException e) { throw e; @@ -207,6 +210,26 @@ private void sink(String[] params) { } } + /** + * set ttl for key. + * + * @param key + */ + private void setTtl(String key) throws Exception { + if (expireTimeSeconds != -1) { + if (this.redisCommandsContainer.getTTL(key).get() == -1) { + int now = LocalTime.now().toSecondOfDay(); + this.redisCommandsContainer.expire( + key, + expireTimeSeconds > now + ? expireTimeSeconds - now + : 86400 + expireTimeSeconds - now); + } + } else if (ttl != null) { + this.redisCommandsContainer.expire(key, ttl); + } + } + /** * serialize whole row. * diff --git a/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLTest.java b/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLTest.java index 02edf26..c045cde 100644 --- a/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLTest.java +++ b/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLTest.java @@ -10,6 +10,8 @@ import org.junit.jupiter.api.Test; import org.junit.platform.commons.util.Preconditions; +import java.time.LocalTime; + import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_COMMAND; /** Created by jeff.zou on 2020/9/10. */ @@ -487,4 +489,82 @@ public void testMultiFieldLeftJoinForMap() throws Exception { Preconditions.condition(singleRedisCommands.hget("11_11", "11").equals("10.3"), ""); Preconditions.condition(singleRedisCommands.hget("12_12", "12") == "", ""); } + + @Test + public void testHgetWithUpdate() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + EnvironmentSettings environmentSettings = + EnvironmentSettings.newInstance().inStreamingMode().build(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings); + + clusterCommands.hset("key1", "field1", "1"); + String dim = + "create table dim_table(name varchar, level varchar, age varchar) with ( 'connector'='redis', " + + "'cluster-nodes'='" + + CLUSTERNODES + + "','redis-mode'='cluster', 'password'='" + + CLUSTER_PASSWORD + + "','" + + REDIS_COMMAND + + "'='" + + RedisCommand.HGET + + "' )"; + + String source = + "create table source_table(age varchar, proctime as procTime()) " + + "with ('connector'='datagen', 'rows-per-second'='1', " + + "'fields.age.kind'='sequence', 'fields.age.start'='1', 'fields.age.end'='99'" + + ")"; + + String sink = + "create table sink_table(username varchar, level varchar,age varchar) with ( 'connector'='print')"; + + tEnv.executeSql(source); + tEnv.executeSql(dim); + tEnv.executeSql(sink); + + // tEnv.executeSql("insert into dim_table select 'key1', 'field1', age from + // source_table "); + String sql = + " insert into sink_table " + + " select d.name, d.level, d.age from source_table s" + + " left join dim_table for system_time as of s.proctime as d " + + " on d.name ='key1' and d.level ='field1'"; + TableResult tableResult = tEnv.executeSql(sql); + tableResult.getJobClient().get().getJobExecutionResult().get(); + System.out.println(sql); + } + + @Test + public void testSinkValueWithExpire() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + LocalTime localTime = LocalTime.now(); + int wait = 100; + localTime = localTime.plusSeconds(wait); + String dim = + "create table sink_redis(name varchar, level varchar, age varchar) with ( 'connector'='redis', " + + "'cluster-nodes'='" + + CLUSTERNODES + + "','redis-mode'='cluster', 'ttl'='10','expire.on.time'='" + + localTime.toString() + + "', 'password'='" + + CLUSTER_PASSWORD + + "','" + + REDIS_COMMAND + + "'='" + + RedisCommand.HSET + + "' )"; + + tEnv.executeSql(dim); + String sql = " insert into sink_redis select * from (values ('1', '11.3', '10.3'))"; + TableResult tableResult = tEnv.executeSql(sql); + tableResult.getJobClient().get().getJobExecutionResult().get(); + System.out.println(sql); + Preconditions.condition(clusterCommands.exists("1") == 1, ""); + Thread.sleep(wait * 1000); + Preconditions.condition(clusterCommands.exists("1") == 0, ""); + } }