iterator =
env.fromSource(
- getFlinkSourceProvider(new String[] {"time_types"}, database)
+ getFlinkSourceProvider(
+ new String[] {"time_types"},
+ database,
+ useLegacyJsonFormat)
.getSource(),
WatermarkStrategy.noWatermarks(),
"Event-Source")
@@ -498,7 +600,7 @@ private void testTimeDataTypes(
}
private FlinkSourceProvider getFlinkSourceProvider(
- String[] captureTables, UniqueDatabase database) {
+ String[] captureTables, UniqueDatabase database, Boolean useLegacyJsonFormat) {
String[] captureTableIds =
Arrays.stream(captureTables)
.map(tableName -> database.getDatabaseName() + "." + tableName)
@@ -517,7 +619,8 @@ private FlinkSourceProvider getFlinkSourceProvider(
.username(database.getUsername())
.password(database.getPassword())
.serverTimeZone(ZoneId.of("UTC").toString())
- .serverId(MySqSourceTestUtils.getServerId(env.getParallelism()));
+ .serverId(MySqSourceTestUtils.getServerId(env.getParallelism()))
+ .useLegacyJsonFormat(useLegacyJsonFormat);
return (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
}
@@ -577,4 +680,12 @@ private FlinkSourceProvider getFlinkSourceProvider(
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING());
+
+ private static final RowType JSON_TYPES =
+ RowType.of(
+ DataTypes.DECIMAL(20, 0).notNull(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.INT());
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java
index 94f1ed45dc5..0721ac83bec 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java
@@ -357,10 +357,71 @@ public void testMysql8PrecisionTypesSchema() {
assertThat(actualSchema).isEqualTo(expectedSchema);
}
+ @Test
+ public void testMysql57AccessJsonTypesSchema() {
+ fullTypesMySql57Database.createAndInitialize();
+ // Capture only json_types; the expected schema below maps json_c0..json_c2 to STRING.
+ String[] tables = new String[] {"json_types"};
+ MySqlMetadataAccessor metadataAccessor =
+ getMetadataAccessor(tables, fullTypesMySql57Database);
+
+ Schema actualSchema =
+ metadataAccessor.getTableSchema(
+ TableId.tableId(fullTypesMySql57Database.getDatabaseName(), "json_types"));
+ Schema expectedSchema =
+ Schema.newBuilder()
+ .primaryKey("id")
+ .fromRowDataType(
+ RowType.of(
+ new DataType[] {
+ DataTypes.DECIMAL(20, 0).notNull(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.INT()
+ },
+ new String[] {
+ "id", "json_c0", "json_c1", "json_c2", "int_c",
+ }))
+ .build();
+ assertThat(actualSchema).isEqualTo(expectedSchema);
+ }
+
+ @Test
+ public void testMysql8AccessJsonTypesSchema() {
+ fullTypesMySql57Database.createAndInitialize();
+ // NOTE(review): uses the MySQL 5.7 fixture despite the Mysql8 name; confirm intent.
+ String[] tables = new String[] {"json_types"};
+ MySqlMetadataAccessor metadataAccessor =
+ getMetadataAccessor(tables, fullTypesMySql57Database);
+
+ Schema actualSchema =
+ metadataAccessor.getTableSchema(
+ TableId.tableId(fullTypesMySql57Database.getDatabaseName(), "json_types"));
+ Schema expectedSchema =
+ Schema.newBuilder()
+ .primaryKey("id")
+ .fromRowDataType(
+ RowType.of(
+ new DataType[] {
+ DataTypes.DECIMAL(20, 0).notNull(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.INT()
+ },
+ new String[] {
+ "id", "json_c0", "json_c1", "json_c2", "int_c",
+ }))
+ .build();
+ assertThat(actualSchema).isEqualTo(expectedSchema);
+ }
+
private void testAccessDatabaseAndTable(UniqueDatabase database) {
database.createAndInitialize();
- String[] tables = new String[] {"common_types", "time_types", "precision_types"};
+ String[] tables =
+ new String[] {"common_types", "time_types", "precision_types", "json_types"};
MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database);
assertThatThrownBy(metadataAccessor::listNamespaces)
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql
index 9699ed908c2..e7dc07e2897 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql
@@ -180,4 +180,22 @@ VALUES (DEFAULT,
23,
29,
31,
- 37);
\ No newline at end of file
+ 37);
+
+CREATE TABLE json_types
+(
+ id SERIAL,
+ json_c0 JSON,
+ json_c1 JSON,
+ json_c2 JSON,
+ int_c INTEGER,
+ PRIMARY KEY (id)
+) DEFAULT CHARSET=utf8;
+
+INSERT INTO json_types
+VALUES (DEFAULT,
+ '{"key1":"value1"}',
+ '{"key1":"value1","key2":"value2"}',
+ '[{"key1":"value1","key2":{"key2_1":"value2_1","key2_2":"value2_2"},"key3":["value3"],"key4":["value4_1","value4_2"]},{"key5":"value5"}]',
+ 1
+ );
\ No newline at end of file
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql
index 8abe8868c07..92548381f62 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql
@@ -185,3 +185,21 @@ VALUES (DEFAULT,
29,
31,
37);
+
+CREATE TABLE json_types
+(
+ id SERIAL,
+ json_c0 JSON,
+ json_c1 JSON,
+ json_c2 JSON,
+ int_c INTEGER,
+ PRIMARY KEY (id)
+) DEFAULT CHARSET=utf8;
+
+INSERT INTO json_types
+VALUES (DEFAULT,
+ '{"key1":"value1"}',
+ '{"key1":"value1","key2":"value2"}',
+ '[{"key1":"value1","key2":{"key2_1":"value2_1","key2_2":"value2_2"},"key3":["value3"],"key4":["value4_1","value4_2"]},{"key5":"value5"}]',
+ 1
+ );
\ No newline at end of file
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonStringFormatter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonStringFormatter.java
new file mode 100644
index 00000000000..7154496c071
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/json/JsonStringFormatter.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.github.shyiko.mysql.binlog.event.deserialization.json;
+
+import com.github.shyiko.mysql.binlog.event.deserialization.ColumnType;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Base64;
+
+import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig.useLegacyJsonFormat;
+
+/**
+ * Copied from mysql-binlog-connector-java 0.27.2; {@link #name(String)} and {@link #nextEntry()}
+ * add a space after the key's colon / after a comma unless the static flag {@code
+ * MySqlSourceConfig.useLegacyJsonFormat} is true (the flag is shared by all formatter instances).
+ */
+public class JsonStringFormatter implements JsonFormatter {
+
+ /**
+ * Value used for lookup tables to indicate that matching characters do not need to be escaped.
+ */
+ private static final int ESCAPE_NONE = 0;
+
+ /**
+ * Value used for lookup tables to indicate that matching characters are to be escaped using
+ * standard escaping; for JSON this means (for example) using "backslash - u" escape method.
+ */
+ private static final int ESCAPE_GENERIC = -1;
+
+ /**
+ * A lookup table that determines which of the first 128 Unicode code points (single-byte UTF-8
+ * characters) must be escaped. A value of '0' means no escaping is required; positive values
+ * must be escaped with a preceding backslash; negative values use a generic unicode escape.
+ */
+ private static final int[] ESCAPES;
+
+ static {
+ int[] escape = new int[128];
+ // Generic escape for control characters ...
+ for (int i = 0; i < 32; ++i) {
+ escape[i] = ESCAPE_GENERIC;
+ }
+ // Backslash escape for other specific characters ...
+ escape['"'] = '"';
+ escape['\\'] = '\\';
+ // Escaping of slash is optional, so let's not add it
+ escape[0x08] = 'b';
+ escape[0x09] = 't';
+ escape[0x0C] = 'f';
+ escape[0x0A] = 'n';
+ escape[0x0D] = 'r';
+ ESCAPES = escape;
+ }
+
+ private static final char[] HEX_CODES = "0123456789ABCDEF".toCharArray();
+
+ private final StringBuilder sb = new StringBuilder();
+
+ @Override
+ public String toString() {
+ return getString();
+ }
+
+ public String getString() {
+ return sb.toString();
+ }
+
+ @Override
+ public void beginObject(int numElements) {
+ sb.append('{');
+ }
+
+ @Override
+ public void beginArray(int numElements) {
+ sb.append('[');
+ }
+
+ @Override
+ public void endObject() {
+ sb.append('}');
+ }
+
+ @Override
+ public void endArray() {
+ sb.append(']');
+ }
+
+ @Override
+ public void name(String name) {
+ sb.append('"');
+ appendString(name);
+ if (useLegacyJsonFormat) {
+ sb.append("\":");
+ } else {
+ sb.append("\": ");
+ }
+ }
+
+ @Override
+ public void value(String value) {
+ sb.append('"');
+ appendString(value);
+ sb.append('"');
+ }
+
+ @Override
+ public void value(int value) {
+ sb.append(Integer.toString(value));
+ }
+
+ @Override
+ public void value(long value) {
+ sb.append(Long.toString(value));
+ }
+
+ @Override
+ public void value(double value) {
+ // Double's toString method will result in scientific notation and loss of precision
+ String str = Double.toString(value);
+ if (str.contains("E")) {
+ value(new BigDecimal(value));
+ } else {
+ sb.append(str);
+ }
+ }
+
+ @Override
+ public void value(BigInteger value) {
+ // Route through the BigDecimal overload so the value is rendered via toPlainString() ...
+ value(new BigDecimal(value));
+ }
+
+ @Override
+ public void value(BigDecimal value) {
+ // BigDecimal.toString() may use scientific notation, so use toPlainString() instead ...
+ sb.append(value.toPlainString());
+ }
+
+ @Override
+ public void value(boolean value) {
+ sb.append(Boolean.toString(value));
+ }
+
+ @Override
+ public void valueNull() {
+ sb.append("null");
+ }
+
+ @Override
+ public void valueYear(int year) {
+ sb.append(year);
+ }
+
+ @Override
+ public void valueDate(int year, int month, int day) {
+ sb.append('"');
+ appendDate(year, month, day);
+ sb.append('"');
+ }
+
+ @Override
+ // checkstyle, please ignore ParameterNumber for the next line
+ public void valueDatetime(
+ int year, int month, int day, int hour, int min, int sec, int microSeconds) {
+ sb.append('"');
+ appendDate(year, month, day);
+ sb.append(' ');
+ appendTime(hour, min, sec, microSeconds);
+ sb.append('"');
+ }
+
+ @Override
+ public void valueTime(int hour, int min, int sec, int microSeconds) {
+ sb.append('"');
+ if (hour < 0) {
+ sb.append('-');
+ hour = Math.abs(hour);
+ }
+ appendTime(hour, min, sec, microSeconds);
+ sb.append('"');
+ }
+
+ @Override
+ public void valueTimestamp(long secondsPastEpoch, int microSeconds) {
+ sb.append(secondsPastEpoch);
+ appendSixDigitUnsignedInt(microSeconds, false);
+ }
+
+ @Override
+ public void valueOpaque(ColumnType type, byte[] value) {
+ sb.append('"');
+ sb.append(Base64.getEncoder().encodeToString(value));
+ sb.append('"');
+ }
+
+ @Override
+ public void nextEntry() {
+ if (useLegacyJsonFormat) {
+ sb.append(",");
+ } else {
+ sb.append(", ");
+ }
+ }
+
+ /**
+ * Append a string by escaping any characters that must be escaped.
+ *
+ * @param original the string to be written; may not be null
+ */
+ protected void appendString(String original) {
+ for (int i = 0, len = original.length(); i < len; ++i) {
+ char c = original.charAt(i);
+ int ch = c;
+ if (ch < 0 || ch >= ESCAPES.length || ESCAPES[ch] == 0) {
+ sb.append(c);
+ continue;
+ }
+ int escape = ESCAPES[ch];
+ if (escape > 0) { // 2-char escape, fine
+ sb.append('\\');
+ sb.append((char) escape);
+ } else {
+ unicodeEscape(ch);
+ }
+ }
+ }
+
+ /**
+ * Append a generic Unicode escape for given character.
+ *
+ * @param charToEscape the character to escape
+ */
+ private void unicodeEscape(int charToEscape) {
+ sb.append('\\');
+ sb.append('u');
+ if (charToEscape > 0xFF) {
+ int hi = (charToEscape >> 8) & 0xFF;
+ sb.append(HEX_CODES[hi >> 4]);
+ sb.append(HEX_CODES[hi & 0xF]);
+ charToEscape &= 0xFF;
+ } else {
+ sb.append('0');
+ sb.append('0');
+ }
+ // We know it's a control char, so only the last 2 chars are non-0
+ sb.append(HEX_CODES[charToEscape >> 4]);
+ sb.append(HEX_CODES[charToEscape & 0xF]);
+ }
+
+ protected void appendTwoDigitUnsignedInt(int value) {
+ assert value >= 0;
+ assert value < 100;
+ if (value < 10) {
+ sb.append("0").append(value);
+ } else {
+ sb.append(value);
+ }
+ }
+
+ protected void appendFourDigitUnsignedInt(int value) {
+ if (value < 10) {
+ sb.append("000").append(value);
+ } else if (value < 100) {
+ sb.append("00").append(value);
+ } else if (value < 1000) {
+ sb.append("0").append(value);
+ } else {
+ sb.append(value);
+ }
+ }
+
+ protected void appendSixDigitUnsignedInt(int value, boolean trimTrailingZeros) {
+ assert value >= 0;
+ assert value < 1000000;
+ // Left-pad to six digits; zero is valid (valueTimestamp passes microSeconds unchecked) ...
+ if (value < 10) {
+ sb.append("00000");
+ } else if (value < 100) {
+ sb.append("0000");
+ } else if (value < 1000) {
+ sb.append("000");
+ } else if (value < 10000) {
+ sb.append("00");
+ } else if (value < 100000) {
+ sb.append("0");
+ }
+ if (trimTrailingZeros) {
+ // Remove any trailing 0's ...
+ for (int i = 0; i != 6; ++i) {
+ if (value % 10 == 0) {
+ value /= 10;
+ }
+ }
+ }
+ sb.append(value); // always emit the digits; previously skipped when not trimming
+ }
+
+ protected void appendDate(int year, int month, int day) {
+ if (year < 0) {
+ sb.append('-');
+ year = Math.abs(year);
+ }
+ appendFourDigitUnsignedInt(year);
+ sb.append('-');
+ appendTwoDigitUnsignedInt(month);
+ sb.append('-');
+ appendTwoDigitUnsignedInt(day);
+ }
+
+ protected void appendTime(int hour, int min, int sec, int microSeconds) {
+ appendTwoDigitUnsignedInt(hour);
+ sb.append(':');
+ appendTwoDigitUnsignedInt(min);
+ sb.append(':');
+ appendTwoDigitUnsignedInt(sec);
+ if (microSeconds != 0) {
+ sb.append('.');
+ appendSixDigitUnsignedInt(microSeconds, true);
+ }
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlSource.java
index 49187838f74..fb512a1eb9b 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlSource.java
@@ -17,6 +17,7 @@
package org.apache.flink.cdc.connectors.mysql;
+import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.DebeziumSourceFunction;
@@ -157,6 +158,11 @@ public Builder startupOptions(StartupOptions startupOptions) {
return this;
}
+ public Builder useLegacyJsonFormat(boolean useLegacyJsonFormat) {
+ MySqlSourceConfig.useLegacyJsonFormat = useLegacyJsonFormat;
+ return this;
+ }
+
public DebeziumSourceFunction build() {
Properties props = new Properties();
props.setProperty("connector.class", MySqlConnector.class.getCanonicalName());
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
index c03aa36b056..065f1e439d7 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
@@ -253,6 +253,15 @@ public MySqlSourceBuilder skipSnapshotBackfill(boolean skipSnapshotBackfill)
return this;
}
+ /**
+ * Whether to use the legacy compact JSON format. Defaults to {@code true}: no whitespace is
+ * added after the colon before a value or after the comma between entries.
+ */
+ public MySqlSourceBuilder useLegacyJsonFormat(boolean useLegacyJsonFormat) {
+ this.configFactory.useLegacyJsonFormat(useLegacyJsonFormat);
+ return this;
+ }
+
/**
* Whether to close idle readers at the end of the snapshot phase. This feature depends on
* FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
index dd0ac789666..149a585f0ae 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
@@ -66,6 +66,7 @@ public class MySqlSourceConfig implements Serializable {
private final Properties jdbcProperties;
private final Map chunkKeyColumns;
private final boolean skipSnapshotBackfill;
+ public static boolean useLegacyJsonFormat = true;
// --------------------------------------------------------------------------------------------
// Debezium Configurations
@@ -99,7 +100,8 @@ public class MySqlSourceConfig implements Serializable {
Properties dbzProperties,
Properties jdbcProperties,
Map chunkKeyColumns,
- boolean skipSnapshotBackfill) {
+ boolean skipSnapshotBackfill,
+ boolean useLegacyJsonFormat) {
this.hostname = checkNotNull(hostname);
this.port = port;
this.username = checkNotNull(username);
@@ -127,6 +129,7 @@ public class MySqlSourceConfig implements Serializable {
this.jdbcProperties = jdbcProperties;
this.chunkKeyColumns = chunkKeyColumns;
this.skipSnapshotBackfill = skipSnapshotBackfill;
+ this.useLegacyJsonFormat = useLegacyJsonFormat;
}
public String getHostname() {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
index 8b65055ca13..27f1243b898 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
@@ -70,6 +70,7 @@ public class MySqlSourceConfigFactory implements Serializable {
private Properties dbzProperties;
private Map chunkKeyColumns = new HashMap<>();
private boolean skipSnapshotBackfill = false;
+ private boolean useLegacyJsonFormat = true;
public MySqlSourceConfigFactory hostname(String hostname) {
this.hostname = hostname;
@@ -276,6 +277,15 @@ public MySqlSourceConfigFactory skipSnapshotBackfill(boolean skipSnapshotBackfil
return this;
}
+ /**
+ * Whether to use the legacy compact JSON format. Defaults to {@code true}: no whitespace is
+ * added after the colon before a value or after the comma between entries.
+ */
+ public MySqlSourceConfigFactory useLegacyJsonFormat(boolean useLegacyJsonFormat) {
+ this.useLegacyJsonFormat = useLegacyJsonFormat;
+ return this;
+ }
+
/**
* Whether to close idle readers at the end of the snapshot phase. This feature depends on
* FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be
@@ -384,6 +394,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
props,
jdbcProperties,
chunkKeyColumns,
- skipSnapshotBackfill);
+ skipSnapshotBackfill,
+ useLegacyJsonFormat);
}
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
index f3424c8dfb9..bf1a301da93 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
@@ -262,4 +262,12 @@ public class MySqlSourceOptions {
.defaultValue(false)
.withDescription(
"Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in binlog reading phase instead of being merged into the snapshot. WARNING: Skipping backfill might lead to data inconsistency because some binlog events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed binlog events should be handled specially.");
+
+ @Experimental
+ public static final ConfigOption USE_LEGACY_JSON_FORMAT =
+ ConfigOptions.key("use.legacy.json.format")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format.");
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
index 242e02da634..a2865f82696 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
@@ -98,6 +98,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
private final Duration heartbeatInterval;
private final String chunkKeyColumn;
final boolean skipSnapshotBackFill;
+ private final boolean useLegacyJsonFormat;
// --------------------------------------------------------------------------------------------
// Mutable attributes
@@ -135,7 +136,8 @@ public MySqlTableSource(
Properties jdbcProperties,
Duration heartbeatInterval,
@Nullable String chunkKeyColumn,
- boolean skipSnapshotBackFill) {
+ boolean skipSnapshotBackFill,
+ boolean useLegacyJsonFormat) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
@@ -165,6 +167,7 @@ public MySqlTableSource(
this.heartbeatInterval = heartbeatInterval;
this.chunkKeyColumn = chunkKeyColumn;
this.skipSnapshotBackFill = skipSnapshotBackFill;
+ this.useLegacyJsonFormat = useLegacyJsonFormat;
}
@Override
@@ -220,6 +223,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.heartbeatInterval(heartbeatInterval)
.chunkKeyColumn(new ObjectPath(database, tableName), chunkKeyColumn)
.skipSnapshotBackfill(skipSnapshotBackFill)
+ .useLegacyJsonFormat(useLegacyJsonFormat)
.build();
return SourceProvider.of(parallelSource);
} else {
@@ -305,7 +309,8 @@ public DynamicTableSource copy() {
jdbcProperties,
heartbeatInterval,
chunkKeyColumn,
- skipSnapshotBackFill);
+ skipSnapshotBackFill,
+ useLegacyJsonFormat);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
index e435f946a7e..467bbaecb9e 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
@@ -102,6 +102,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
config.get(MySqlSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
boolean skipSnapshotBackFill =
config.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
+ boolean useLegacyJsonFormat = config.get(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT);
if (enableParallelRead) {
validatePrimaryKeyIfEnableParallel(physicalSchema, chunkKeyColumn);
@@ -145,7 +146,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
JdbcUrlUtils.getJdbcProperties(context.getCatalogTable().getOptions()),
heartbeatInterval,
chunkKeyColumn,
- skipSnapshotBackFill);
+ skipSnapshotBackFill,
+ useLegacyJsonFormat);
}
@Override
@@ -191,6 +193,7 @@ public Set> optionalOptions() {
options.add(MySqlSourceOptions.HEARTBEAT_INTERVAL);
options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
+ options.add(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT);
return options;
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
index 8339a1db544..44829dc415a 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
@@ -55,6 +55,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
+import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.USE_LEGACY_JSON_FORMAT;
import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
@@ -127,7 +128,8 @@ public void testCommonProperties() {
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null,
- SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+ SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+ USE_LEGACY_JSON_FORMAT.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -173,7 +175,8 @@ public void testEnableParallelReadSource() {
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
"testCol",
- SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+ SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+ USE_LEGACY_JSON_FORMAT.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -215,7 +218,8 @@ public void testEnableParallelReadSourceWithSingleServerId() {
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null,
- SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+ SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+ USE_LEGACY_JSON_FORMAT.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -255,7 +259,8 @@ public void testEnableParallelReadSourceLatestOffset() {
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null,
- SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+ SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+ USE_LEGACY_JSON_FORMAT.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -275,6 +280,7 @@ public void testOptionalProperties() {
options.put("scan.incremental.snapshot.chunk.key-column", "testCol");
options.put("scan.incremental.close-idle-reader.enabled", "true");
options.put("scan.incremental.snapshot.backfill.skip", "true");
+ options.put("use.legacy.json.format", "true");
DynamicTableSource actualSource = createTableSource(options);
Properties dbzProperties = new Properties();
@@ -311,6 +317,7 @@ public void testOptionalProperties() {
jdbcProperties,
Duration.ofMillis(15213),
"testCol",
+ true,
true);
assertEquals(expectedSource, actualSource);
assertTrue(actualSource instanceof MySqlTableSource);
@@ -365,7 +372,8 @@ public void testStartupFromSpecificOffset() {
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null,
- SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+ SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+ USE_LEGACY_JSON_FORMAT.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -403,7 +411,8 @@ public void testStartupFromInitial() {
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null,
- SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+ SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+ USE_LEGACY_JSON_FORMAT.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -442,7 +451,8 @@ public void testStartupFromEarliestOffset() {
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null,
- SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+ SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+ USE_LEGACY_JSON_FORMAT.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -482,7 +492,8 @@ public void testStartupFromSpecificTimestamp() {
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null,
- SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+ SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+ USE_LEGACY_JSON_FORMAT.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -520,7 +531,8 @@ public void testStartupFromLatestOffset() {
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null,
- SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+ SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+ USE_LEGACY_JSON_FORMAT.defaultValue());
assertEquals(expectedSource, actualSource);
}
@@ -563,7 +575,8 @@ public void testMetadataColumns() {
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null,
- SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
+ SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
+ USE_LEGACY_JSON_FORMAT.defaultValue());
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test.sql
index e9145a7e81f..b75fd69f07f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test.sql
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test.sql
@@ -111,4 +111,4 @@ CREATE TABLE user_info
balance DECIMAL(18, 2),
balance2 DECIMAL(18, 2),
PRIMARY KEY (user_id)
-) DEFAULT CHARSET=utf8;
\ No newline at end of file
+) DEFAULT CHARSET=utf8;
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test_mysql8.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test_mysql8.sql
index 0a202f1ca8f..842cfcc51d1 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test_mysql8.sql
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/column_type_test_mysql8.sql
@@ -100,4 +100,4 @@ VALUES (DEFAULT, 127, 255, 255, 32767, 65535, 65535, 8388607, 16777215, 16777215
ST_GeomFromText('MULTIPOINT((1 1),(2 2))'),
ST_GeomFromText('MultiLineString((1 1,2 2,3 3),(4 4,5 5))'),
ST_GeomFromText('MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5)))'),
- ST_GeomFromText('GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20))'));
\ No newline at end of file
+ ST_GeomFromText('GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20))'));