Skip to content

Commit

Permalink
[BugFix] [CDC] Fix the problem of 1.18 CDC Plus introduced DORIS (#3162)
Browse files Browse the repository at this point in the history
  • Loading branch information
zackyoungh authored Feb 19, 2024
1 parent 5cfe742 commit a62813c
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ protected Properties getProperties() {
}

protected SingleOutputStreamOperator<Map> deserialize(DataStreamSource<String> dataStreamSource) {
return dataStreamSource.map((MapFunction<String, Map>) value -> objectMapper.readValue(value, Map.class));
return dataStreamSource
.map((MapFunction<String, Map>) value -> objectMapper.readValue(value, Map.class))
.returns(Map.class);
}

protected SingleOutputStreamOperator<Map> shunt(
Expand All @@ -156,7 +158,9 @@ protected DataStream<RowData> buildRowData(
List<String> columnNameList,
List<LogicalType> columnTypeList,
String schemaTableName) {
return filterOperator.flatMap(sinkRowDataFunction(columnNameList, columnTypeList, schemaTableName));
return filterOperator
.flatMap(sinkRowDataFunction(columnNameList, columnTypeList, schemaTableName))
.returns(RowData.class);
}

@SuppressWarnings("rawtypes")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,22 @@

package org.dinky.cdc.doris;

import org.dinky.assertion.Asserts;
import org.dinky.cdc.AbstractSinkBuilder;
import org.dinky.cdc.SinkBuilder;
import org.dinky.data.model.FlinkCDCConfig;
import org.dinky.data.model.Table;

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.RowDataSerializer;
import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.dinky.assertion.Asserts;
import org.dinky.cdc.AbstractSinkBuilder;
import org.dinky.cdc.SinkBuilder;
import org.dinky.data.model.FlinkCDCConfig;
import org.dinky.data.model.Table;

import java.io.Serializable;
import java.util.ArrayList;
Expand Down Expand Up @@ -185,7 +183,7 @@ public void addSink(
DorisSink.Builder<RowData> builder = DorisSink.builder();
builder.setDorisReadOptions(readOptionBuilder.build())
.setDorisExecutionOptions(executionBuilder.build())
.setSerializer((DorisRecordSerializer<RowData>) RowDataSerializer.builder()
.setSerializer( RowDataSerializer.builder()
.setFieldNames(columnNames)
.setType("json")
.enableDelete(true)
Expand Down
37 changes: 37 additions & 0 deletions dinky-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -344,5 +344,42 @@
</dependency>
</dependencies>
</profile>

<profile>
<id>flink-1.15</id>
<dependencies>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-cdc-plus</artifactId>
</dependency>
</dependencies>
</profile>
<profile>
<id>flink-1.16</id>
<dependencies>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-cdc-plus</artifactId>
</dependency>
</dependencies>
</profile>
<profile>
<id>flink-1.17</id>
<dependencies>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-cdc-plus</artifactId>
</dependency>
</dependencies>
</profile>
<profile>
<id>flink-1.18</id>
<dependencies>
<dependency>
<groupId>org.dinky</groupId>
<artifactId>dinky-cdc-plus</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
4 changes: 2 additions & 2 deletions dinky-flink/dinky-flink-1.18/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@
</dependency>
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.16</artifactId>
<version>1.3.0</version>
<artifactId>flink-doris-connector-1.18</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
Expand Down

0 comments on commit a62813c

Please sign in to comment.