Skip to content

Commit

Permalink
check test case
Browse files Browse the repository at this point in the history
  • Loading branch information
jeff-zou committed Mar 15, 2024
1 parent 5c9b88f commit a4b4b8d
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 192 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ public static GenericRowData createRowDataForString(
List<DataType> dataTypes) {
if (redisValueDataStructure == RedisValueDataStructure.column) {
GenericRowData genericRowData = new GenericRowData(2);
if (value == null) {
return genericRowData;
}
genericRowData.setField(
0,
RedisRowConverter.dataTypeFromString(
dataTypes.get(0).getLogicalType(), String.valueOf(keys[0])));
if (value == null) {
genericRowData.setField(0, null);
return genericRowData;
}

genericRowData.setField(
1,
RedisRowConverter.dataTypeFromString(dataTypes.get(1).getLogicalType(), value));
Expand Down Expand Up @@ -79,17 +81,18 @@ public static GenericRowData createRowDataForHash(
List<DataType> dataTypes) {
if (redisValueDataStructure == RedisValueDataStructure.column) {
GenericRowData genericRowData = new GenericRowData(3);
if (value == null) {
return genericRowData;
}
genericRowData.setField(
0,
RedisRowConverter.dataTypeFromString(
dataTypes.get(0).getLogicalType(), String.valueOf(keys[0])));
genericRowData.setField(
1,
RedisRowConverter.dataTypeFromString(
dataTypes.get(0).getLogicalType(), String.valueOf(keys[1])));
dataTypes.get(1).getLogicalType(), String.valueOf(keys[1])));

if (value == null) {
return genericRowData;
}
genericRowData.setField(
2,
RedisRowConverter.dataTypeFromString(dataTypes.get(2).getLogicalType(), value));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class LimitedSinkTest extends TestRedisConfigBase {

@Test
public void testLimitedSink() throws Exception {
singleRedisCommands.del("sink_limit_test");
final int ttl = 60000;
String sink =
"create table sink_redis(key_name varchar, user_name VARCHAR, passport varchar) with ( 'connector'='redis', "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ public class SQLExpireTest extends TestRedisConfigBase {
public void testSinkValueWithExpire() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
singleRedisCommands.del("1");
String ddl =
"create table source_table(uid VARCHAR) with ('connector'='datagen',"
+ "'rows-per-second'='1', "
+ "'fields.uid.kind'='sequence', 'fields.uid.start'='1', 'fields.uid.end'='10')";
+ "'fields.uid.kind'='sequence', 'fields.uid.start'='1', 'fields.uid.end'='1')";
tEnv.executeSql(ddl);

String sink =
Expand All @@ -48,7 +48,7 @@ public void testSinkValueWithExpire() throws Exception {
public void testSinkValueWithExpireOnTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

singleRedisCommands.del("1");
LocalTime localTime = LocalTime.now();
int wait = 8;
localTime = localTime.plusSeconds(wait);
Expand All @@ -67,21 +67,20 @@ public void testSinkValueWithExpireOnTime() throws Exception {
String sql = " insert into sink_redis select * from (values ('1', '11.3', '10.3'))";
TableResult tableResult = tEnv.executeSql(sql);
tableResult.getJobClient().get().getJobExecutionResult().get();
System.out.println(sql);
Preconditions.condition(singleRedisCommands.exists("1") == 1, "");
Thread.sleep(wait * 1000);
Thread.sleep(10 * 1000);
Preconditions.condition(singleRedisCommands.exists("1") == 0, "");
}

@Test
public void testSinkValueWithExpireOnKeyPresent() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
singleRedisCommands.del("test_hash");
String ddl =
"create table source_table(uid VARCHAR) with ('connector'='datagen',"
+ "'rows-per-second'='1', "
+ "'fields.uid.kind'='sequence', 'fields.uid.start'='1', 'fields.uid.end'='8')";
+ "'fields.uid.kind'='sequence', 'fields.uid.start'='1', 'fields.uid.end'='1')";
tEnv.executeSql(ddl);

String dim =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.table.base.TestRedisConfigBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.junit.jupiter.api.Test;
Expand All @@ -17,7 +16,7 @@ public class SQLLettuceLimitTest extends TestRedisConfigBase {
public void testSinkLimitLettucePool() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
singleRedisCommands.del("1");
String ddl =
"create table source_table(uid VARCHAR) with ('connector'='datagen',"
+ "'rows-per-second'='1', "
Expand All @@ -37,7 +36,6 @@ public void testSinkLimitLettucePool() throws Exception {

TableResult tableResult = tEnv.executeSql(sql);
tableResult.getJobClient().get().getJobExecutionResult().get();
System.out.println(sql);
Preconditions.condition(singleRedisCommands.exists("1") == 1, "");
Thread.sleep(10 * 1000);
Preconditions.condition(singleRedisCommands.exists("1") == 0, "");
Expand All @@ -46,12 +44,10 @@ public void testSinkLimitLettucePool() throws Exception {
@Test
public void testJoinLimitLettucePool() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings =
EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

singleRedisCommands.hset("1", "1", "test");
singleRedisCommands.hset("5", "5", "test");
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
singleRedisCommands.del("test_hash", "test_hash2");
singleRedisCommands.hset("test_hash", "1", "test");
singleRedisCommands.hset("test_hash", "5", "test");
String dim =
"create table dim_table(name varchar, level varchar, age varchar) with ( 'connector'='redis', "
+ "'host'='"
Expand Down Expand Up @@ -92,15 +88,15 @@ public void testJoinLimitLettucePool() throws Exception {

String sql =
" insert into sink_table "
+ " select concat_ws('_', s.username, s.level), s.level, d.age from source_table s"
+ " select 'test_hash2', s.level, d.age from source_table s"
+ " left join dim_table for system_time as of s.proctime as d "
+ " on d.name = s.username and d.level = s.level";
+ " on d.name = 'test_hash' and d.level = s.level";
TableResult tableResult = tEnv.executeSql(sql);
tableResult.getJobClient().get().getJobExecutionResult().get();
System.out.println(sql);

Preconditions.condition(singleRedisCommands.hget("1_1", "1").equals("test"), "");
Preconditions.condition(singleRedisCommands.hget("2_2", "2") == "", "");
Preconditions.condition(singleRedisCommands.hget("5_5", "5").equals("test"), "");
Preconditions.condition(singleRedisCommands.hget("test_hash2", "1").equals("test"), "");
Preconditions.condition(singleRedisCommands.hget("test_hash2", "2") == "", "");
Preconditions.condition(singleRedisCommands.hget("test_hash2", "5").equals("test"), "");
}
}
Loading

0 comments on commit a4b4b8d

Please sign in to comment.