release v1.2.3
xlfjcg authored Jun 20, 2022
1 parent decbf58 commit 6850198
Showing 17 changed files with 349 additions and 171 deletions.
27 changes: 18 additions & 9 deletions .github/workflows/release.yml
@@ -14,64 +14,72 @@ jobs:
with:
fetch-depth: 0
- run: git checkout flink-1.11 && git pull origin flink-1.11
- name: Pacakge flink-1.11_2.11
- name: Package flink-1.11_2.11
uses: xlui/action-maven-cli@jdk8
with:
lifecycle: 'clean validate package -Dmaven.test.skip=true -Pscala-2.11'
- id: p1
run: find target/ -name "flink-connector*.jar" | grep -i "[0-9].jar" | xargs -i cp {} ./ && echo ::set-output name=jar::$(ls -t flink-connector*.jar | head -n 1)

- name: Pacakge flink-1.11_2.12
- name: Package flink-1.11_2.12
uses: xlui/action-maven-cli@jdk8
with:
lifecycle: 'clean validate package -Dmaven.test.skip=true -Pscala-2.12'
- id: p2
run: find target/ -name "flink-connector*.jar" | grep -i "[0-9].jar" | xargs -i cp {} ./ && echo ::set-output name=jar::$(ls -t flink-connector*.jar | head -n 1)

- run: git checkout flink-1.12 && git pull origin flink-1.12
- name: Pacakge flink-1.12_2.11
- name: Package flink-1.12_2.11
uses: xlui/action-maven-cli@jdk8
with:
lifecycle: 'clean validate package -Dmaven.test.skip=true -Pscala-2.11'
- id: p3
run: find target/ -name "flink-connector*.jar" | grep -i "[0-9].jar" | xargs -i cp {} ./ && echo ::set-output name=jar::$(ls -t flink-connector*.jar | head -n 1)
- name: Pacakge flink-1.12_2.12
- name: Package flink-1.12_2.12
uses: xlui/action-maven-cli@jdk8
with:
lifecycle: 'clean validate package -Dmaven.test.skip=true -Pscala-2.12'
- id: p4
run: find target/ -name "flink-connector*.jar" | grep -i "[0-9].jar" | xargs -i cp {} ./ && echo ::set-output name=jar::$(ls -t flink-connector*.jar | head -n 1)

- run: git checkout flink-1.13 && git pull origin flink-1.13
- name: Pacakge flink-1.13_2.11
- name: Package flink-1.13_2.11
uses: xlui/action-maven-cli@jdk8
with:
lifecycle: 'clean validate package -Dmaven.test.skip=true -Pscala-2.11'
- id: p5
run: find target/ -name "flink-connector*.jar" | grep -i "[0-9].jar" | xargs -i cp {} ./ && echo ::set-output name=jar::$(ls -t flink-connector*.jar | head -n 1)

- name: Pacakge flink-1.13_2.12
- name: Package flink-1.13_2.12
uses: xlui/action-maven-cli@jdk8
with:
lifecycle: 'clean validate package -Dmaven.test.skip=true -Pscala-2.12'
- id: p6
run: find target/ -name "flink-connector*.jar" | grep -i "[0-9].jar" | xargs -i cp {} ./ && echo ::set-output name=jar::$(ls -t flink-connector*.jar | head -n 1)

- run: git checkout flink-1.14 && git pull origin flink-1.14
- name: Pacakge flink-1.14_2.11
- name: Package flink-1.14_2.11
uses: xlui/action-maven-cli@jdk8
with:
lifecycle: 'clean validate package -Dmaven.test.skip=true -Pscala-2.11'
- id: p7
run: find target/ -name "flink-connector*.jar" | grep -i "[0-9].jar" | xargs -i cp {} ./ && echo ::set-output name=jar::$(ls -t flink-connector*.jar | head -n 1)

- name: Pacakge flink-1.14_2.12
- name: Package flink-1.14_2.12
uses: xlui/action-maven-cli@jdk8
with:
lifecycle: 'clean validate package -Dmaven.test.skip=true -Pscala-2.12'
- id: p8
run: find target/ -name "flink-connector*.jar" | grep -i "[0-9].jar" | xargs -i cp {} ./ && echo ::set-output name=jar::$(ls -t flink-connector*.jar | head -n 1)


- run: git checkout flink-1.15 && git pull origin flink-1.15
- name: Package flink-1.15
uses: xlui/action-maven-cli@jdk8
with:
lifecycle: 'clean validate package -Dmaven.test.skip=true'
- id: p9
run: find target/ -name "flink-connector*.jar" | grep -i "[0-9].jar" | xargs -i cp {} ./ && echo ::set-output name=jar::$(ls -t flink-connector*.jar | head -n 1)

- name: Release
uses: softprops/action-gh-release@v1
with:
@@ -84,3 +92,4 @@ jobs:
${{ steps.p6.outputs.jar }}
${{ steps.p7.outputs.jar }}
${{ steps.p8.outputs.jar }}
${{ steps.p9.outputs.jar }}
10 changes: 5 additions & 5 deletions README.md
@@ -277,15 +277,15 @@ tEnv.executeSql(
| DOUBLE | DOUBLE |
| DECIMAL | DECIMAL |
| BINARY | INT |
| CHAR | STRING |
| VARCHAR | STRING |
| STRING | STRING |
| CHAR | JSON / STRING |
| VARCHAR | JSON / STRING |
| STRING | JSON / STRING |
| DATE | DATE |
| TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
| TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
| ARRAY\<T\> | ARRAY\<T\> |
| MAP\<KT,VT\> | JSON STRING |
| ROW\<arg T...\> | JSON STRING |
| MAP\<KT,VT\> | JSON / JSON STRING |
| ROW\<arg T...\> | JSON / JSON STRING |

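To make the mapping table concrete, here is a minimal Flink Table API sketch that exercises several of the rows above. It is illustrative only: the table name, endpoints, and credentials are placeholders rather than anything defined by this commit.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TypeMappingSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // Each Flink type below maps to the StarRocks type noted in the comment.
        tEnv.executeSql(
                "CREATE TABLE starrocks_sink (" +
                "  id BIGINT," +                  // -> BIGINT
                "  name STRING," +                // -> JSON / STRING
                "  tags ARRAY<STRING>," +         // -> ARRAY<STRING>
                "  attrs MAP<STRING, STRING>," +  // -> JSON / JSON STRING
                "  created TIMESTAMP(3)" +        // -> DATETIME
                ") WITH (" +
                "  'connector' = 'starrocks'," +
                "  'jdbc-url' = 'jdbc:mysql://fe-host:9030'," +  // placeholder endpoint
                "  'load-url' = 'fe-host:8030'," +               // placeholder endpoint
                "  'database-name' = 'demo'," +
                "  'table-name' = 'starrocks_sink'," +
                "  'username' = 'root'," +
                "  'password' = ''" +
                ")");
    }
}
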
### Sink tips

2 changes: 1 addition & 1 deletion pom.xml
@@ -43,7 +43,7 @@ limitations under the License.
<modelVersion>4.0.0</modelVersion>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>1.2.2_flink-1.15</version>
<version>1.2.3_flink-1.15</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
@@ -14,20 +14,21 @@

package com.starrocks.connector.flink.manager;

import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionProvider;
import com.starrocks.connector.flink.table.StarRocksDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.sql.ResultSetMetaData;
import java.util.Map;

import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionProvider;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StarRocksQueryVisitor implements Serializable {

@@ -61,6 +62,17 @@ public List<Map<String, Object>> getTableColumnsMetaData() {
return rows;
}

public Map<String, StarRocksDataType> getFieldMapping() {
List<Map<String, Object>> columns = getTableColumnsMetaData();

Map<String, StarRocksDataType> mapping = new LinkedHashMap<>();
for (Map<String, Object> column : columns) {
mapping.put(column.get("COLUMN_NAME").toString(), StarRocksDataType.fromString(column.get("DATA_TYPE").toString()));
}

return mapping;
}

public String getStarRocksVersion() {
final String query = "select current_version() as ver;";
List<Map<String, Object>> rows;
@@ -69,7 +81,7 @@ public String getStarRocksVersion() {
LOG.debug(String.format("Executing query '%s'", query));
}
rows = executeQuery(query);
if (null == rows || rows.isEmpty()) {
if (rows.isEmpty()) {
return "";
}
String version = rows.get(0).get("ver").toString();
@@ -114,7 +126,7 @@ public Long getQueryCount(String SQL) {
LOG.debug(String.format("Executing query '%s'", SQL));
}
List<Map<String, Object>> data = executeQuery(SQL);
Object opCount = data.get(0).values().stream().findFirst().get();
Object opCount = data.get(0).values().stream().findFirst().orElse(null);
if (null == opCount) {
throw new RuntimeException("Failed to get data count from StarRocks.");
}
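
A short usage sketch for the new getFieldMapping helper. The endpoint, credentials, and table names are placeholders; the constructor calls mirror the ones visible in StarRocksSinkManager below, and assume the constructor is accessible from the caller's package.

import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionOptions;
import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionProvider;
import com.starrocks.connector.flink.manager.StarRocksQueryVisitor;
import com.starrocks.connector.flink.table.StarRocksDataType;

import java.util.Map;

public class FieldMappingSketch {
    public static void main(String[] args) {
        StarRocksJdbcConnectionOptions jdbcOptions = new StarRocksJdbcConnectionOptions(
                "jdbc:mysql://fe-host:9030", "root", "");  // placeholder endpoint
        StarRocksJdbcConnectionProvider provider =
                new StarRocksJdbcConnectionProvider(jdbcOptions);
        StarRocksQueryVisitor visitor =
                new StarRocksQueryVisitor(provider, "demo_db", "demo_table");

        // Column order is preserved because getFieldMapping fills a LinkedHashMap.
        Map<String, StarRocksDataType> fieldMapping = visitor.getFieldMapping();
        fieldMapping.forEach((column, type) -> System.out.println(column + " -> " + type));
    }
}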
@@ -14,19 +14,28 @@

package com.starrocks.connector.flink.manager;

import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionOptions;
import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionProvider;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import com.starrocks.connector.flink.table.sink.StarRocksSinkSemantic;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -36,19 +45,9 @@
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionOptions;
import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionProvider;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import com.starrocks.connector.flink.table.sink.StarRocksSinkSemantic;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.constraints.UniqueConstraint;

public class StarRocksSinkManager implements Serializable {

private static final long serialVersionUID = 1L;
Expand All @@ -57,9 +56,8 @@ public class StarRocksSinkManager implements Serializable {

private final StarRocksJdbcConnectionProvider jdbcConnProvider;
private final StarRocksQueryVisitor starrocksQueryVisitor;
private final StarRocksStreamLoadVisitor starrocksStreamLoadVisitor;
private StarRocksStreamLoadVisitor starrocksStreamLoadVisitor;
private final StarRocksSinkOptions sinkOptions;
private final Map<String, List<LogicalTypeRoot>> typesMap;
final LinkedBlockingDeque<StarRocksSinkBufferEntity> flushQueue = new LinkedBlockingDeque<>(1);

private transient Counter totalFlushBytes;
@@ -110,27 +108,29 @@ public StarRocksSinkManager(StarRocksSinkOptions sinkOptions, TableSchema flinkS
StarRocksJdbcConnectionOptions jdbcOptions = new StarRocksJdbcConnectionOptions(sinkOptions.getJdbcUrl(), sinkOptions.getUsername(), sinkOptions.getPassword());
this.jdbcConnProvider = new StarRocksJdbcConnectionProvider(jdbcOptions);
this.starrocksQueryVisitor = new StarRocksQueryVisitor(jdbcConnProvider, sinkOptions.getDatabaseName(), sinkOptions.getTableName());
// validate table structure
typesMap = new HashMap<>();
typesMap.put("bigint", Arrays.asList(LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
typesMap.put("largeint", Arrays.asList(LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
typesMap.put("char", Arrays.asList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR));
typesMap.put("date", Arrays.asList(LogicalTypeRoot.DATE, LogicalTypeRoot.VARCHAR));
typesMap.put("datetime", Arrays.asList(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, LogicalTypeRoot.VARCHAR));
typesMap.put("decimal", Arrays.asList(LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.DOUBLE, LogicalTypeRoot.FLOAT));
typesMap.put("double", Arrays.asList(LogicalTypeRoot.DOUBLE, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER));
typesMap.put("float", Arrays.asList(LogicalTypeRoot.FLOAT, LogicalTypeRoot.INTEGER));
typesMap.put("int", Arrays.asList(LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
typesMap.put("tinyint", Arrays.asList(LogicalTypeRoot.TINYINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY, LogicalTypeRoot.BOOLEAN));
typesMap.put("smallint", Arrays.asList(LogicalTypeRoot.SMALLINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
typesMap.put("varchar", Arrays.asList(LogicalTypeRoot.VARCHAR, LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP, LogicalTypeRoot.ROW));
typesMap.put("string", Arrays.asList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR, LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP, LogicalTypeRoot.ROW));
validateTableStructure(flinkSchema);
String version = this.starrocksQueryVisitor.getStarRocksVersion();

init(flinkSchema);
}

public StarRocksSinkManager(StarRocksSinkOptions sinkOptions,
TableSchema flinkSchema,
StarRocksJdbcConnectionProvider jdbcConnProvider,
StarRocksQueryVisitor starrocksQueryVisitor) {
this.sinkOptions = sinkOptions;
this.jdbcConnProvider = jdbcConnProvider;
this.starrocksQueryVisitor = starrocksQueryVisitor;

init(flinkSchema);
}


protected void init(TableSchema schema) {
validateTableStructure(schema);
String version = starrocksQueryVisitor.getStarRocksVersion();
this.starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor(
sinkOptions,
null == flinkSchema ? new String[]{} : flinkSchema.getFieldNames(),
version.length() > 0 && !version.trim().startsWith("1.")
sinkOptions,
null == schema ? new String[]{} : schema.getFieldNames(),
version.length() > 0 && !version.trim().startsWith("1.")
);
}

@@ -483,4 +483,24 @@ private void updateHisto(Map<String, Object> result, String key, Histogram histo
}
}
}

private static final Map<String, List<LogicalTypeRoot>> typesMap = new HashMap<>();

static {
// validate table structure
typesMap.put("bigint", Arrays.asList(LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
typesMap.put("largeint", Arrays.asList(LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
typesMap.put("char", Arrays.asList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR));
typesMap.put("date", Arrays.asList(LogicalTypeRoot.DATE, LogicalTypeRoot.VARCHAR));
typesMap.put("datetime", Arrays.asList(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, LogicalTypeRoot.VARCHAR));
typesMap.put("decimal", Arrays.asList(LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.DOUBLE, LogicalTypeRoot.FLOAT));
typesMap.put("double", Arrays.asList(LogicalTypeRoot.DOUBLE, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER));
typesMap.put("float", Arrays.asList(LogicalTypeRoot.FLOAT, LogicalTypeRoot.INTEGER));
typesMap.put("int", Arrays.asList(LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
typesMap.put("tinyint", Arrays.asList(LogicalTypeRoot.TINYINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY, LogicalTypeRoot.BOOLEAN));
typesMap.put("smallint", Arrays.asList(LogicalTypeRoot.SMALLINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
typesMap.put("varchar", Arrays.asList(LogicalTypeRoot.VARCHAR, LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP, LogicalTypeRoot.ROW));
typesMap.put("string", Arrays.asList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR, LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP, LogicalTypeRoot.ROW));
}

}
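
The added secondary constructor exists to make the manager unit-testable: the JDBC provider and query visitor can be injected instead of being built from the sink options. A hypothetical test fragment follows (Mockito is an assumption here, not a dependency introduced by this commit; sinkOptions stands for a StarRocksSinkOptions built elsewhere in the test):

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

// Stub the JDBC layer so no live StarRocks cluster is required. Depending on
// what validateTableStructure queries, getTableColumnsMetaData may need
// stubbing as well.
StarRocksJdbcConnectionProvider provider = mock(StarRocksJdbcConnectionProvider.class);
StarRocksQueryVisitor visitor = mock(StarRocksQueryVisitor.class);
when(visitor.getStarRocksVersion()).thenReturn("2.1.0");

TableSchema schema = TableSchema.builder()
        .field("id", DataTypes.BIGINT())
        .field("name", DataTypes.STRING())
        .build();

// init(schema) now runs validateTableStructure and the version probe against
// the stubs instead of a real connection.
StarRocksSinkManager manager =
        new StarRocksSinkManager(sinkOptions, schema, provider, visitor);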
@@ -14,13 +14,13 @@

package com.starrocks.connector.flink.row.sink;

import com.alibaba.fastjson.JSON;

import java.util.List;
import java.util.Map;

import com.alibaba.fastjson.JSON;

public class StarRocksCsvSerializer implements StarRocksISerializer {

private static final long serialVersionUID = 1L;

private final String columnSeparator;
@@ -14,10 +14,14 @@

package com.starrocks.connector.flink.row.sink;

import com.starrocks.connector.flink.table.StarRocksDataType;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;

import java.util.Map;

public class StarRocksGenericRowTransformer<T> implements StarRocksIRowTransformer<T> {

private static final long serialVersionUID = 1L;
@@ -29,6 +33,11 @@ public StarRocksGenericRowTransformer(StarRocksSinkRowBuilder<T> consumer) {
this.consumer = consumer;
}

@Override
public void setStarRocksColumns(Map<String, StarRocksDataType> columns) {
    // No-op: this transformer derives its field list from the Flink
    // TableSchema and does not use the StarRocks-side column types.
}

@Override
public void setTableSchema(TableSchema ts) {
fieldNames = ts.getFieldNames();
@@ -14,13 +14,18 @@

package com.starrocks.connector.flink.row.sink;

import java.io.Serializable;
import com.starrocks.connector.flink.table.StarRocksDataType;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.table.api.TableSchema;

import java.io.Serializable;
import java.util.Map;

public interface StarRocksIRowTransformer<T> extends Serializable {

void setStarRocksColumns(Map<String, StarRocksDataType> columns);

void setTableSchema(TableSchema tableSchema);

void setRuntimeContext(RuntimeContext ctx);
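
The call site that feeds this new hook is not in the visible hunks. Presumably the sink wires the mapping through roughly as in the sketch below; the caller, field names, and ordering are assumptions, not part of this diff.

// Assumed wiring (not shown in this diff): hand the StarRocks-side column
// types to the row transformer before rows are serialized.
Map<String, StarRocksDataType> starRocksColumns = starrocksQueryVisitor.getFieldMapping();
rowTransformer.setStarRocksColumns(starRocksColumns);
rowTransformer.setTableSchema(flinkSchema);
rowTransformer.setRuntimeContext(getRuntimeContext());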