[FLINK-36578][MySQL] Added modified JsonStringFormatter and option use.legacy.json.format #3658

Open · wants to merge 24 commits into base: master

24 commits:
542d856
ADD: added modified JsonStringFormatter
Oct 22, 2024
51ddb26
EDIT: modified license style
Oct 22, 2024
52c808a
EDIT: changed test data for json column
Oct 22, 2024
1117342
EDIT: changed test data for json
Oct 22, 2024
bac5584
ROLLBACK: rollback TiDBConnectorITCase.java
Oct 22, 2024
bebeaa4
ADD: added useLegacyJsonFormat option (default true)
Nov 12, 2024
5050a8e
FIX: mvn spotless:apply
Nov 12, 2024
1ca8990
FIX: rollbacked white space for json string in test codes. because us…
Nov 13, 2024
29872a6
FIX: mvn spotless:apply
Nov 13, 2024
426d3e7
ROLLBACK: rollbacked some test resources. because default value of u…
Nov 13, 2024
64a4a57
FIX: added whitespace
Nov 13, 2024
638cd21
ADD: useLegacyJsonFormat for legacy MySqlSource
Nov 13, 2024
9ed0778
ADD: added test codes, resources for useLegacyJsonType option
Nov 17, 2024
ffcf1f8
ADD: added test codes in MySqlMetadataAccessorITCase for json_types t…
Nov 17, 2024
396fab1
FIX: mvn spotless:apply
Nov 17, 2024
37b52cf
ADD: added USE_LEGACY_JSON_FORMAT for flink-cdc-pipeline-connector-mysql
Nov 17, 2024
401d403
UPDATE: added use.legacy.json.format option to mysql-cdc.md, pipeline/…
Nov 17, 2024
1265453
FIX: removed json_types table in legacy test code resources
Nov 17, 2024
337b5de
Merge branch 'master' into feature/added-whitespace-in-json-to-string…
SML0127 Nov 17, 2024
c46890d
ROLLBACK: recovered accidentally deleted testMysql57PrecisionTypesSc…
Nov 17, 2024
945f398
FIX: fixed wrong line number in annotation
Nov 28, 2024
5bf33c9
REFACTOR: changed codes for modifying expected data to be more clear
Nov 28, 2024
5dd1f51
EDIT: added annotations for modifying expected data
Nov 28, 2024
285280a
Merge branch 'master' into feature/added-whitespace-in-json-to-string…
SML0127 Jan 3, 2025
12 changes: 12 additions & 0 deletions docs/content/docs/connectors/flink-sources/mysql-cdc.md
@@ -391,6 +391,18 @@ During a snapshot operation, the connector will query each included table to pro
hex: The binary data type is converted to a hexadecimal string and transmitted.
The default value is none. Depending on your requirements and data types, you can choose the appropriate processing mode. If your database contains a large number of binary data types, it is recommended to use base64 or hex mode to make it easier to handle during transmission.</td>
</tr>
<tr>
<td>use.legacy.json.format</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to use the legacy JSON format when casting JSON type data read from the binlog. <br>
If 'use.legacy.json.format' = 'true', whitespace before values and after commas in the JSON type data is removed. For example,
JSON type data {"key1": "value1", "key2": "value2"} in the binlog is converted to {"key1":"value1","key2":"value2"}.
If 'use.legacy.json.format' = 'false', the data is converted to {"key1": "value1", "key2": "value2"}, with whitespace before values and after commas preserved.
</td>
</tr>
</tbody>
</table>
</div>
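For context on this option outside the SQL DDL: a minimal DataStream-style sketch, assuming the useLegacyJsonFormat(...) builder setter this PR adds (commit 638cd21 for the legacy MySqlSource); the connection settings, table names, and deserializer below are illustrative assumptions, not part of this diff.

import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

public class JsonFormatSourceSketch {
    // Sketch only: builds a MySQL CDC source that preserves whitespace in
    // binlog JSON (use.legacy.json.format = false).
    public static MySqlSource<String> build() {
        return MySqlSource.<String>builder()
                .hostname("localhost")        // assumed connection settings
                .port(3306)
                .databaseList("inventory")
                .tableList("inventory.json_types")
                .username("flinkuser")
                .password("flinkpw")
                .deserializer(new JsonDebeziumDeserializationSchema())
                // false: keep whitespace before values and after commas, so
                // binlog JSON matches the snapshot representation
                .useLegacyJsonFormat(false)
                .build();
    }
}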
12 changes: 12 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -293,6 +293,18 @@ pipeline:
scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase.
</td>
</tr>
<tr>
<td>use.legacy.json.format</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to use the legacy JSON format when casting JSON type data read from the binlog. <br>
If 'use.legacy.json.format' = 'true', whitespace before values and after commas in the JSON type data is removed. For example,
JSON type data {"key1": "value1", "key2": "value2"} in the binlog is converted to {"key1":"value1","key2":"value2"}.
If 'use.legacy.json.format' = 'false', the data is converted to {"key1": "value1", "key2": "value2"}, with whitespace before values and after commas preserved.
</td>
</tr>
</tbody>
</table>
</div>
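To make the two output shapes concrete, a tiny standalone comparison built from the example strings in the table above (plain string constants; no connector code involved):

public class LegacyJsonFormatDemo {
    public static void main(String[] args) {
        // use.legacy.json.format = true: whitespace before values and after
        // commas is stripped from JSON read out of the binlog.
        String legacy = "{\"key1\":\"value1\",\"key2\":\"value2\"}";

        // use.legacy.json.format = false: whitespace is preserved, matching
        // the snapshot-phase representation of the same JSON value.
        String preserved = "{\"key1\": \"value1\", \"key2\": \"value2\"}";

        System.out.println("legacy:    " + legacy);
        System.out.println("preserved: " + preserved);
    }
}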
Original file line number Diff line number Diff line change
@@ -88,6 +88,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USE_LEGACY_JSON_FORMAT;
import static org.apache.flink.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare;
import static org.apache.flink.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
import static org.apache.flink.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
@@ -139,6 +140,7 @@ public DataSource createDataSource(Context context) {
boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
boolean scanBinlogNewlyAddedTableEnabled =
config.get(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);

validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -175,7 +177,8 @@ public DataSource createDataSource(Context context) {
.includeSchemaChanges(includeSchemaChanges)
.debeziumProperties(getDebeziumProperties(configMap))
.jdbcProperties(getJdbcProperties(configMap))
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.useLegacyJsonFormat(useLegacyJsonFormat);

List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);

@@ -306,6 +309,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
options.add(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
options.add(USE_LEGACY_JSON_FORMAT);
options.add(METADATA_LIST);
return options;
}
Original file line number Diff line number Diff line change
@@ -273,6 +273,14 @@ public class MySqlDataSourceOptions {
+ "scan.newly-added-table.enabled: do re-snapshot & binlog-reading for newly added table when restored; \n"
+ "scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase.");

@Experimental
public static final ConfigOption<Boolean> USE_LEGACY_JSON_FORMAT =
ConfigOptions.key("use.legacy.json.format")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format.");
Comment on lines +276 to +282
@SML0127 (Contributor Author), Nov 17, 2024:
use.legacy.json.format option for the pipeline connector; the default is true.


@Experimental
public static final ConfigOption<String> METADATA_LIST =
ConfigOptions.key("metadata.list")
A Contributor commented:

Can we add some tests to verify that we have the same JSON format between the snapshot and binlog stages?

@SML0127 (Contributor Author) replied, Nov 17, 2024:

I added tests for this new option. When useLegacyJsonFormat = false, it verifies that the JSON type data format is the same between the snapshot stage and the binlog stage.

private void testJsonDataType(UniqueDatabase database, Boolean useLegacyJsonFormat)
throws Exception {
database.createAndInitialize();
CloseableIterator<Event> iterator =
env.fromSource(
getFlinkSourceProvider(
new String[] {"json_types"},
database,
useLegacyJsonFormat)
.getSource(),
WatermarkStrategy.noWatermarks(),
"Event-Source")
.executeAndCollect();
Object[] expectedSnapshot =
new Object[] {
DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
BinaryStringData.fromString("{\"key1\": \"value1\"}"),
BinaryStringData.fromString("{\"key1\": \"value1\", \"key2\": \"value2\"}"),
BinaryStringData.fromString(
"[{\"key1\": \"value1\", \"key2\": {\"key2_1\": \"value2_1\", \"key2_2\": \"value2_2\"}, \"key3\": [\"value3\"], \"key4\": [\"value4_1\", \"value4_2\"]}, {\"key5\": \"value5\"}]"),
1
};
// skip CreateTableEvent
List<Event> snapshotResults =
MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0;
RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, JSON_TYPES))
.isEqualTo(expectedSnapshot);
try (Connection connection = database.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("UPDATE json_types SET int_c = null WHERE id = 1;");
}
Object[] expectedStreamRecord = expectedSnapshot;
if (useLegacyJsonFormat) {
expectedSnapshot[1] = BinaryStringData.fromString("{\"key1\":\"value1\"}");
expectedSnapshot[2] =
BinaryStringData.fromString("{\"key1\":\"value1\",\"key2\":\"value2\"}");
expectedSnapshot[3] =
BinaryStringData.fromString(
"[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]");
}
expectedSnapshot[4] = null;
List<Event> streamResults =
MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0;
RecordData streamRecord = ((DataChangeEvent) streamResults.get(0)).after();
Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES))
.isEqualTo(expectedStreamRecord);
}

Original file line number Diff line number Diff line change
@@ -106,11 +106,39 @@ public void testMysql57CommonDataTypes() throws Throwable {
testCommonDataTypes(fullTypesMySql57Database);
}

@Test
public void testMysql57JsonDataTypes() throws Throwable {
// Set `useLegacyJsonFormat` to false, so whitespace before values and after
// commas in the JSON string is preserved.
testJsonDataType(fullTypesMySql57Database, false);
}

@Test
public void testMysql57JsonDataTypesWithUseLegacyJsonFormat() throws Throwable {
// Set `useLegacyJsonFormat` to true, so whitespace before values and after
// commas in the JSON string is removed (legacy format).
testJsonDataType(fullTypesMySql57Database, true);
}

@Test
public void testMySql8CommonDataTypes() throws Throwable {
testCommonDataTypes(fullTypesMySql8Database);
}

@Test
public void testMySql8JsonDataTypes() throws Throwable {
// Set `useLegacyJsonFormat` to false, so whitespace before values and after
// commas in the JSON string is preserved.
testJsonDataType(fullTypesMySql8Database, false);
}

@Test
public void testMySql8JsonDataTypesWithUseLegacyJsonFormat() throws Throwable {
// Set `useLegacyJsonFormat` to true, so whitespace before values and after
// commas in the JSON string is removed (legacy format).
testJsonDataType(fullTypesMySql8Database, true);
}

@Test
public void testMysql57TimeDataTypes() throws Throwable {
RowType recordType =
@@ -321,9 +349,13 @@ public void testMysqlPrecisionTypes(UniqueDatabase database) throws Throwable {
};

database.createAndInitialize();
Boolean useLegacyJsonFormat = true;
CloseableIterator<Event> iterator =
env.fromSource(
getFlinkSourceProvider(new String[] {"precision_types"}, database)
getFlinkSourceProvider(
new String[] {"precision_types"},
database,
useLegacyJsonFormat)
.getSource(),
WatermarkStrategy.noWatermarks(),
"Event-Source")
@@ -351,9 +383,15 @@ public void testMysqlPrecisionTypes(UniqueDatabase database) throws Throwable {

private void testCommonDataTypes(UniqueDatabase database) throws Exception {
database.createAndInitialize();
// Set useLegacyJsonFormat option to true, so the json string will have no whitespace
// before values and after commas (legacy format).
Boolean useLegacyJsonFormat = true;
CloseableIterator<Event> iterator =
env.fromSource(
getFlinkSourceProvider(new String[] {"common_types"}, database)
getFlinkSourceProvider(
new String[] {"common_types"},
database,
useLegacyJsonFormat)
.getSource(),
WatermarkStrategy.noWatermarks(),
"Event-Source")
@@ -446,7 +484,7 @@ private void testCommonDataTypes(UniqueDatabase database) throws Exception {
}

expectedSnapshot[30] = null;
// The json string from binlog will remove useless space
// Legacy format removes useless space in json string from binlog
expectedSnapshot[44] = BinaryStringData.fromString("{\"key1\":\"value1\"}");
Object[] expectedStreamRecord = expectedSnapshot;

@@ -457,6 +495,66 @@ private void testCommonDataTypes(UniqueDatabase database) throws Exception {
.isEqualTo(expectedStreamRecord);
}

private void testJsonDataType(UniqueDatabase database, Boolean useLegacyJsonFormat)
throws Exception {
database.createAndInitialize();
CloseableIterator<Event> iterator =
env.fromSource(
getFlinkSourceProvider(
new String[] {"json_types"},
database,
useLegacyJsonFormat)
.getSource(),
WatermarkStrategy.noWatermarks(),
"Event-Source")
.executeAndCollect();

Object[] expectedSnapshot =
new Object[] {
DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
BinaryStringData.fromString("{\"key1\": \"value1\"}"),
BinaryStringData.fromString("{\"key1\": \"value1\", \"key2\": \"value2\"}"),
BinaryStringData.fromString(
"[{\"key1\": \"value1\", \"key2\": {\"key2_1\": \"value2_1\", \"key2_2\": \"value2_2\"}, \"key3\": [\"value3\"], \"key4\": [\"value4_1\", \"value4_2\"]}, {\"key5\": \"value5\"}]"),
1
};

// skip CreateTableEvent
List<Event> snapshotResults =
MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0;
RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, JSON_TYPES))
.isEqualTo(expectedSnapshot);

try (Connection connection = database.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("UPDATE json_types SET int_c = null WHERE id = 1;");
}

Object[] expectedStreamRecord = expectedSnapshot;
List<Event> streamResults =
MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0;
RecordData streamRecord = ((DataChangeEvent) streamResults.get(0)).after();

expectedSnapshot[4] = null;

if (useLegacyJsonFormat) {
// Legacy format removes whitespace before values and after commas in the JSON string
Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES))
.containsExactly(
DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
BinaryStringData.fromString("{\"key1\":\"value1\"}"),
BinaryStringData.fromString(
"{\"key1\":\"value1\",\"key2\":\"value2\"}"),
BinaryStringData.fromString(
"[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]"),
null);
} else {
Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES))
.containsExactly(expectedStreamRecord);
}
}

private Instant toInstant(String ts) {
return Timestamp.valueOf(ts).toLocalDateTime().atZone(ZoneId.of("UTC")).toInstant();
}
@@ -468,9 +566,13 @@ private void testTimeDataTypes(
Object[] expectedStreamRecord)
throws Exception {
database.createAndInitialize();
Boolean useLegacyJsonFormat = true;
CloseableIterator<Event> iterator =
env.fromSource(
getFlinkSourceProvider(new String[] {"time_types"}, database)
getFlinkSourceProvider(
new String[] {"time_types"},
database,
useLegacyJsonFormat)
.getSource(),
WatermarkStrategy.noWatermarks(),
"Event-Source")
@@ -498,7 +600,7 @@ private void testTimeDataTypes(
}

private FlinkSourceProvider getFlinkSourceProvider(
String[] captureTables, UniqueDatabase database) {
String[] captureTables, UniqueDatabase database, Boolean useLegacyJsonFormat) {
String[] captureTableIds =
Arrays.stream(captureTables)
.map(tableName -> database.getDatabaseName() + "." + tableName)
@@ -517,7 +619,8 @@ private FlinkSourceProvider getFlinkSourceProvider(
.username(database.getUsername())
.password(database.getPassword())
.serverTimeZone(ZoneId.of("UTC").toString())
.serverId(MySqSourceTestUtils.getServerId(env.getParallelism()));
.serverId(MySqSourceTestUtils.getServerId(env.getParallelism()))
.useLegacyJsonFormat(useLegacyJsonFormat);
return (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
}

@@ -577,4 +680,12 @@ private FlinkSourceProvider getFlinkSourceProvider(
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING());

private static final RowType JSON_TYPES =
RowType.of(
DataTypes.DECIMAL(20, 0).notNull(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.INT());
}
Original file line number Diff line number Diff line change
@@ -357,10 +357,71 @@ public void testMysql8PrecisionTypesSchema() {
assertThat(actualSchema).isEqualTo(expectedSchema);
}

@Test
public void testMysql57AccessJsonTypesSchema() {
fullTypesMySql57Database.createAndInitialize();

String[] tables = new String[] {"json_types"};
MySqlMetadataAccessor metadataAccessor =
getMetadataAccessor(tables, fullTypesMySql57Database);

Schema actualSchema =
metadataAccessor.getTableSchema(
TableId.tableId(fullTypesMySql57Database.getDatabaseName(), "json_types"));
Schema expectedSchema =
Schema.newBuilder()
.primaryKey("id")
.fromRowDataType(
RowType.of(
new DataType[] {
DataTypes.DECIMAL(20, 0).notNull(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.INT()
},
new String[] {
"id", "json_c0", "json_c1", "json_c2", "int_c",
}))
.build();
assertThat(actualSchema).isEqualTo(expectedSchema);
}

@Test
public void testMysql8AccessJsonTypesSchema() {
fullTypesMySql57Database.createAndInitialize();

String[] tables = new String[] {"json_types"};
MySqlMetadataAccessor metadataAccessor =
getMetadataAccessor(tables, fullTypesMySql57Database);

Schema actualSchema =
metadataAccessor.getTableSchema(
TableId.tableId(fullTypesMySql57Database.getDatabaseName(), "json_types"));
Schema expectedSchema =
Schema.newBuilder()
.primaryKey("id")
.fromRowDataType(
RowType.of(
new DataType[] {
DataTypes.DECIMAL(20, 0).notNull(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.INT()
},
new String[] {
"id", "json_c0", "json_c1", "json_c2", "int_c",
}))
.build();
assertThat(actualSchema).isEqualTo(expectedSchema);
}

private void testAccessDatabaseAndTable(UniqueDatabase database) {
database.createAndInitialize();

String[] tables = new String[] {"common_types", "time_types", "precision_types"};
String[] tables =
new String[] {"common_types", "time_types", "precision_types", "json_types"};
MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database);

assertThatThrownBy(metadataAccessor::listNamespaces)
Original file line number Diff line number Diff line change
@@ -180,4 +180,22 @@ VALUES (DEFAULT,
23,
29,
31,
37);
37);

CREATE TABLE json_types
(
id SERIAL,
json_c0 JSON,
json_c1 JSON,
json_c2 JSON,
int_c INTEGER,
PRIMARY KEY (id)
) DEFAULT CHARSET=utf8;

INSERT INTO json_types
VALUES (DEFAULT,
'{"key1":"value1"}',
'{"key1":"value1","key2":"value2"}',
'[{"key1":"value1","key2":{"key2_1":"value2_1","key2_2":"value2_2"},"key3":["value3"],"key4":["value4_1","value4_2"]},{"key5":"value5"}]',
1
);

Comment on lines +185 to +201
@SML0127 (Contributor Author), Nov 17, 2024:
Test table for various JSON format data.