From 68501989976722b41c7a867a4baf5736bc2279b2 Mon Sep 17 00:00:00 2001
From: xlfjcg <103733994+xlfjcg@users.noreply.github.com>
Date: Mon, 20 Jun 2022 20:38:31 +0800
Subject: [PATCH] release v1.2.3

---
 .github/workflows/release.yml                 | 27 ++++--
 README.md                                     | 10 +--
 pom.xml                                       |  2 +-
 .../flink/manager/StarRocksQueryVisitor.java  | 28 ++++--
 .../flink/manager/StarRocksSinkManager.java   | 90 +++++++++++--------
 .../row/sink/StarRocksCsvSerializer.java      |  6 +-
 .../sink/StarRocksGenericRowTransformer.java  |  9 ++
 .../row/sink/StarRocksIRowTransformer.java    |  7 +-
 .../row/sink/StarRocksJsonSerializer.java     |  7 +-
 .../sink/StarRocksTableRowTransformer.java    | 62 +++++++++----
 .../flink/table/StarRocksDataType.java        | 47 ++++++++++
 .../sink/StarRocksDynamicSinkFunction.java    | 31 ++++---
 .../source/StarRocksDynamicTableSource.java   | 18 ++--
 .../source/StarRocksExpressionExtractor.java  | 62 +++++--------
 .../connector/flink/tools/ClassUtils.java     | 24 +++++
 .../row/sink/StarRocksCsvSerializerTest.java  |  8 +-
 .../StarRocksDynamicTableSourceTest.java      | 82 +++++++++++------
 17 files changed, 349 insertions(+), 171 deletions(-)
 create mode 100644 src/main/java/com/starrocks/connector/flink/table/StarRocksDataType.java
 create mode 100644 src/main/java/com/starrocks/connector/flink/tools/ClassUtils.java

diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 7c2667e2..1a936bdd 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -14,14 +14,14 @@ jobs:
         with:
           fetch-depth: 0
       - run: git checkout flink-1.11 && git pull origin flink-1.11
-      - name: Pacakge flink-1.11_2.11
+      - name: Package flink-1.11_2.11
         uses: xlui/action-maven-cli@jdk8
         with:
           lifecycle: 'clean validate package -Dmaven.test.skip=true -Pscala-2.11'
       - id: p1
         run: find target/ -name "flink-connector*.jar" | grep -i "[0-9].jar" | xargs -i cp {} ./ && echo ::set-output name=jar::$(ls -t flink-connector*.jar | head -n 1)
-      - name: Pacakge flink-1.11_2.12
+      - name: Package flink-1.11_2.12
         uses: xlui/action-maven-cli@jdk8
         with:
           lifecycle: 'clean validate package -Dmaven.test.skip=true -Pscala-2.12'
@@ -29,13 +29,13 @@
         run: find target/ -name "flink-connector*.jar" | grep -i "[0-9].jar" | xargs -i cp {} ./ && echo ::set-output name=jar::$(ls -t flink-connector*.jar | head -n 1)
 
       - run: git checkout flink-1.12 && git pull origin flink-1.12
-      - name: Pacakge flink-1.12_2.11
+      - name: Package flink-1.12_2.11
         uses: xlui/action-maven-cli@jdk8
         with:
           lifecycle: 'clean validate package -Dmaven.test.skip=true -Pscala-2.11'
       - id: p3
         run: find target/ -name "flink-connector*.jar" | grep -i "[0-9].jar" | xargs -i cp {} ./ && echo ::set-output name=jar::$(ls -t flink-connector*.jar | head -n 1)
-      - name: Pacakge flink-1.12_2.12
+      - name: Package flink-1.12_2.12
         uses: xlui/action-maven-cli@jdk8
         with:
           lifecycle: 'clean validate package -Dmaven.test.skip=true -Pscala-2.12'
@@ -43,14 +43,14 @@
         run: find target/ -name "flink-connector*.jar" | grep -i "[0-9].jar" | xargs -i cp {} ./ && echo ::set-output name=jar::$(ls -t flink-connector*.jar | head -n 1)
 
       - run: git checkout flink-1.13 && git pull origin flink-1.13
-      - name: Pacakge flink-1.13_2.11
+      - name: Package flink-1.13_2.11
         uses: xlui/action-maven-cli@jdk8
         with:
           lifecycle: 'clean validate package -Dmaven.test.skip=true -Pscala-2.11'
       - id: p5
         run: find target/ -name "flink-connector*.jar" | grep -i "[0-9].jar" | xargs -i cp {} ./ && echo ::set-output name=jar::$(ls -t flink-connector*.jar | head -n 1)
-      - name: Pacakge flink-1.13_2.12
+      - name: Package flink-1.13_2.12
         uses: xlui/action-maven-cli@jdk8
         with:
           lifecycle: 'clean validate package -Dmaven.test.skip=true -Pscala-2.12'
@@ -58,20 +58,28 @@
         run: find target/ -name "flink-connector*.jar" | grep -i "[0-9].jar" | xargs -i cp {} ./ && echo ::set-output name=jar::$(ls -t flink-connector*.jar | head -n 1)
 
       - run: git checkout flink-1.14 && git pull origin flink-1.14
-      - name: Pacakge flink-1.14_2.11
+      - name: Package flink-1.14_2.11
         uses: xlui/action-maven-cli@jdk8
         with:
           lifecycle: 'clean validate package -Dmaven.test.skip=true -Pscala-2.11'
       - id: p7
         run: find target/ -name "flink-connector*.jar" | grep -i "[0-9].jar" | xargs -i cp {} ./ && echo ::set-output name=jar::$(ls -t flink-connector*.jar | head -n 1)
-      - name: Pacakge flink-1.14_2.12
+      - name: Package flink-1.14_2.12
         uses: xlui/action-maven-cli@jdk8
         with:
           lifecycle: 'clean validate package -Dmaven.test.skip=true -Pscala-2.12'
       - id: p8
         run: find target/ -name "flink-connector*.jar" | grep -i "[0-9].jar" | xargs -i cp {} ./ && echo ::set-output name=jar::$(ls -t flink-connector*.jar | head -n 1)
-
+
+      - run: git checkout flink-1.15 && git pull origin flink-1.15
+      - name: Package flink-1.15
+        uses: xlui/action-maven-cli@jdk8
+        with:
+          lifecycle: 'clean validate package -Dmaven.test.skip=true'
+      - id: p9
+        run: find target/ -name "flink-connector*.jar" | grep -i "[0-9].jar" | xargs -i cp {} ./ && echo ::set-output name=jar::$(ls -t flink-connector*.jar | head -n 1)
+
       - name: Release
         uses: softprops/action-gh-release@v1
         with:
@@ -84,3 +92,4 @@ jobs:
             ${{ steps.p6.outputs.jar }}
             ${{ steps.p7.outputs.jar }}
             ${{ steps.p8.outputs.jar }}
+            ${{ steps.p9.outputs.jar }}
diff --git a/README.md b/README.md
index d1b0e739..0afca452 100644
--- a/README.md
+++ b/README.md
@@ -277,15 +277,15 @@ tEnv.executeSql(
 | DOUBLE | DOUBLE |
 | DECIMAL | DECIMAL |
 | BINARY | INT |
-| CHAR | STRING |
-| VARCHAR | STRING |
-| STRING | STRING |
+| CHAR | JSON / STRING |
+| VARCHAR | JSON / STRING |
+| STRING | JSON / STRING |
 | DATE | DATE |
 | TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
 | TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
 | ARRAY\<T\> | ARRAY\<T\> |
-| MAP\<KT,VT\> | JSON STRING |
-| ROW\<T\> | JSON STRING |
+| MAP\<KT,VT\> | JSON / JSON STRING |
+| ROW\<T\> | JSON / JSON STRING |
 
 ### Sink tips
diff --git a/pom.xml b/pom.xml
index eacccf1e..bb960b51 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,7 +43,7 @@ limitations under the License.
     <modelVersion>4.0.0</modelVersion>
     <groupId>com.starrocks</groupId>
     <artifactId>flink-connector-starrocks</artifactId>
-    <version>1.2.2_flink-1.15</version>
+    <version>1.2.3_flink-1.15</version>
     <properties>
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
diff --git a/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryVisitor.java b/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryVisitor.java
index aedd1e1f..18d96530 100644
--- a/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryVisitor.java
+++ b/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryVisitor.java
@@ -14,20 +14,21 @@
 package com.starrocks.connector.flink.manager;
 
+import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionProvider;
+import com.starrocks.connector.flink.table.StarRocksDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.Serializable;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
-import java.sql.ResultSetMetaData;
 import java.util.Map;
-
-import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionProvider;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class StarRocksQueryVisitor implements Serializable {
 
@@ -61,6 +62,17 @@ public List<Map<String, Object>> getTableColumnsMetaData() {
         return rows;
     }
 
+    public Map<String, StarRocksDataType> getFieldMapping() {
+        List<Map<String, Object>> columns = getTableColumnsMetaData();
+
+        Map<String, StarRocksDataType> mapping = new LinkedHashMap<>();
+        for (Map<String, Object> column : columns) {
+            mapping.put(column.get("COLUMN_NAME").toString(), StarRocksDataType.fromString(column.get("DATA_TYPE").toString()));
+        }
+
+        return mapping;
+    }
+
     public String getStarRocksVersion() {
         final String query = "select current_version() as ver;";
         List<Map<String, Object>> rows;
@@ -69,7 +81,7 @@ public String getStarRocksVersion() {
             LOG.debug(String.format("Executing query '%s'", query));
         }
         rows = executeQuery(query);
-        if (null == rows || rows.isEmpty()) {
+        if (rows.isEmpty()) {
             return "";
         }
         String version = rows.get(0).get("ver").toString();
@@ -114,7 +126,7 @@ public Long getQueryCount(String SQL) {
             LOG.debug(String.format("Executing query '%s'", SQL));
         }
         List<Map<String, Object>> data = executeQuery(SQL);
-        Object opCount = data.get(0).values().stream().findFirst().get();
+        Object opCount = data.get(0).values().stream().findFirst().orElse(null);
         if (null == opCount) {
             throw new RuntimeException("Failed to get data count from StarRocks. ");
         }
diff --git a/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java b/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java
index bbdb8d8f..c7866ee8 100644
--- a/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java
+++ b/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java
@@ -14,19 +14,28 @@
 package com.starrocks.connector.flink.manager;
 
+import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionOptions;
+import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionProvider;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkSemantic;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
+import org.apache.flink.table.api.TableColumn;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -36,19 +45,9 @@
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionOptions;
-import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionProvider;
-import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
-import com.starrocks.connector.flink.table.sink.StarRocksSinkSemantic;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.util.concurrent.ExecutorThreadFactory;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.constraints.UniqueConstraint;
-
 public class StarRocksSinkManager implements Serializable {
 
     private static final long serialVersionUID = 1L;
@@ -57,9 +56,8 @@ public class StarRocksSinkManager implements Serializable {
 
     private final StarRocksJdbcConnectionProvider jdbcConnProvider;
     private final StarRocksQueryVisitor starrocksQueryVisitor;
-    private final StarRocksStreamLoadVisitor starrocksStreamLoadVisitor;
+    private StarRocksStreamLoadVisitor starrocksStreamLoadVisitor;
     private final StarRocksSinkOptions sinkOptions;
-    private final Map<String, List<LogicalTypeRoot>> typesMap;
 
     final LinkedBlockingDeque<StarRocksSinkBufferEntity> flushQueue = new LinkedBlockingDeque<>(1);
     private transient Counter totalFlushBytes;
@@ -110,27 +108,29 @@ public StarRocksSinkManager(StarRocksSinkOptions sinkOptions, TableSchema flinkSchema) {
         StarRocksJdbcConnectionOptions jdbcOptions = new StarRocksJdbcConnectionOptions(sinkOptions.getJdbcUrl(), sinkOptions.getUsername(), sinkOptions.getPassword());
         this.jdbcConnProvider = new StarRocksJdbcConnectionProvider(jdbcOptions);
         this.starrocksQueryVisitor = new StarRocksQueryVisitor(jdbcConnProvider, sinkOptions.getDatabaseName(), sinkOptions.getTableName());
-        // validate table structure
-        typesMap = new HashMap<>();
-        typesMap.put("bigint", Arrays.asList(LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
-        typesMap.put("largeint", Arrays.asList(LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
-        typesMap.put("char", Arrays.asList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR));
-        typesMap.put("date", Arrays.asList(LogicalTypeRoot.DATE, LogicalTypeRoot.VARCHAR));
-        typesMap.put("datetime", Arrays.asList(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, LogicalTypeRoot.VARCHAR));
-        typesMap.put("decimal", Arrays.asList(LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.DOUBLE, LogicalTypeRoot.FLOAT));
-        typesMap.put("double", Arrays.asList(LogicalTypeRoot.DOUBLE, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER));
-        typesMap.put("float", Arrays.asList(LogicalTypeRoot.FLOAT, LogicalTypeRoot.INTEGER));
-        typesMap.put("int", Arrays.asList(LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
-        typesMap.put("tinyint", Arrays.asList(LogicalTypeRoot.TINYINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY, LogicalTypeRoot.BOOLEAN));
-        typesMap.put("smallint", Arrays.asList(LogicalTypeRoot.SMALLINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
-        typesMap.put("varchar", Arrays.asList(LogicalTypeRoot.VARCHAR, LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP, LogicalTypeRoot.ROW));
-        typesMap.put("string", Arrays.asList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR, LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP, LogicalTypeRoot.ROW));
-        validateTableStructure(flinkSchema);
-        String version = this.starrocksQueryVisitor.getStarRocksVersion();
+
+        init(flinkSchema);
+    }
+
+    public StarRocksSinkManager(StarRocksSinkOptions sinkOptions,
+                                TableSchema flinkSchema,
+                                StarRocksJdbcConnectionProvider jdbcConnProvider,
+                                StarRocksQueryVisitor starrocksQueryVisitor) {
+        this.sinkOptions = sinkOptions;
+        this.jdbcConnProvider = jdbcConnProvider;
+        this.starrocksQueryVisitor = starrocksQueryVisitor;
+
+        init(flinkSchema);
+    }
+
+
+    protected void init(TableSchema schema) {
+        validateTableStructure(schema);
+        String version = starrocksQueryVisitor.getStarRocksVersion();
         this.starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor(
-            sinkOptions,
-            null == flinkSchema ? new String[]{} : flinkSchema.getFieldNames(),
-            version.length() > 0 && !version.trim().startsWith("1.")
+                sinkOptions,
+                null == schema ? new String[]{} : schema.getFieldNames(),
+                version.length() > 0 && !version.trim().startsWith("1.")
         );
     }
@@ -483,4 +483,24 @@ private void updateHisto(Map<String, Object> result, String key, Histogram histo) {
             }
         }
     }
+
+    private static final Map<String, List<LogicalTypeRoot>> typesMap = new HashMap<>();
+
+    static {
+        // validate table structure
+        typesMap.put("bigint", Arrays.asList(LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
+        typesMap.put("largeint", Arrays.asList(LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
+        typesMap.put("char", Arrays.asList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR));
+        typesMap.put("date", Arrays.asList(LogicalTypeRoot.DATE, LogicalTypeRoot.VARCHAR));
+        typesMap.put("datetime", Arrays.asList(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, LogicalTypeRoot.VARCHAR));
+        typesMap.put("decimal", Arrays.asList(LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.DOUBLE, LogicalTypeRoot.FLOAT));
+        typesMap.put("double", Arrays.asList(LogicalTypeRoot.DOUBLE, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER));
+        typesMap.put("float", Arrays.asList(LogicalTypeRoot.FLOAT, LogicalTypeRoot.INTEGER));
+        typesMap.put("int", Arrays.asList(LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
+        typesMap.put("tinyint", Arrays.asList(LogicalTypeRoot.TINYINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY, LogicalTypeRoot.BOOLEAN));
+        typesMap.put("smallint", Arrays.asList(LogicalTypeRoot.SMALLINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
+        typesMap.put("varchar", Arrays.asList(LogicalTypeRoot.VARCHAR, LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP, LogicalTypeRoot.ROW));
+        typesMap.put("string", Arrays.asList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR, LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP, LogicalTypeRoot.ROW));
+    }
+
 }
diff --git a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksCsvSerializer.java b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksCsvSerializer.java
index cc1ed6d2..7038b3f5 100644
--- a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksCsvSerializer.java
+++ b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksCsvSerializer.java
@@ -14,13 +14,13 @@
 package com.starrocks.connector.flink.row.sink;
 
+import com.alibaba.fastjson.JSON;
+
 import java.util.List;
 import java.util.Map;
 
-import com.alibaba.fastjson.JSON;
-
 public class StarRocksCsvSerializer implements StarRocksISerializer {
-    
+
     private static final long serialVersionUID = 1L;
 
     private final String columnSeparator;
diff --git a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformer.java b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformer.java
index e3d3a7a8..423fea0a 100644
--- a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformer.java
+++ b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksGenericRowTransformer.java
@@ -14,10 +14,14 @@
 package com.starrocks.connector.flink.row.sink;
 
+import com.starrocks.connector.flink.table.StarRocksDataType;
+
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
 
+import java.util.Map;
+
 public class StarRocksGenericRowTransformer<T> implements StarRocksIRowTransformer<T> {
 
     private static final long serialVersionUID = 1L;
@@ -29,6 +33,11 @@ public StarRocksGenericRowTransformer(StarRocksSinkRowBuilder<T> consumer) {
         this.consumer = consumer;
     }
 
+    @Override
+    public void setStarRocksColumns(Map<String, StarRocksDataType> columns) {
+
+    }
+
     @Override
     public void setTableSchema(TableSchema ts) {
         fieldNames = ts.getFieldNames();
diff --git a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksIRowTransformer.java b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksIRowTransformer.java
index fbab5340..1c1f92de 100644
--- a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksIRowTransformer.java
+++ b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksIRowTransformer.java
@@ -14,13 +14,18 @@
 package com.starrocks.connector.flink.row.sink;
 
-import java.io.Serializable;
+import com.starrocks.connector.flink.table.StarRocksDataType;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.table.api.TableSchema;
 
+import java.io.Serializable;
+import java.util.Map;
+
 public interface StarRocksIRowTransformer<T> extends Serializable {
 
+    void setStarRocksColumns(Map<String, StarRocksDataType> columns);
+
     void setTableSchema(TableSchema tableSchema);
 
     void setRuntimeContext(RuntimeContext ctx);
diff --git a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksJsonSerializer.java b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksJsonSerializer.java
index b0f4da1e..f978f376 100644
--- a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksJsonSerializer.java
+++ b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksJsonSerializer.java
@@ -14,11 +14,11 @@
 package com.starrocks.connector.flink.row.sink;
 
+import com.alibaba.fastjson.JSON;
+
 import java.util.HashMap;
 import java.util.Map;
 
-import com.alibaba.fastjson.JSON;
-
 public class StarRocksJsonSerializer implements StarRocksISerializer {
 
     private static final long serialVersionUID = 1L;
@@ -34,9 +34,10 @@ public String serialize(Object[] values) {
         Map<String, Object> rowMap = new HashMap<>(values.length);
         int idx = 0;
         for (String fieldName : fieldNames) {
-            rowMap.put(fieldName, values[idx] instanceof Map ? JSON.toJSONString(values[idx]) : values[idx]);
+            rowMap.put(fieldName, values[idx]);
             idx++;
         }
+
         return JSON.toJSONString(rowMap);
     }
diff --git a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java
index adc30116..3ede085a 100644
--- a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java
+++ b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java
@@ -14,24 +14,13 @@
 package com.starrocks.connector.flink.row.sink;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.lang.reflect.Type;
-import java.sql.Date;
-import java.text.SimpleDateFormat;
-import java.time.LocalDate;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
+import com.starrocks.connector.flink.table.StarRocksDataType;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.JSONSerializer;
 import com.alibaba.fastjson.serializer.ObjectSerializer;
 import com.alibaba.fastjson.serializer.SerializeConfig;
 import com.alibaba.fastjson.serializer.SerializeWriter;
-
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -57,22 +46,42 @@
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Type;
+import java.sql.Date;
+import java.text.SimpleDateFormat;
+import java.time.LocalDate;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
 public class StarRocksTableRowTransformer implements StarRocksIRowTransformer<RowData> {
 
     private static final long serialVersionUID = 1L;
 
     private TypeInformation<RowData> rowDataTypeInfo;
     private Function<RowData, RowData> valueTransform;
-    private DataType[] dataTypes;
+    private String[] columnNames;
+    private DataType[] columnDataTypes;
+    private Map<String, StarRocksDataType> columns;
     private final SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd");
 
     public StarRocksTableRowTransformer(TypeInformation<RowData> rowDataTypeInfo) {
         this.rowDataTypeInfo = rowDataTypeInfo;
     }
 
+    @Override
+    public void setStarRocksColumns(Map<String, StarRocksDataType> columns) {
+        this.columns = columns;
+    }
+
     @Override
     public void setTableSchema(TableSchema ts) {
-        dataTypes = ts.getFieldDataTypes();
+        this.columnNames = ts.getFieldNames();
+        this.columnDataTypes = ts.getFieldDataTypes();
     }
 
     @Override
@@ -87,9 +96,9 @@ public void setRuntimeContext(RuntimeContext runtimeCtx) {
     @Override
     public Object[] transform(RowData record, boolean supportUpsertDelete) {
         RowData transformRecord = valueTransform.apply(record);
-        Object[] values = new Object[dataTypes.length + (supportUpsertDelete ? 1 : 0)];
+        Object[] values = new Object[columnDataTypes.length + (supportUpsertDelete ? 1 : 0)];
         int idx = 0;
-        for (DataType dataType : dataTypes) {
+        for (DataType dataType : columnDataTypes) {
             values[idx] = typeConvertion(dataType.getLogicalType(), transformRecord, idx);
             idx++;
         }
@@ -121,7 +130,18 @@ private Object typeConvertion(LogicalType type, RowData record, int pos) {
             return record.getDouble(pos);
         case CHAR:
         case VARCHAR:
-            return record.getString(pos).toString();
+            String sValue = record.getString(pos).toString();
+            if (columns == null) {
+                return sValue;
+            }
+            StarRocksDataType starRocksDataType =
+                columns.getOrDefault(columnNames[pos], StarRocksDataType.UNKNOWN);
+            if ((starRocksDataType == StarRocksDataType.JSON ||
+                starRocksDataType == StarRocksDataType.UNKNOWN)
+                && (sValue.charAt(0) == '{' || sValue.charAt(0) == '[')) {
+                return JSON.parse(sValue);
+            }
+            return sValue;
         case DATE:
             return dateFormatter.format(Date.valueOf(LocalDate.ofEpochDay(record.getInt(pos))));
         case TIMESTAMP_WITHOUT_TIME_ZONE:
@@ -148,6 +168,14 @@ private Object typeConvertion(LogicalType type, RowData record, int pos) {
             Map<String, Object> m = new HashMap<>();
             RowData row = record.getRow(pos, rType.getFieldCount());
             rType.getFields().parallelStream().forEach(f -> m.put(f.getName(), typeConvertion(f.getType(), row, rType.getFieldIndex(f.getName()))));
+            if (columns == null) {
+                return m;
+            }
+            StarRocksDataType rStarRocksDataType =
+                columns.getOrDefault(columnNames[pos], StarRocksDataType.UNKNOWN);
+            if (rStarRocksDataType == StarRocksDataType.STRING) {
+                return JSON.toJSONString(m);
+            }
             return m;
         default:
             throw new UnsupportedOperationException("Unsupported type:" + type);
diff --git a/src/main/java/com/starrocks/connector/flink/table/StarRocksDataType.java b/src/main/java/com/starrocks/connector/flink/table/StarRocksDataType.java
new file mode 100644
index 00000000..d2129c97
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/table/StarRocksDataType.java
@@ -0,0 +1,47 @@
+package com.starrocks.connector.flink.table;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public enum StarRocksDataType {
+    TINYINT,
+    INT,
+    LARGEINT,
+    SMALLINT,
+    BOOLEAN,
+    DECIMAL,
+    DOUBLE,
+    FLOAT,
+    BIGINT,
+    VARCHAR,
+    CHAR,
+    STRING,
+    JSON,
+    DATE,
+    DATETIME,
+    UNKNOWN;
+
+    private static final Map<String, StarRocksDataType> dataTypeMap = new HashMap<>();
+    static {
+        StarRocksDataType[] starRocksDataTypes = StarRocksDataType.values();
+
+        for (StarRocksDataType starRocksDataType : starRocksDataTypes) {
+            dataTypeMap.put(starRocksDataType.name(), starRocksDataType);
+            dataTypeMap.put(starRocksDataType.name().toLowerCase(), starRocksDataType);
+        }
+    }
+
+
+    public static StarRocksDataType fromString(String typeString) {
+        if (typeString == null) {
+            return UNKNOWN;
+        }
+
+        StarRocksDataType starRocksDataType = dataTypeMap.get(typeString);
+        if (starRocksDataType == null) {
+            starRocksDataType = dataTypeMap.getOrDefault(typeString.toUpperCase(), StarRocksDataType.UNKNOWN);
+        }
+
+        return starRocksDataType;
+    }
+}
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java
index 4a40f991..69df895f 100644
--- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java
@@ -14,7 +14,20 @@
 package com.starrocks.connector.flink.table.sink;
 
+import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionOptions;
+import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionProvider;
+import com.starrocks.connector.flink.manager.StarRocksQueryVisitor;
+import com.starrocks.connector.flink.manager.StarRocksSinkBufferEntity;
+import com.starrocks.connector.flink.manager.StarRocksSinkManager;
+import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer;
+import com.starrocks.connector.flink.row.sink.StarRocksISerializer;
+import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory;
+
 import com.google.common.base.Strings;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.statement.Statement;
+import net.sf.jsqlparser.statement.alter.Alter;
+import net.sf.jsqlparser.statement.truncate.Truncate;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeHint;
@@ -30,23 +43,12 @@
 import org.apache.flink.table.data.binary.NestedRowData;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.InstantiationUtil;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import net.sf.jsqlparser.parser.CCJSqlParserUtil;
-import net.sf.jsqlparser.statement.Statement;
-import net.sf.jsqlparser.statement.alter.Alter;
-import net.sf.jsqlparser.statement.truncate.Truncate;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import com.starrocks.connector.flink.manager.StarRocksSinkBufferEntity;
-import com.starrocks.connector.flink.manager.StarRocksSinkManager;
-import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer;
-import com.starrocks.connector.flink.row.sink.StarRocksISerializer;
-import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory;
-
 public class StarRocksDynamicSinkFunction<T> extends RichSinkFunction<T> implements CheckpointedFunction {
 
     private static final long serialVersionUID = 1L;
@@ -65,7 +67,12 @@ public class StarRocksDynamicSinkFunction<T> extends RichSinkFunction<T> implements CheckpointedFunction {
     private transient ListState<Map<String, StarRocksSinkBufferEntity>> checkpointedState;
 
     public StarRocksDynamicSinkFunction(StarRocksSinkOptions sinkOptions, TableSchema schema, StarRocksIRowTransformer<T> rowTransformer) {
-        this.sinkManager = new StarRocksSinkManager(sinkOptions, schema);
+        StarRocksJdbcConnectionOptions jdbcOptions = new StarRocksJdbcConnectionOptions(sinkOptions.getJdbcUrl(), sinkOptions.getUsername(), sinkOptions.getPassword());
+        StarRocksJdbcConnectionProvider jdbcConnProvider = new StarRocksJdbcConnectionProvider(jdbcOptions);
+        StarRocksQueryVisitor starrocksQueryVisitor = new StarRocksQueryVisitor(jdbcConnProvider, sinkOptions.getDatabaseName(), sinkOptions.getTableName());
+        this.sinkManager = new StarRocksSinkManager(sinkOptions, schema, jdbcConnProvider, starrocksQueryVisitor);
+
+        rowTransformer.setStarRocksColumns(starrocksQueryVisitor.getFieldMapping());
         rowTransformer.setTableSchema(schema);
         this.serializer = StarRocksSerializerFactory.createSerializer(sinkOptions, schema.getFieldNames());
         this.rowTransformer = rowTransformer;
diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java
index a14ab2be..74b048d3 100644
--- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java
+++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSource.java
@@ -14,8 +14,11 @@
 package com.starrocks.connector.flink.table.source;
 
-import org.apache.flink.table.api.DataTypes;
+import com.starrocks.connector.flink.table.source.struct.ColunmRichInfo;
+import com.starrocks.connector.flink.table.source.struct.PushDownHolder;
+import com.starrocks.connector.flink.table.source.struct.SelectColumn;
 
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -30,14 +33,11 @@
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
-import com.starrocks.connector.flink.table.source.struct.ColunmRichInfo;
-import com.starrocks.connector.flink.table.source.struct.PushDownHolder;
-import com.starrocks.connector.flink.table.source.struct.SelectColumn;
-
 public class StarRocksDynamicTableSource implements ScanTableSource, LookupTableSource, SupportsLimitPushDown, SupportsFilterPushDown, SupportsProjectionPushDown {
 
     private final TableSchema flinkSchema;
@@ -128,21 +128,27 @@ public void applyProjection(int[][] projectedFields) {
     @Override
     public Result applyFilters(List<ResolvedExpression> filtersExpressions) {
         List<String> filters = new ArrayList<>();
+        List<ResolvedExpression> ac = new LinkedList<>();
+        List<ResolvedExpression> remain = new LinkedList<>();
+
         StarRocksExpressionExtractor extractor = new StarRocksExpressionExtractor();
         for (ResolvedExpression expression : filtersExpressions) {
             if (expression.getOutputDataType().equals(DataTypes.BOOLEAN()) && expression.getChildren().size() == 0) {
                 filters.add(expression.accept(extractor) + " = true");
+                ac.add(expression);
                 continue;
             }
             String str = expression.accept(extractor);
             if (str == null) {
+                remain.add(expression);
                 continue;
             }
             filters.add(str);
+            ac.add(expression);
         }
         Optional<String> filter = Optional.of(String.join(" and ", filters));
         this.pushDownHolder.setFilter(filter.get());
-        return Result.of(filtersExpressions, new ArrayList<>());
+        return Result.of(ac, remain);
     }
 
     @Override
diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksExpressionExtractor.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksExpressionExtractor.java
index ce814534..c8d4a4c3 100644
--- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksExpressionExtractor.java
+++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksExpressionExtractor.java
@@ -14,12 +14,6 @@
 package com.starrocks.connector.flink.table.source;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionVisitor;
@@ -30,59 +24,49 @@
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
 public class StarRocksExpressionExtractor implements ExpressionVisitor<String> {
 
-    private static final Map<FunctionDefinition, String> FUNC_TO_STR = new HashMap<>();
+    private static final Map<FunctionDefinition, Function<String[], String>> SUPPORT_FUNC = new HashMap<>();
 
     static {
-        FUNC_TO_STR.put(BuiltInFunctionDefinitions.EQUALS, "=");
-        FUNC_TO_STR.put(BuiltInFunctionDefinitions.NOT_EQUALS, "<>");
-        FUNC_TO_STR.put(BuiltInFunctionDefinitions.GREATER_THAN, ">");
-        FUNC_TO_STR.put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, ">=");
-        FUNC_TO_STR.put(BuiltInFunctionDefinitions.LESS_THAN, "<");
-        FUNC_TO_STR.put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, "<=");
-        FUNC_TO_STR.put(BuiltInFunctionDefinitions.AND, "and");
-        FUNC_TO_STR.put(BuiltInFunctionDefinitions.OR, "or");
-        // FUNC_TO_STR.put(BuiltInFunctionDefinitions.IS_NULL, "is");
-        // FUNC_TO_STR.put(BuiltInFunctionDefinitions.IS_NOT_NULL, "is not");
-        // FUNC_TO_STR.put(BuiltInFunctionDefinitions.NOT, "");
+        SUPPORT_FUNC.put(BuiltInFunctionDefinitions.EQUALS, args -> args[0] + " = " + args[1]);
+        SUPPORT_FUNC.put(BuiltInFunctionDefinitions.NOT_EQUALS, args -> args[0] + " <> " + args[1]);
+        SUPPORT_FUNC.put(BuiltInFunctionDefinitions.GREATER_THAN, args -> args[0] + " > " + args[1]);
+        SUPPORT_FUNC.put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, args -> args[0] + " >= " + args[1]);
+        SUPPORT_FUNC.put(BuiltInFunctionDefinitions.LESS_THAN, args -> args[0] + " < " + args[1]);
+        SUPPORT_FUNC.put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, args -> args[0] + " <= " + args[1]);
+        SUPPORT_FUNC.put(BuiltInFunctionDefinitions.AND, args -> args[0] + " and " + args[1]);
+        SUPPORT_FUNC.put(BuiltInFunctionDefinitions.OR, args -> args[0] + " or " + args[1]);
+        SUPPORT_FUNC.put(BuiltInFunctionDefinitions.IS_NULL, args -> args[0] + " is null");
+        SUPPORT_FUNC.put(BuiltInFunctionDefinitions.IS_NOT_NULL, args -> args[0] + " is not null");
+        SUPPORT_FUNC.put(BuiltInFunctionDefinitions.NOT, args -> args[0] + " = false");
     }
 
     @Override
     public String visit(CallExpression call) {
         FunctionDefinition funcDef = call.getFunctionDefinition();
-        if (funcDef.equals(BuiltInFunctionDefinitions.LIKE)) {
-            throw new RuntimeException("Not support filter -> [like]");
-        }
-        if (funcDef.equals(BuiltInFunctionDefinitions.IN)) {
-            throw new RuntimeException("Not support filter -> [in]");
-        }
-        if (funcDef.equals(BuiltInFunctionDefinitions.BETWEEN)) {
-            throw new RuntimeException("Not support filter -> [between]");
-        }
+
         if (funcDef.equals(BuiltInFunctionDefinitions.CAST)) {
             return call.getChildren().get(0).accept(this);
         }
-        if (funcDef.equals(BuiltInFunctionDefinitions.NOT) && call.getOutputDataType().equals(DataTypes.BOOLEAN())) {
-            return call.getChildren().get(0).toString() + " = false";
-        }
-        if (funcDef.equals(BuiltInFunctionDefinitions.IS_NULL)) {
-            return call.getChildren().get(0).toString() + " is null";
-        }
-        if (funcDef.equals(BuiltInFunctionDefinitions.IS_NOT_NULL)) {
-            return call.getChildren().get(0).toString() + " is not null";
-        }
-        if (FUNC_TO_STR.containsKey(funcDef)) {
+
+        if (SUPPORT_FUNC.containsKey(funcDef)) {
             List<String> operands = new ArrayList<>();
             for (Expression child : call.getChildren()) {
                 String operand = child.accept(this);
                 if (operand == null) {
-                    continue;
+                    return null;
                 }
                 operands.add(operand);
             }
-            return "(" + String.join(" " + FUNC_TO_STR.get(funcDef) + " ", operands) + ")";
+            return "(" + SUPPORT_FUNC.get(funcDef).apply(operands.toArray(new String[0])) + ")";
         }
         return null;
     }
diff --git a/src/main/java/com/starrocks/connector/flink/tools/ClassUtils.java b/src/main/java/com/starrocks/connector/flink/tools/ClassUtils.java
new file mode 100644
index 00000000..af69935a
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/tools/ClassUtils.java
@@ -0,0 +1,24 @@
+package com.starrocks.connector.flink.tools;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class ClassUtils {
+    private static final Set<Class<?>> wrapperPrimitives = new HashSet<>();
+
+    static {
+        wrapperPrimitives.add(Boolean.class);
+        wrapperPrimitives.add(Byte.class);
+        wrapperPrimitives.add(Character.class);
+        wrapperPrimitives.add(Short.class);
+        wrapperPrimitives.add(Integer.class);
+        wrapperPrimitives.add(Long.class);
+        wrapperPrimitives.add(Double.class);
+        wrapperPrimitives.add(Float.class);
+        wrapperPrimitives.add(Void.TYPE);
+    }
+
+    public static boolean isPrimitiveWrapper(Class<?> type) {
+        return wrapperPrimitives.contains(type);
+    }
+}
diff --git a/src/test/java/com/starrocks/connector/flink/row/sink/StarRocksCsvSerializerTest.java b/src/test/java/com/starrocks/connector/flink/row/sink/StarRocksCsvSerializerTest.java
index 8a8620b0..879703c4 100644
--- a/src/test/java/com/starrocks/connector/flink/row/sink/StarRocksCsvSerializerTest.java
+++ b/src/test/java/com/starrocks/connector/flink/row/sink/StarRocksCsvSerializerTest.java
@@ -14,9 +14,9 @@
 package com.starrocks.connector.flink.row.sink;
 
-import org.junit.Test;
+import com.starrocks.connector.flink.StarRocksSinkBaseTest;
 
-import static org.junit.Assert.assertEquals;
+import org.junit.Test;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -24,9 +24,7 @@
 import java.util.List;
 import java.util.stream.Collectors;
 
-import com.starrocks.connector.flink.StarRocksSinkBaseTest;
-import com.starrocks.connector.flink.row.sink.StarRocksISerializer;
-import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory;
+import static org.junit.Assert.assertEquals;
 
 public class StarRocksCsvSerializerTest extends StarRocksSinkBaseTest {
diff --git a/src/test/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceTest.java b/src/test/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceTest.java
index a8ad7a04..edfd49b7 100644
--- a/src/test/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceTest.java
+++ b/src/test/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceTest.java
@@ -14,30 +14,26 @@
 package com.starrocks.connector.flink.table.source;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-
 import com.starrocks.connector.flink.it.source.StarRocksSourceBaseTest;
 import com.starrocks.connector.flink.table.source.struct.PushDownHolder;
 
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.table.api.internal.TableEnvironmentInternal;
-import org.apache.flink.table.delegation.Parser;
-import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
 import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
+import static org.junit.Assert.assertEquals;
 
 public class StarRocksDynamicTableSourceTest extends StarRocksSourceBaseTest {
 
@@ -69,7 +65,18 @@ public void testApplyProjection() {
 
     @Test
     public void testFilter() {
-        String filter = null;
+        String filter;
+
+        ResolvedExpression c9Ref = new FieldReferenceExpression("c6", DataTypes.STRING(), 0, 2);
+        ResolvedExpression c9CharLength = new CallExpression(BuiltInFunctionDefinitions.CHAR_LENGTH, Collections.singletonList(c9Ref), DataTypes.INT());
+        ResolvedExpression c9Exp =
+            new CallExpression(
+                BuiltInFunctionDefinitions.LESS_THAN,
+                Arrays.asList(c9CharLength, valueLiteral(10)),
+                DataTypes.BOOLEAN());
+        dynamicTableSource.applyFilters(Collections.singletonList(c9Exp));
+        filter = pushDownHolder.getFilter();
+        Assert.assertTrue(filter.isEmpty());
 
         ResolvedExpression c5Ref = new FieldReferenceExpression("c5", DataTypes.TIMESTAMP(), 0, 2);
         ResolvedExpression c5Exp =
@@ -79,7 +86,7 @@ public void testFilter() {
                 DataTypes.BOOLEAN());
         dynamicTableSource.applyFilters(Arrays.asList(c5Exp));
         filter = pushDownHolder.getFilter();
-        assertTrue(filter.equals("(c5 = '2022-1-22 00:00:00')"));
+        assertEquals("(c5 = '2022-1-22 00:00:00')", filter);
 
         ResolvedExpression c4Ref = new FieldReferenceExpression("c4", DataTypes.DATE(), 0, 2);
         ResolvedExpression c4Exp =
             new CallExpression(
                 BuiltInFunctionDefinitions.EQUALS,
                 Arrays.asList(c4Ref, valueLiteral("2022-1-22")),
                 DataTypes.BOOLEAN());
-        dynamicTableSource.applyFilters(Arrays.asList(c4Exp));
+        dynamicTableSource.applyFilters(Collections.singletonList(c4Exp));
         filter = pushDownHolder.getFilter();
-        assertTrue(filter.equals("(c4 = '2022-1-22')"));
+        assertEquals("(c4 = '2022-1-22')", filter);
 
         ResolvedExpression c3Ref = new FieldReferenceExpression("c3", DataTypes.BOOLEAN(), 0, 2);
         ResolvedExpression c3Exp =
             new CallExpression(
                 BuiltInFunctionDefinitions.EQUALS,
                 Arrays.asList(c3Ref, valueLiteral(true)),
                 DataTypes.BOOLEAN());
-        dynamicTableSource.applyFilters(Arrays.asList(c3Exp));
+        dynamicTableSource.applyFilters(Collections.singletonList(c3Exp));
         filter = pushDownHolder.getFilter();
-        assertTrue(filter.equals("(c3 = true)"));
+        assertEquals("(c3 = true)", filter);
 
         ResolvedExpression c2Ref = new FieldReferenceExpression("c2", DataTypes.INT(), 0, 2);
         ResolvedExpression c2Exp =
@@ -138,16 +145,16 @@ public void testFilter() {
                     DataTypes.BOOLEAN())
         ));
         filter = pushDownHolder.getFilter();
-        assertTrue(filter.equals("(c1 = 1) and (c1 <> 1) and (c1 > 1) and (c1 >= 1) and (c1 < 1) and (c1 <= 1)"));
+        assertEquals("(c1 = 1) and (c1 <> 1) and (c1 > 1) and (c1 >= 1) and (c1 < 1) and (c1 <= 1)", filter);
 
         dynamicTableSource.applyFilters(Arrays.asList(c1Exp, c2Exp));
         filter = pushDownHolder.getFilter();
-        assertTrue(filter.equals("(c1 = 1) and (c2 = 2)"));
+        assertEquals("(c1 = 1) and (c2 = 2)", filter);
 
         dynamicTableSource.applyFilters(Arrays.asList(new CallExpression(BuiltInFunctionDefinitions.OR, Arrays.asList(c1Exp, c3Exp), DataTypes.BOOLEAN())));
         filter = pushDownHolder.getFilter();
-        assertTrue(filter.equals("((c1 = 1) or (c3 = true))"));
+        assertEquals("((c1 = 1) or (c3 = true))", filter);
 
 
         ResolvedExpression c6Exp =
             new CallExpression(
                 BuiltInFunctionDefinitions.LIKE,
                 Arrays.asList(c1Ref, valueLiteral(1)),
                 DataTypes.BOOLEAN());
         try {
-            dynamicTableSource.applyFilters(Arrays.asList(c6Exp));
+            dynamicTableSource.applyFilters(Collections.singletonList(c6Exp));
         } catch (Exception e) {
             e.printStackTrace();
-            assertTrue(e.getMessage().equals("Not support filter -> [like]"));
+            assertEquals("Not support filter -> [like]", e.getMessage());
         }
 
         ResolvedExpression c7Exp =
             new CallExpression(
                 BuiltInFunctionDefinitions.IN,
                 Arrays.asList(c1Ref, valueLiteral(1)),
                 DataTypes.BOOLEAN());
         try {
-            dynamicTableSource.applyFilters(Arrays.asList(c7Exp));
+            dynamicTableSource.applyFilters(Collections.singletonList(c7Exp));
         } catch (Exception e) {
             e.printStackTrace();
-            assertTrue(e.getMessage().equals("Not support filter -> [in]"));
+            assertEquals("Not support filter -> [in]", e.getMessage());
         }
 
         ResolvedExpression c8Exp =
             new CallExpression(
                 BuiltInFunctionDefinitions.BETWEEN,
                 Arrays.asList(c1Ref, valueLiteral(1)),
                 DataTypes.BOOLEAN());
@@ -180,10 +187,31 @@ public void testFilter() {
         try {
-            dynamicTableSource.applyFilters(Arrays.asList(c8Exp));
+            dynamicTableSource.applyFilters(Collections.singletonList(c8Exp));
         } catch (Exception e) {
             e.printStackTrace();
-            assertTrue(e.getMessage().equals("Not support filter -> [between]"));
+            assertEquals("Not support filter -> [between]", e.getMessage());
         }
-    }
+    }
+
+    @Test
+    public void test() {
+        EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
+        TableEnvironment env = TableEnvironment.create(settings);
+//        env.explainSql("create table HTWSource (item_key string, vehicle_id string, item_type string, item_value string, modify_time timestamp) WITH ('connector' = 'datagen')");
+        env.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH ('connector' = 'datagen')");
+        env.executeSql("CREATE TABLE HTWSource (`item_key` BIGINT, vehicle_id STRING, item_type STRING, item_value STRING, modify_time STRING) WITH ('connector' = 'datagen')");
+        System.out.println(env.explainSql(
+            "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'"));
+
+        System.out.println(env.explainSql("select item_key,\n" +
+            "       vehicle_id,\n" +
+            "       item_type,item_value,modify_time\n" +
+            "from HTWSource \n" +
+            "where CHAR_LENGTH(vehicle_id) < 10"));
+
+        List<String> s = Collections.singletonList("a");
+
+        System.out.println(String.join(" is not null", s));
+    }
 }
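
---
Notes (not part of the diff):

The JSON handling above works in two steps: StarRocksDynamicSinkFunction now
builds the StarRocksQueryVisitor itself, fetches the column-name ->
StarRocksDataType mapping once via getFieldMapping(), and hands it to the row
transformer; StarRocksTableRowTransformer then parses a Flink
CHAR/VARCHAR/STRING field into a JSON value only when the target column is
JSON (or of unknown type) and the text starts with '{' or '['. A minimal
sketch of that decision rule, using a hypothetical helper name
(convertStringField) that mirrors the CHAR/VARCHAR branch of typeConvertion:

    import com.alibaba.fastjson.JSON;
    import com.starrocks.connector.flink.table.StarRocksDataType;

    import java.util.Map;

    final class JsonFieldSketch {
        // Parse to JSON only for JSON/UNKNOWN target columns whose value looks like
        // a JSON object or array; otherwise pass the raw string through unchanged.
        static Object convertStringField(Map<String, StarRocksDataType> columns,
                                         String columnName, String value) {
            if (columns == null || value.isEmpty()) {  // empty-string guard added for the sketch;
                return value;                          // the patched branch calls charAt(0) directly
            }
            StarRocksDataType type = columns.getOrDefault(columnName, StarRocksDataType.UNKNOWN);
            if ((type == StarRocksDataType.JSON || type == StarRocksDataType.UNKNOWN)
                    && (value.charAt(0) == '{' || value.charAt(0) == '[')) {
                return JSON.parse(value);
            }
            return value;
        }
    }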
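
StarRocksDataType.fromString, which getFieldMapping() relies on, is lenient
about case and never throws: it tries the exact name, then the upper-cased
name, and falls back to UNKNOWN. For example, a type the enum does not model
(such as "hll") simply maps to UNKNOWN:

    StarRocksDataType a = StarRocksDataType.fromString("json");    // JSON
    StarRocksDataType b = StarRocksDataType.fromString("Varchar"); // VARCHAR, via the upper-case lookup
    StarRocksDataType c = StarRocksDataType.fromString("hll");     // UNKNOWN, not an error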
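
The applyFilters() rewrite changes the pushdown contract of
StarRocksDynamicTableSource: predicates the extractor cannot translate (LIKE,
IN, BETWEEN, or any call whose operand translation yields null) no longer fail
the query with "Not support filter -> [...]"; they are returned to the planner
as remaining filters and evaluated inside Flink, while the translated ones are
joined with " and " and pushed into the StarRocks scan. A self-contained
sketch of that accept/remain split, using a toy string encoding of predicates
in place of Flink's ResolvedExpression:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;

    final class PushdownSketch {
        // Same shape as SUPPORT_FUNC: the translator returns null for anything it
        // cannot express as StarRocks SQL, and null now means "keep in Flink".
        static final Map<String, Function<String[], String>> SUPPORT = new HashMap<>();
        static {
            SUPPORT.put("=", args -> args[0] + " = " + args[1]);
            SUPPORT.put("is null", args -> args[0] + " is null");
        }

        public static void main(String[] args) {
            List<String[]> predicates = Arrays.asList(
                    new String[]{"=", "c1", "1"},          // supported   -> pushed down
                    new String[]{"like", "c2", "'%x%'"});  // unsupported -> remains in Flink
            List<String> pushed = new ArrayList<>();
            List<String[]> remaining = new ArrayList<>();
            for (String[] p : predicates) {
                Function<String[], String> f = SUPPORT.get(p[0]);
                String sql = (f == null) ? null : f.apply(Arrays.copyOfRange(p, 1, p.length));
                if (sql == null) {
                    remaining.add(p);
                } else {
                    pushed.add("(" + sql + ")");
                }
            }
            System.out.println(String.join(" and ", pushed));  // prints: (c1 = 1)
            System.out.println(remaining.size());              // prints: 1
        }
    }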