
Commit

add support for expiring a key at a time of day; the time type is LocalTime, e.g. 10:00 or 12:12:01
jszouxue committed Mar 2, 2023
1 parent 69688a2 commit 0bcba63
Showing 10 changed files with 178 additions and 12 deletions.
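Summary of the change: the new 'expire.on.time' option stores a LocalTime on the sink's command description. On each write the sink checks whether the key already has an expiry and, if not, computes the number of seconds until the configured time of day (rolling over to the next day when that time has already passed) and applies it as the key's TTL. When the option is absent, the existing 'ttl' behaviour is unchanged.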
@@ -164,4 +164,10 @@ private RedisOptions() {}
.enumType(RedisValueDataStructure.class)
.defaultValue(RedisValueDataStructure.column)
.withDescription("Optional redis value data structure.");

public static final ConfigOption<String> EXPIRE_ON_TIME =
ConfigOptions.key("expire.on.time")
.stringType()
.noDefaultValue()
.withDescription("Optional redis key expire on time, eg: 10:00 12:12:01");
}
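For context, a minimal sketch of how the option might be set in a table DDL. The cluster nodes and the 'command' option key shown here are placeholders (assumptions, not confirmed by this commit); the full cluster-mode form appears in the test added at the end of this commit. Note that when both 'ttl' and 'expire.on.time' are present, the expire-on-time deadline takes precedence in RedisSinkFunction.setTtl below.

    create table sink_redis(name varchar, level varchar, age varchar) with (
        'connector' = 'redis',
        'redis-mode' = 'cluster',
        'cluster-nodes' = '<host1:port1,host2:port2>',  -- placeholder value
        'command' = 'HSET',                             -- assumed option key behind REDIS_COMMAND
        'ttl' = '10',
        'expire.on.time' = '10:00:00'
    )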
@@ -411,4 +411,21 @@ public RedisFuture<Map<String, String>> hgetAll(String key) {
public RedisClusterAsyncCommands getAsyncCommands() {
return clusterAsyncCommands;
}

@Override
public RedisFuture<Long> getTTL(String key) {
RedisFuture<Long> result = null;
try {
result = clusterAsyncCommands.ttl(key);
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error(
"Cannot send Redis message with command ttl to key {} error message {}",
key,
e.getMessage());
}
throw e;
}
return result;
}
}
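A note on the return value: the Redis TTL command yields -1 when the key exists but has no expiry and -2 when the key does not exist. The sink (see setTtl in RedisSinkFunction below) only applies the expire-on-time deadline when getTTL returns -1, so a key that already carries an expiry keeps it and no redundant EXPIRE call is sent.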
@@ -191,6 +191,14 @@ public interface RedisCommandsContainer extends Serializable {
*/
void expire(String key, int seconds);

/**
 * Get the remaining time to live of a key.
 *
 * @param key the key to query
 * @return a future completing with the TTL in seconds, -1 if the key exists but has no expiry, or -2 if the key does not exist
 */
RedisFuture<Long> getTTL(String key);

/**
* delete key in map.
*
@@ -427,4 +427,21 @@ public void srem(String setName, String value) {
public RedisClusterAsyncCommands getAsyncCommands() {
return asyncCommands;
}

@Override
public RedisFuture<Long> getTTL(String key) {
RedisFuture<Long> result = null;
try {
result = asyncCommands.ttl(key);
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error(
"Cannot send Redis message with command ttl to key {} error message {}",
key,
e.getMessage());
}
throw e;
}
return result;
}
}
@@ -1,6 +1,7 @@
package org.apache.flink.streaming.connectors.redis.common.mapper;

import java.io.Serializable;
import java.time.LocalTime;

/** */
public class RedisCommandDescription extends RedisCommandBaseDescription implements Serializable {
@@ -9,13 +10,19 @@ public class RedisCommandDescription extends RedisCommandBaseDescription impleme

private Integer ttl;

public RedisCommandDescription(RedisCommand redisCommand, Integer ttl) {
super(redisCommand);
private LocalTime expireTime;

public RedisCommandDescription(RedisCommand redisCommand, Integer ttl, LocalTime expireTime) {
super(redisCommand);
this.expireTime = expireTime;
this.ttl = ttl;
}

public Integer getTTL() {
return ttl;
}

public LocalTime getExpireTime() {
return expireTime;
}
}
@@ -10,7 +10,9 @@
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.StringUtils;

import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;

@@ -22,6 +24,8 @@ public abstract class RowRedisSinkMapper

private Integer ttl;

private LocalTime expireTime;

private RedisCommand redisCommand;

public RowRedisSinkMapper(int ttl, RedisCommand redisCommand) {
@@ -45,11 +49,15 @@ public RowRedisSinkMapper(RedisCommand redisCommand, Map<String, String> config)
public RowRedisSinkMapper(RedisCommand redisCommand, ReadableConfig config) {
this.redisCommand = redisCommand;
this.ttl = config.get(RedisOptions.TTL);
String expireOnTime = config.get(RedisOptions.EXPIRE_ON_TIME);
if (!StringUtils.isNullOrWhitespaceOnly(expireOnTime)) {
this.expireTime = LocalTime.parse(expireOnTime);
}
}
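LocalTime.parse uses the ISO-8601 local-time format, so values such as 10:00 or 12:12:01 are accepted; anything else will fail with a DateTimeParseException when the mapper is constructed.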

@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(redisCommand, ttl);
return new RedisCommandDescription(redisCommand, ttl, expireTime);
}

@Override
@@ -105,6 +105,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(RedisOptions.VALUE_DATA_STRUCTURE);
options.add(RedisOptions.REDIS_MASTER_NAME);
options.add(RedisOptions.SENTINELS_INFO);
options.add(RedisOptions.EXPIRE_ON_TIME);
return options;
}

@@ -56,9 +56,6 @@ public RedisLookupFunction(
Preconditions.checkNotNull(
flinkConfigBase, "Redis connection pool config should not be null");
Preconditions.checkNotNull(redisMapper, "Redis Mapper can not be null");
Preconditions.checkNotNull(
redisMapper.getCommandDescription(),
"Redis Mapper data type description can not be null");

this.flinkConfigBase = flinkConfigBase;
this.cacheTtl = redisLookupOptions.getCacheTtl();
@@ -77,6 +74,8 @@ }
}

RedisCommandBaseDescription redisCommandDescription = redisMapper.getCommandDescription();
Preconditions.checkNotNull(
redisCommandDescription, "Redis Mapper data type description can not be null");
this.redisCommand = redisCommandDescription.getRedisCommand();
Preconditions.checkArgument(
redisCommand == RedisCommand.HGET || redisCommand == RedisCommand.GET,
@@ -23,6 +23,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.LocalTime;
import java.util.List;
import java.util.Objects;

@@ -34,6 +35,7 @@ public class RedisSinkFunction<IN> extends RichSinkFunction<IN> {
private static final Logger LOG = LoggerFactory.getLogger(RedisSinkFunction.class);

protected Integer ttl;
protected int expireTimeSeconds = -1;

private RedisSinkMapper<IN> redisSinkMapper;
private RedisCommand redisCommand;
@@ -60,18 +62,21 @@ public RedisSinkFunction(
ResolvedSchema resolvedSchema) {
Objects.requireNonNull(flinkConfigBase, "Redis connection pool config should not be null");
Objects.requireNonNull(redisSinkMapper, "Redis Mapper can not be null");
Objects.requireNonNull(
redisSinkMapper.getCommandDescription(),
"Redis Mapper data type description can not be null");

this.flinkConfigBase = flinkConfigBase;
this.maxRetryTimes = redisSinkOptions.getMaxRetryTimes();
this.redisSinkMapper = redisSinkMapper;
RedisCommandDescription redisCommandDescription =
(RedisCommandDescription) redisSinkMapper.getCommandDescription();
Preconditions.checkNotNull(
redisCommandDescription, "Redis Mapper data type description can not be null");

this.redisCommand = redisCommandDescription.getRedisCommand();
this.ttl = redisCommandDescription.getTTL();
if (redisCommandDescription.getExpireTime() != null) {
this.expireTimeSeconds = redisCommandDescription.getExpireTime().toSecondOfDay();
}

this.columnDataTypes = resolvedSchema.getColumnDataTypes();
this.redisValueDataStructure = redisSinkOptions.getRedisValueDataStructure();
if (redisValueDataStructure == RedisValueDataStructure.row) {
@@ -123,9 +128,7 @@ private void startSink(String[] params) throws Exception {
for (int i = 0; i <= maxRetryTimes; i++) {
try {
sink(params);
if (ttl != null) {
this.redisCommandsContainer.expire(params[0], ttl);
}
setTtl(params[0]);
break;
} catch (UnsupportedOperationException e) {
throw e;
@@ -207,6 +210,26 @@ private void sink(String[] params) {
}
}

/**
 * Set the expiration for a key: if expire.on.time is configured, expire the key at that time of day
 * (only when the key has no expiry yet); otherwise fall back to the plain ttl option.
 *
 * @param key the key to expire
 */
private void setTtl(String key) throws Exception {
if (expireTimeSeconds != -1) {
if (this.redisCommandsContainer.getTTL(key).get() == -1) {
int now = LocalTime.now().toSecondOfDay();
this.redisCommandsContainer.expire(
key,
expireTimeSeconds > now
? expireTimeSeconds - now
: 86400 + expireTimeSeconds - now);
}
} else if (ttl != null) {
this.redisCommandsContainer.expire(key, ttl);
}
}
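A worked example of the delay computation with illustrative values: with 'expire.on.time'='10:00' (36000 seconds into the day) and a row written at 18:00 (64800 seconds), the target time has already passed, so the delay is 86400 + 36000 - 64800 = 57600 seconds and the key expires at 10:00 the next day; a row written at 08:00 (28800 seconds) instead gets 36000 - 28800 = 7200 seconds and expires at 10:00 the same day.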

/**
* serialize whole row.
*
@@ -10,6 +10,8 @@
import org.junit.jupiter.api.Test;
import org.junit.platform.commons.util.Preconditions;

import java.time.LocalTime;

import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_COMMAND;

/** Created by jeff.zou on 2020/9/10. */
@@ -487,4 +489,82 @@ public void testMultiFieldLeftJoinForMap() throws Exception {
Preconditions.condition(singleRedisCommands.hget("11_11", "11").equals("10.3"), "");
Preconditions.condition(singleRedisCommands.hget("12_12", "12") == "", "");
}

@Test
public void testHgetWithUpdate() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

EnvironmentSettings environmentSettings =
EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

clusterCommands.hset("key1", "field1", "1");
String dim =
"create table dim_table(name varchar, level varchar, age varchar) with ( 'connector'='redis', "
+ "'cluster-nodes'='"
+ CLUSTERNODES
+ "','redis-mode'='cluster', 'password'='"
+ CLUSTER_PASSWORD
+ "','"
+ REDIS_COMMAND
+ "'='"
+ RedisCommand.HGET
+ "' )";

String source =
"create table source_table(age varchar, proctime as procTime()) "
+ "with ('connector'='datagen', 'rows-per-second'='1', "
+ "'fields.age.kind'='sequence', 'fields.age.start'='1', 'fields.age.end'='99'"
+ ")";

String sink =
"create table sink_table(username varchar, level varchar,age varchar) with ( 'connector'='print')";

tEnv.executeSql(source);
tEnv.executeSql(dim);
tEnv.executeSql(sink);

// tEnv.executeSql("insert into dim_table select 'key1', 'field1', age from
// source_table ");
String sql =
" insert into sink_table "
+ " select d.name, d.level, d.age from source_table s"
+ " left join dim_table for system_time as of s.proctime as d "
+ " on d.name ='key1' and d.level ='field1'";
TableResult tableResult = tEnv.executeSql(sql);
tableResult.getJobClient().get().getJobExecutionResult().get();
System.out.println(sql);
}

@Test
public void testSinkValueWithExpire() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

LocalTime localTime = LocalTime.now();
int wait = 100;
localTime = localTime.plusSeconds(wait);
String dim =
"create table sink_redis(name varchar, level varchar, age varchar) with ( 'connector'='redis', "
+ "'cluster-nodes'='"
+ CLUSTERNODES
+ "','redis-mode'='cluster', 'ttl'='10','expire.on.time'='"
+ localTime.toString()
+ "', 'password'='"
+ CLUSTER_PASSWORD
+ "','"
+ REDIS_COMMAND
+ "'='"
+ RedisCommand.HSET
+ "' )";

tEnv.executeSql(dim);
String sql = " insert into sink_redis select * from (values ('1', '11.3', '10.3'))";
TableResult tableResult = tEnv.executeSql(sql);
tableResult.getJobClient().get().getJobExecutionResult().get();
System.out.println(sql);
Preconditions.condition(clusterCommands.exists("1") == 1, "");
Thread.sleep(wait * 1000);
Preconditions.condition(clusterCommands.exists("1") == 0, "");
}
}
