From a4b4b8d01b97ccc97d31ae7e362d54093ec45edf Mon Sep 17 00:00:00 2001 From: jeff-zou Date: Fri, 15 Mar 2024 22:30:54 +0800 Subject: [PATCH] check test case --- .../redis/table/RedisResultWrapper.java | 17 +- .../redis/table/LimitedSinkTest.java | 1 + .../connectors/redis/table/SQLExpireTest.java | 13 +- .../redis/table/SQLLettuceLimitTest.java | 24 +- .../connectors/redis/table/SQLTest.java | 232 ++++++------------ .../redis/table/base/TestRedisConfigBase.java | 19 +- 6 files changed, 114 insertions(+), 192 deletions(-) diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisResultWrapper.java b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisResultWrapper.java index f0318e8..7b5932e 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisResultWrapper.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisResultWrapper.java @@ -24,13 +24,15 @@ public static GenericRowData createRowDataForString( List dataTypes) { if (redisValueDataStructure == RedisValueDataStructure.column) { GenericRowData genericRowData = new GenericRowData(2); - if (value == null) { - return genericRowData; - } genericRowData.setField( 0, RedisRowConverter.dataTypeFromString( dataTypes.get(0).getLogicalType(), String.valueOf(keys[0]))); + if (value == null) { + genericRowData.setField(0, null); + return genericRowData; + } + genericRowData.setField( 1, RedisRowConverter.dataTypeFromString(dataTypes.get(1).getLogicalType(), value)); @@ -79,9 +81,6 @@ public static GenericRowData createRowDataForHash( List dataTypes) { if (redisValueDataStructure == RedisValueDataStructure.column) { GenericRowData genericRowData = new GenericRowData(3); - if (value == null) { - return genericRowData; - } genericRowData.setField( 0, RedisRowConverter.dataTypeFromString( @@ -89,7 +88,11 @@ public static GenericRowData createRowDataForHash( genericRowData.setField( 1, RedisRowConverter.dataTypeFromString( - dataTypes.get(0).getLogicalType(), String.valueOf(keys[1]))); + dataTypes.get(1).getLogicalType(), String.valueOf(keys[1]))); + + if (value == null) { + return genericRowData; + } genericRowData.setField( 2, RedisRowConverter.dataTypeFromString(dataTypes.get(2).getLogicalType(), value)); diff --git a/src/test/java/org/apache/flink/streaming/connectors/redis/table/LimitedSinkTest.java b/src/test/java/org/apache/flink/streaming/connectors/redis/table/LimitedSinkTest.java index 5d2f992..e9091f9 100644 --- a/src/test/java/org/apache/flink/streaming/connectors/redis/table/LimitedSinkTest.java +++ b/src/test/java/org/apache/flink/streaming/connectors/redis/table/LimitedSinkTest.java @@ -16,6 +16,7 @@ public class LimitedSinkTest extends TestRedisConfigBase { @Test public void testLimitedSink() throws Exception { + singleRedisCommands.del("sink_limit_test"); final int ttl = 60000; String sink = "create table sink_redis(key_name varchar, user_name VARCHAR, passport varchar) with ( 'connector'='redis', " diff --git a/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLExpireTest.java b/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLExpireTest.java index 92206a7..9f0be47 100644 --- a/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLExpireTest.java +++ b/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLExpireTest.java @@ -18,11 +18,11 @@ public class SQLExpireTest extends TestRedisConfigBase { public void testSinkValueWithExpire() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - env.setParallelism(1); + singleRedisCommands.del("1"); String ddl = "create table source_table(uid VARCHAR) with ('connector'='datagen'," + "'rows-per-second'='1', " - + "'fields.uid.kind'='sequence', 'fields.uid.start'='1', 'fields.uid.end'='10')"; + + "'fields.uid.kind'='sequence', 'fields.uid.start'='1', 'fields.uid.end'='1')"; tEnv.executeSql(ddl); String sink = @@ -48,7 +48,7 @@ public void testSinkValueWithExpire() throws Exception { public void testSinkValueWithExpireOnTime() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - + singleRedisCommands.del("1"); LocalTime localTime = LocalTime.now(); int wait = 8; localTime = localTime.plusSeconds(wait); @@ -67,9 +67,8 @@ public void testSinkValueWithExpireOnTime() throws Exception { String sql = " insert into sink_redis select * from (values ('1', '11.3', '10.3'))"; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); - System.out.println(sql); Preconditions.condition(singleRedisCommands.exists("1") == 1, ""); - Thread.sleep(wait * 1000); + Thread.sleep(10 * 1000); Preconditions.condition(singleRedisCommands.exists("1") == 0, ""); } @@ -77,11 +76,11 @@ public void testSinkValueWithExpireOnTime() throws Exception { public void testSinkValueWithExpireOnKeyPresent() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - env.setParallelism(1); + singleRedisCommands.del("test_hash"); String ddl = "create table source_table(uid VARCHAR) with ('connector'='datagen'," + "'rows-per-second'='1', " - + "'fields.uid.kind'='sequence', 'fields.uid.start'='1', 'fields.uid.end'='8')"; + + "'fields.uid.kind'='sequence', 'fields.uid.start'='1', 'fields.uid.end'='1')"; tEnv.executeSql(ddl); String dim = diff --git a/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLLettuceLimitTest.java b/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLLettuceLimitTest.java index 997971e..b600c19 100644 --- a/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLLettuceLimitTest.java +++ b/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLLettuceLimitTest.java @@ -5,7 +5,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; import org.apache.flink.streaming.connectors.redis.table.base.TestRedisConfigBase; -import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.junit.jupiter.api.Test; @@ -17,7 +16,7 @@ public class SQLLettuceLimitTest extends TestRedisConfigBase { public void testSinkLimitLettucePool() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - env.setParallelism(1); + singleRedisCommands.del("1"); String ddl = "create table source_table(uid VARCHAR) with ('connector'='datagen'," + "'rows-per-second'='1', " @@ -37,7 +36,6 @@ public void testSinkLimitLettucePool() throws Exception { TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); - System.out.println(sql); Preconditions.condition(singleRedisCommands.exists("1") == 1, ""); Thread.sleep(10 * 1000); Preconditions.condition(singleRedisCommands.exists("1") == 0, ""); @@ -46,12 +44,10 @@ public void testSinkLimitLettucePool() throws Exception { @Test public void testJoinLimitLettucePool() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - EnvironmentSettings environmentSettings = - EnvironmentSettings.newInstance().inStreamingMode().build(); - StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings); - - singleRedisCommands.hset("1", "1", "test"); - singleRedisCommands.hset("5", "5", "test"); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + singleRedisCommands.del("test_hash", "test_hash2"); + singleRedisCommands.hset("test_hash", "1", "test"); + singleRedisCommands.hset("test_hash", "5", "test"); String dim = "create table dim_table(name varchar, level varchar, age varchar) with ( 'connector'='redis', " + "'host'='" @@ -92,15 +88,15 @@ public void testJoinLimitLettucePool() throws Exception { String sql = " insert into sink_table " - + " select concat_ws('_', s.username, s.level), s.level, d.age from source_table s" + + " select 'test_hash2', s.level, d.age from source_table s" + " left join dim_table for system_time as of s.proctime as d " - + " on d.name = s.username and d.level = s.level"; + + " on d.name = 'test_hash' and d.level = s.level"; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); System.out.println(sql); - Preconditions.condition(singleRedisCommands.hget("1_1", "1").equals("test"), ""); - Preconditions.condition(singleRedisCommands.hget("2_2", "2") == "", ""); - Preconditions.condition(singleRedisCommands.hget("5_5", "5").equals("test"), ""); + Preconditions.condition(singleRedisCommands.hget("test_hash2", "1").equals("test"), ""); + Preconditions.condition(singleRedisCommands.hget("test_hash2", "2") == "", ""); + Preconditions.condition(singleRedisCommands.hget("test_hash2", "5").equals("test"), ""); } } diff --git a/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLTest.java b/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLTest.java index cd48e27..90bb25f 100644 --- a/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLTest.java +++ b/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLTest.java @@ -15,12 +15,9 @@ public class SQLTest extends TestRedisConfigBase { @Test - public void testNoPrimaryKeyInsertSQL() throws Exception { + public void testSetSQL() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - EnvironmentSettings environmentSettings = - EnvironmentSettings.newInstance().inStreamingMode().build(); - StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); String ddl = "create table sink_redis(username VARCHAR, passport time(3)) with ( 'connector'='redis', " @@ -45,13 +42,9 @@ public void testNoPrimaryKeyInsertSQL() throws Exception { } @Test - public void testSingleInsertHashClusterSQL() throws Exception { + public void testHsetSQL() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - EnvironmentSettings environmentSettings = - EnvironmentSettings.newInstance().inStreamingMode().build(); - StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings); - env.setParallelism(1); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); String ddl = "create table sink_redis(username varchar, level varchar, age varchar) with ( 'connector'='redis', " @@ -68,24 +61,19 @@ public void testSingleInsertHashClusterSQL() throws Exception { + "', 'minIdle'='1' )"; tEnv.executeSql(ddl); - String sql = - " insert into sink_redis select * from (values ('test_cluster_sink', '3', '18'))"; + String sql = " insert into sink_redis select * from (values ('test_hash', '3', '18'))"; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); - Preconditions.condition( - singleRedisCommands.hget("test_cluster_sink", "3").equals("18"), ""); + Preconditions.condition(singleRedisCommands.hget("test_hash", "3").equals("18"), ""); } @Test public void testHgetSQL() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - EnvironmentSettings environmentSettings = - EnvironmentSettings.newInstance().inStreamingMode().build(); - StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings); - - singleRedisCommands.hset("1", "1", "test"); - singleRedisCommands.hset("5", "5", "test"); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + singleRedisCommands.del("test_hash", "test_hash2"); + singleRedisCommands.hset("test_hash", "1", "test"); + singleRedisCommands.hset("test_hash", "5", "test"); String dim = "create table dim_table(name varchar, level varchar, age varchar) with ( 'connector'='redis', " + "'host'='" @@ -126,16 +114,16 @@ public void testHgetSQL() throws Exception { String sql = " insert into sink_table " - + " select concat_ws('_', s.username, s.level), s.level, d.age from source_table s" + + " select 'test_hash2', s.level, d.age from source_table s" + " left join dim_table for system_time as of s.proctime as d " - + " on d.name = s.username and d.level = s.level"; + + " on d.name = 'test_hash' and d.level = s.level"; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); System.out.println(sql); - Preconditions.condition(singleRedisCommands.hget("1_1", "1").equals("test"), ""); - Preconditions.condition(singleRedisCommands.hget("2_2", "2") == "", ""); - Preconditions.condition(singleRedisCommands.hget("5_5", "5").equals("test"), ""); + Preconditions.condition(singleRedisCommands.hget("test_hash2", "1").equals("test"), ""); + Preconditions.condition(singleRedisCommands.hget("test_hash2", "2") == "", ""); + Preconditions.condition(singleRedisCommands.hget("test_hash2", "5").equals("test"), ""); } @Test @@ -145,9 +133,9 @@ public void testHgetFieldIsNullSQL() throws Exception { EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings); - - singleRedisCommands.hset("1", "1", "test"); - singleRedisCommands.hset("5", "5", "test"); + singleRedisCommands.del("test_hash", "test_hash2"); + singleRedisCommands.hset("test_hash", "1", "test"); + singleRedisCommands.hset("test_hash", "5", "test"); String dim = "create table dim_table(name varchar, level varchar, age varchar) with ( 'connector'='redis', " + "'host'='" @@ -188,26 +176,23 @@ public void testHgetFieldIsNullSQL() throws Exception { String sql = " insert into sink_table " - + " select concat_ws('_', s.username, s.level), s.level, d.level from source_table s" + + " select 'test_hash2', s.level, d.age from source_table s" + " left join dim_table for system_time as of s.proctime as d " - + " on d.name = s.username and d.level = s.level"; + + " on d.name = 'test_hash' and d.level = s.level"; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); System.out.println(sql); - Preconditions.condition(singleRedisCommands.hget("1_1", "1").equals("1"), ""); - Preconditions.condition(singleRedisCommands.hget("2_2", "2") == "", ""); - Preconditions.condition(singleRedisCommands.hget("5_5", "5").equals("5"), ""); + Preconditions.condition(singleRedisCommands.hget("test_hash2", "1").equals("test"), ""); + Preconditions.condition(singleRedisCommands.hget("test_hash2", "2") == "", ""); + Preconditions.condition(singleRedisCommands.hget("test_hash2", "5").equals("test"), ""); } @Test public void testGetSQL() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - EnvironmentSettings environmentSettings = - EnvironmentSettings.newInstance().inStreamingMode().build(); - StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings); - + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + singleRedisCommands.del("10", "11", "12", "13", "14", "15"); singleRedisCommands.set("10", "1800000"); String dim = "create table dim_table(name varchar, login_time time(3) ) with ( 'connector'='redis', " @@ -250,15 +235,14 @@ public void testGetSQL() throws Exception { String sql = " insert into sink_table " - + " select concat_ws('_',s.username, s.level), s.level, d.login_time from source_table s" + + " select 'test_hash', s.level, d.login_time from source_table s" + " left join dim_table for system_time as of s.proctime as d " + " on d.name = s.username"; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); - System.out.println(sql); - Preconditions.condition(singleRedisCommands.hget("10_10", "10").equals("1800000"), ""); - Preconditions.condition(singleRedisCommands.hget("11_11", "11") == "", ""); - Preconditions.condition(singleRedisCommands.hget("11_11", "12") == null, ""); + Preconditions.condition(singleRedisCommands.hget("test_hash", "10").equals("1800000"), ""); + Preconditions.condition(singleRedisCommands.hget("test_hash", "11") == "", ""); + Preconditions.condition(singleRedisCommands.hget("test_hash", "12") == "", ""); } @Test @@ -267,7 +251,6 @@ public void testDel() throws Exception { StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); singleRedisCommands.set("testDel", "20"); - Preconditions.condition(singleRedisCommands.get("testDel").equals("20"), ""); String ddl = "create table redis_sink(redis_key varchar) with('connector'='redis', " + "'host'='" @@ -292,8 +275,9 @@ public void testDel() throws Exception { public void testSRem() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - singleRedisCommands.sadd("s", "test1", "test2"); - Preconditions.condition(singleRedisCommands.sismember("s", "test2"), ""); + singleRedisCommands.del("set"); + singleRedisCommands.sadd("set", "test1", "test2"); + Preconditions.condition(singleRedisCommands.sismember("set", "test2"), ""); String ddl = "create table redis_sink(redis_key varchar, redis_member varchar) with('connector'='redis', " + "'host'='" @@ -309,18 +293,18 @@ public void testSRem() throws Exception { + "') "; tEnv.executeSql(ddl); TableResult tableResult = - tEnv.executeSql("insert into redis_sink select * from (values('s', 'test2'))"); + tEnv.executeSql("insert into redis_sink select * from (values('set', 'test2'))"); tableResult.getJobClient().get().getJobExecutionResult().get(); - Thread.sleep(2000); - Preconditions.condition(singleRedisCommands.sismember("s", "test2") == false, ""); + Preconditions.condition(singleRedisCommands.sismember("set", "test2") == false, ""); } @Test public void testHdel() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - singleRedisCommands.hset("s", "test1", "test2"); - Preconditions.condition(singleRedisCommands.hget("s", "test1").equals("test2"), ""); + singleRedisCommands.del("test_hash"); + singleRedisCommands.hset("test_hash", "test1", "test2"); + Preconditions.condition(singleRedisCommands.hget("test_hash", "test1").equals("test2"), ""); String ddl = "create table redis_sink(redis_key varchar, redis_member varchar) with('connector'='redis', " + "'host'='" @@ -336,18 +320,19 @@ public void testHdel() throws Exception { + "') "; tEnv.executeSql(ddl); TableResult tableResult = - tEnv.executeSql("insert into redis_sink select * from (values('s', 'test1'))"); + tEnv.executeSql( + "insert into redis_sink select * from (values('test_hash', 'test1'))"); tableResult.getJobClient().get().getJobExecutionResult().get(); - Thread.sleep(2000); - Preconditions.condition(singleRedisCommands.hget("s", "test1") == null, ""); + Preconditions.condition(singleRedisCommands.hget("test_hash", "test1") == null, ""); } @Test public void testHIncryBy() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - singleRedisCommands.hset("11", "12", "1"); - Preconditions.condition(singleRedisCommands.hget("11", "12").equals("1"), ""); + singleRedisCommands.del("test_hash"); + singleRedisCommands.hset("test_hash", "12", "1"); + Preconditions.condition(singleRedisCommands.hget("test_hash", "12").equals("1"), ""); String ddl = "create table sink_redis(username VARCHAR, level varchar, score int) with ( 'connector'='redis', " + "'host'='" @@ -363,19 +348,19 @@ public void testHIncryBy() throws Exception { + "')"; tEnv.executeSql(ddl); - String sql = " insert into sink_redis select * from (values ('11', '12', 10))"; + String sql = " insert into sink_redis select * from (values ('test_hash', '12', 10))"; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); - System.out.println(sql); - Preconditions.condition(singleRedisCommands.hget("11", "12").equals("11"), ""); + Preconditions.condition(singleRedisCommands.hget("test_hash", "12").equals("11"), ""); } @Test public void testHIncryByFloat() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - singleRedisCommands.hset("11", "12", "1"); - Preconditions.condition(singleRedisCommands.hget("11", "12").equals("1"), ""); + singleRedisCommands.del("test_hash"); + singleRedisCommands.hset("test_hash", "12", "1"); + Preconditions.condition(singleRedisCommands.hget("test_hash", "12").equals("1"), ""); String ddl = "create table sink_redis(username VARCHAR, level varchar, score float) with ( 'connector'='redis', " + "'host'='" @@ -391,18 +376,17 @@ public void testHIncryByFloat() throws Exception { + "')"; tEnv.executeSql(ddl); - String sql = " insert into sink_redis select * from (values ('11', '12', 10.1))"; + String sql = " insert into sink_redis select * from (values ('test_hash', '12', 10.1))"; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); - System.out.println(sql); - Preconditions.condition(singleRedisCommands.hget("11", "12").equals("11.1"), ""); + Preconditions.condition(singleRedisCommands.hget("test_hash", "12").equals("11.1"), ""); } @Test public void testSinkValueFrom() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - + singleRedisCommands.del("test"); String ddl = "create table sink_redis(username VARCHAR, score double, score2 double) with ( 'connector'='redis', " + "'host'='" @@ -418,19 +402,18 @@ public void testSinkValueFrom() throws Exception { + "', 'value.data.structure'='row')"; tEnv.executeSql(ddl); - String sql = " insert into sink_redis select * from (values ('1', 11.3, 10.3))"; + String sql = " insert into sink_redis select * from (values ('test', 11.3, 10.3))"; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); - System.out.println(sql); String s = new StringBuilder() - .append("1") + .append("test") .append(RedisDynamicTableFactory.CACHE_SEPERATOR) .append("11.3") .append(RedisDynamicTableFactory.CACHE_SEPERATOR) .append("10.3") .toString(); - Preconditions.condition(singleRedisCommands.get("1").equals(s), ""); + Preconditions.condition(singleRedisCommands.get("test").equals(s), ""); } @Test @@ -438,6 +421,7 @@ public void testMultiFieldLeftJoinForString() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + singleRedisCommands.del("1", "2", "3", "1_1", "2_2"); // init data in redis String ddl = "create table sink_redis(uid VARCHAR, score double, score2 double ) with ( 'connector'='redis', " @@ -454,19 +438,16 @@ public void testMultiFieldLeftJoinForString() throws Exception { + "', 'value.data.structure'='row')"; tEnv.executeSql(ddl); - System.out.println(ddl); // init data in redis String sql = " insert into sink_redis select * from (values ('1', 10.3, 10.1),('2', 10.1, 10.1),('3', 10.3, 10.1))"; - tEnv.executeSql(sql); - System.out.println(sql); + tEnv.executeSql(sql).getJobClient().get().getJobExecutionResult().get(); // create join table ddl = "create table join_table with ('command'='get', 'value.data.structure'='row') like sink_redis"; tEnv.executeSql(ddl); - System.out.println(ddl); // create result table ddl = @@ -483,17 +464,14 @@ public void testMultiFieldLeftJoinForString() throws Exception { + RedisCommand.SET + "')"; tEnv.executeSql(ddl); - System.out.println(ddl); // create source table ddl = "create table source_table(uid VARCHAR, username VARCHAR, proc_time as procTime()) with ('connector'='datagen', 'fields.uid.kind'='sequence', 'fields.uid.start'='1', 'fields.uid.end'='2')"; tEnv.executeSql(ddl); - System.out.println(ddl); sql = "insert into result_table select concat_ws('_', s.uid, s.uid), j.score from source_table as s join join_table for system_time as of s.proc_time as j on j.uid = s.uid "; - System.out.println(sql); TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); Preconditions.condition(singleRedisCommands.get("1_1").equals("10.3"), ""); @@ -505,6 +483,7 @@ public void testMultiFieldLeftJoinForMap() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + singleRedisCommands.del("test_hash"); // init data in redis String ddl = "create table sink_redis(uid VARCHAR, level varchar, score double, score2 double ) with ( 'connector'='redis', " @@ -521,19 +500,16 @@ public void testMultiFieldLeftJoinForMap() throws Exception { + "', 'value.data.structure'='row')"; tEnv.executeSql(ddl); - System.out.println(ddl); // init data in redis - String sql = " insert into sink_redis select * from (values ('11', '11', 10.3, 10.1))"; - tEnv.executeSql(sql); - System.out.println(sql); + String sql = + " insert into sink_redis select * from (values ('test_hash', '11', 10.3, 10.1))"; + tEnv.executeSql(sql).getJobClient().get(); // create join table ddl = "create table join_table with ('command'='hget', 'value.data.structure'='row') like sink_redis"; tEnv.executeSql(ddl); - System.out.println(ddl); - // create result table ddl = "create table result_table(uid VARCHAR, level VARCHAR, score double) with ('connector'='redis', " @@ -549,78 +525,27 @@ public void testMultiFieldLeftJoinForMap() throws Exception { + RedisCommand.HSET + "')"; tEnv.executeSql(ddl); - System.out.println(ddl); // create source table ddl = "create table source_table(uid VARCHAR, level varchar, username VARCHAR, proc_time as procTime()) with ('connector'='datagen', 'fields.uid.kind'='sequence', 'fields.uid.start'='10', 'fields.uid.end'='12', 'fields.level.kind'='sequence', 'fields.level.start'='10', 'fields.level.end'='12')"; tEnv.executeSql(ddl); - System.out.println(ddl); sql = - "insert into result_table select concat_ws('_', s.uid, s.level),s.level, j.score from source_table as s join join_table for system_time as of s.proc_time as j on j.uid = s.uid and j.level = s.level"; - System.out.println(sql); - TableResult tableResult = tEnv.executeSql(sql); - tableResult.getJobClient().get().getJobExecutionResult().get(); - Object o = singleRedisCommands.hget("10_10", "10"); - Preconditions.condition(singleRedisCommands.hget("10_10", "10") == "", ""); - Preconditions.condition(singleRedisCommands.hget("11_11", "11").equals("10.3"), ""); - Preconditions.condition(singleRedisCommands.hget("12_12", "12") == "", ""); - } - - @Test - public void testHgetWithUpdate() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - EnvironmentSettings environmentSettings = - EnvironmentSettings.newInstance().inStreamingMode().build(); - StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings); - - singleRedisCommands.hset("key1", "field1", "1"); - String dim = - "create table dim_table(name varchar, level varchar, age varchar) with ( 'connector'='redis', " - + "'host'='" - + REDIS_HOST - + "','port'='" - + REDIS_PORT - + "', 'redis-mode'='single','password'='" - + REDIS_PASSWORD - + "','" - + REDIS_COMMAND - + "'='" - + RedisCommand.HGET - + "' )"; - - String source = - "create table source_table(age varchar, proctime as procTime()) " - + "with ('connector'='datagen', 'rows-per-second'='1', " - + "'fields.age.kind'='sequence', 'fields.age.start'='1', 'fields.age.end'='99'" - + ")"; - - String sink = - "create table sink_table(username varchar, level varchar,age varchar) with ( 'connector'='print')"; - - tEnv.executeSql(source); - tEnv.executeSql(dim); - tEnv.executeSql(sink); - - // tEnv.executeSql("insert into dim_table select 'key1', 'field1', age from - // source_table "); - String sql = - " insert into sink_table " - + " select d.name, d.level, d.age from source_table s" - + " left join dim_table for system_time as of s.proctime as d " - + " on d.name ='key1' and d.level ='field1'"; + "insert into result_table select 'test_hash', concat_ws('_', s.level, s.level), j.score " + + " from source_table as s join join_table for system_time as of s.proc_time as j on j.uid = 'test_hash' and j.level = s.level"; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); - System.out.println(sql); + Preconditions.condition(singleRedisCommands.hget("test_hash", "10_10") == null, ""); + Preconditions.condition(singleRedisCommands.hget("test_hash", "11_11").equals("10.3"), ""); + Preconditions.condition(singleRedisCommands.hget("test_hash", "12_12") == null, ""); } @Test public void testIncryBy() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - + singleRedisCommands.del("testIncryBy"); String dim = "create table sink_redis(name varchar, level bigint) with ( 'connector'='redis', " + "'host'='" @@ -640,16 +565,16 @@ public void testIncryBy() throws Exception { tEnv.executeSql(sql); TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); - System.out.println(sql); - Preconditions.condition(singleRedisCommands.get("testIncryBy").toString().equals("1"), ""); + Preconditions.condition(singleRedisCommands.get("testIncryBy").toString().equals("2"), ""); } @Test public void testIncryBy2() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - + singleRedisCommands.del("testIncryBy"); + singleRedisCommands.incrby("testIncryBy", 1); String dim = "create table sink_redis(name varchar, level bigint) with ( 'connector'='redis', " + "'host'='" @@ -663,14 +588,13 @@ public void testIncryBy2() throws Exception { + "'='" + RedisCommand.INCRBY + "' )"; - singleRedisCommands.incrby("testIncryBy2", 1); + tEnv.executeSql(dim); - String sql = " insert into sink_redis select * from (values ('testIncryBy2', 3));"; + String sql = " insert into sink_redis select * from (values ('testIncryBy', 3));"; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); - System.out.println(sql); - Preconditions.condition(singleRedisCommands.get("testIncryBy2").toString().equals("4"), ""); + Preconditions.condition(singleRedisCommands.get("testIncryBy").toString().equals("4"), ""); } @Test @@ -680,9 +604,10 @@ public void testSetIfAbsent() throws Exception { EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings); + singleRedisCommands.del("test_time"); singleRedisCommands.set("test_time", "14640000"); String ddl = - "create table sink_redis(username VARCHAR, passport time(3)) with ( 'connector'='redis', " + "create table sink_redis(username VARCHAR, passport VARCHAR) with ( 'connector'='redis', " + "'host'='" + REDIS_HOST + "','port'='" @@ -697,15 +622,14 @@ public void testSetIfAbsent() throws Exception { + "')"; tEnv.executeSql(ddl); - String sql = - " insert into sink_redis select * from (values ('test_time', time '05:04:00'))"; + String sql = " insert into sink_redis select * from (values ('test_time', '0'))"; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); Preconditions.condition(singleRedisCommands.get("test_time").equals("14640000"), ""); } @Test - public void testhsetIfAbsent() throws Exception { + public void testHsetIfAbsent() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings environmentSettings = @@ -821,7 +745,6 @@ public void testGetAfterInsert() throws Exception { + " on d.name =s.name "; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); - System.out.println(sql); } @Test @@ -831,7 +754,7 @@ public void testHGetAfterInsert() throws Exception { EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings); - + singleRedisCommands.del("1", "2", "3", "4", "5"); String dim = "create table dim_table(name varchar, level varchar, age varchar) with ( 'connector'='redis', " + "'host'='" @@ -872,6 +795,5 @@ public void testHGetAfterInsert() throws Exception { + " on d.name =s.name and d.level =s.level"; TableResult tableResult = tEnv.executeSql(sql); tableResult.getJobClient().get().getJobExecutionResult().get(); - System.out.println(sql); } } diff --git a/src/test/java/org/apache/flink/streaming/connectors/redis/table/base/TestRedisConfigBase.java b/src/test/java/org/apache/flink/streaming/connectors/redis/table/base/TestRedisConfigBase.java index 91fa62c..d4e8bf0 100644 --- a/src/test/java/org/apache/flink/streaming/connectors/redis/table/base/TestRedisConfigBase.java +++ b/src/test/java/org/apache/flink/streaming/connectors/redis/table/base/TestRedisConfigBase.java @@ -5,8 +5,8 @@ import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.sync.RedisCommands; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,13 +20,13 @@ public class TestRedisConfigBase { public static final String REDIS_HOST = "10.11.69.176"; public static final int REDIS_PORT = 6379; public static final String REDIS_PASSWORD = "******"; - protected StatefulRedisConnection singleConnect; - protected RedisCommands singleRedisCommands; + protected static StatefulRedisConnection singleConnect; + protected static RedisCommands singleRedisCommands; - private RedisClient redisClient; + private static RedisClient redisClient; - @BeforeEach - public void connectRedis() { + @BeforeAll + public static void connectRedis() { RedisURI redisURI = RedisURI.builder() .withHost(REDIS_HOST) @@ -39,9 +39,10 @@ public void connectRedis() { LOG.info("connecto to the redis: {}", REDIS_HOST); } - @AfterEach - public void stopSingle() { + @AfterAll + public static void stopSingle() { singleConnect.close(); + redisClient.shutdown(); } protected String sigleWith() {