Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36578][MySQL] Added modified JsonStringFormatter and option use.legacy.json.format #3658

Open
wants to merge 24 commits into
base: master
Choose a base branch
from

Conversation

SML0127
Copy link
Contributor

@SML0127 SML0127 commented Oct 22, 2024

The function to convert json to string is different in snapshot step and binlog step.
This causes the following differences.

original data : {"key": "value", "key1": "value1"}
snapshot : {"key": "value", "key1": "value1"}
binlog : {"key":"value","key1":"value1"} // no whitespace

It is already known issue, but I think it is necessary to process json in the same way in snapshot step and binlog step.
So I modified and added JsonStringFormatter to flink-cdc to handle json in the same way as the result of snapshot step.

  • modified code line
    • line 105: Added whitespace before value
    • line 207: Added whitespace after comma

jira issue: https://issues.apache.org/jira/browse/FLINK-36578

@SML0127
Copy link
Contributor Author

SML0127 commented Oct 22, 2024

@SML0127 SML0127 changed the title [FLINK-36578][mysql] Add table information of binlog offsets when checkpointing added modified JsonStringFormatter [FLINK-36578][mysql][BUG] Add table information of binlog offsets when checkpointing added modified JsonStringFormatter Oct 22, 2024
Copy link
Contributor

@yuxiqian yuxiqian left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for @SML0127's work! Just left some minor comments.

Comment on lines 414 to 415
// The json string from binlog will remove useless space
expectedSnapshot[44] = BinaryStringData.fromString("{\"key1\":\"value1\"}");
expectedSnapshot[44] = BinaryStringData.fromString("{\"key1\": \"value1\"}");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This additional assignment was meant to fix the format discrepancy and could be removed now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this enhancement an optional configuration and defaults to the legacy format, considering some users may have relied on this specific JSON format?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added useLegacyJsonFormat option as a static variable in MySqlSourceConfig, and shared it with legcay MySqlSource for legacy MySqlSource user.

Also the default value of this option is true, so users who want to continue using current format(legacy format) don't need to worry about this new option.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some tests to verify if we have the same JSON format between snapshot and binlog stage?

Copy link
Contributor Author

@SML0127 SML0127 Nov 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added tests for this new option. When useLegacyJsonFormat = flase, it verifies that the JSON type data format is the same between snapshot stage and binlog stage.

private void testJsonDataType(UniqueDatabase database, Boolean useLegacyJsonFormat)
throws Exception {
database.createAndInitialize();
CloseableIterator<Event> iterator =
env.fromSource(
getFlinkSourceProvider(
new String[] {"json_types"},
database,
useLegacyJsonFormat)
.getSource(),
WatermarkStrategy.noWatermarks(),
"Event-Source")
.executeAndCollect();
Object[] expectedSnapshot =
new Object[] {
DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
BinaryStringData.fromString("{\"key1\": \"value1\"}"),
BinaryStringData.fromString("{\"key1\": \"value1\", \"key2\": \"value2\"}"),
BinaryStringData.fromString(
"[{\"key1\": \"value1\", \"key2\": {\"key2_1\": \"value2_1\", \"key2_2\": \"value2_2\"}, \"key3\": [\"value3\"], \"key4\": [\"value4_1\", \"value4_2\"]}, {\"key5\": \"value5\"}]"),
1
};
// skip CreateTableEvent
List<Event> snapshotResults =
MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0;
RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, JSON_TYPES))
.isEqualTo(expectedSnapshot);
try (Connection connection = database.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("UPDATE json_types SET int_c = null WHERE id = 1;");
}
Object[] expectedStreamRecord = expectedSnapshot;
if (useLegacyJsonFormat) {
expectedSnapshot[1] = BinaryStringData.fromString("{\"key1\":\"value1\"}");
expectedSnapshot[2] =
BinaryStringData.fromString("{\"key1\":\"value1\",\"key2\":\"value2\"}");
expectedSnapshot[3] =
BinaryStringData.fromString(
"[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]");
}
expectedSnapshot[4] = null;
List<Event> streamResults =
MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0;
RecordData streamRecord = ((DataChangeEvent) streamResults.get(0)).after();
Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES))
.isEqualTo(expectedStreamRecord);
}

@github-actions github-actions bot added the docs Improvements or additions to documentation label Nov 17, 2024
@SML0127 SML0127 changed the title [FLINK-36578][mysql][BUG] Add table information of binlog offsets when checkpointing added modified JsonStringFormatter [FLINK-36578][mysql] Add table information of binlog offsets when checkpointing added modified JsonStringFormatter Nov 17, 2024
@SML0127 SML0127 changed the title [FLINK-36578][mysql] Add table information of binlog offsets when checkpointing added modified JsonStringFormatter [FLINK-36578][MySQL] Add table information of binlog offsets when checkpointing added modified JsonStringFormatter Nov 17, 2024
Copy link
Contributor Author

@SML0127 SML0127 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines +108 to +112
if (useLegacyJsonFormat) {
sb.append("\":");
} else {
sb.append("\": ");
}
Copy link
Contributor Author

@SML0127 SML0127 Nov 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now we can choose JSON type data format (whitespace in before value)

Comment on lines +214 to +218
if (useLegacyJsonFormat) {
sb.append(",");
} else {
sb.append(", ");
}
Copy link
Contributor Author

@SML0127 SML0127 Nov 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now we can choose JSON type data format (whitespace in after comma)

Comment on lines +185 to +201
(
id SERIAL,
json_c0 JSON,
json_c1 JSON,
json_c2 JSON,
int_c INTEGER,
PRIMARY KEY (id)
) DEFAULT CHARSET=utf8;

INSERT INTO json_types
VALUES (DEFAULT,
'{"key1":"value1"}',
'{"key1":"value1","key2":"value2"}',
'[{"key1":"value1","key2":{"key2_1":"value2_1","key2_2":"value2_2"},"key3":["value3"],"key4":["value4_1","value4_2"]},{"key5":"value5"}]',
1
);
Copy link
Contributor Author

@SML0127 SML0127 Nov 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test table for various JSON format data

Comment on lines +284 to +288
public MySqlSourceConfigFactory useLegacyJsonFormat(boolean useLegacyJsonFormat) {
this.useLegacyJsonFormat = useLegacyJsonFormat;
return this;
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setter for useLegacyJsonFormat option

Comment on lines +276 to +282
@Experimental
public static final ConfigOption<Boolean> USE_LEGACY_JSON_FORMAT =
ConfigOptions.key("use.legacy.json.format")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format.");
Copy link
Contributor Author

@SML0127 SML0127 Nov 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use.legacy.json.format option for pipeline connector. default is true.

Comment on lines +266 to +272
@Experimental
public static final ConfigOption<Boolean> USE_LEGACY_JSON_FORMAT =
ConfigOptions.key("use.legacy.json.format")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format.");
Copy link
Contributor Author

@SML0127 SML0127 Nov 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use.legacy.json.format option for source connector. default is true.

@SML0127 SML0127 changed the title [FLINK-36578][MySQL] Add table information of binlog offsets when checkpointing added modified JsonStringFormatter [FLINK-36578][MySQL] Added modified JsonStringFormatter and option use.legacy.json.format Nov 22, 2024
@SML0127 SML0127 requested a review from yuxiqian November 27, 2024 08:42
Copy link
Contributor

@yuxiqian yuxiqian left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for @SML0127's nice work, looks good overall (I left some suggestions on test cases). Still need hearing from @ruanhang1993 on the changed configs.

/**
* Copied from mysql-binlog-connector-java 0.27.2 to add whitespace before value and after comma.
*
* <p>Line 105: Added whitespace before value, Line 207: Added whitespace after comma
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems line numbers have been differed from actual changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolved

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accidental change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it was my mistake...

Comment on lines 536 to 544
if (useLegacyJsonFormat) {
expectedSnapshot[1] = BinaryStringData.fromString("{\"key1\":\"value1\"}");
expectedSnapshot[2] =
BinaryStringData.fromString("{\"key1\":\"value1\",\"key2\":\"value2\"}");
expectedSnapshot[3] =
BinaryStringData.fromString(
"[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]");
}
expectedSnapshot[4] = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modifying expected data is a little confusing. Will something like

if (useLegacyJsonFormat) {
    assertThat(RecordDataTestUtils.recordFields(...))
        .containsExactly(
            BinaryStringData.fromString("{\"key1\":\"value1\"}"),
            BinaryStringData.fromString("{\"key2\":\"value2\"}"),
            ...
        );
} else {
    assertThat(RecordDataTestUtils.recordFields(...))
        .containsExactly(...);
}

be clearer?

Copy link
Contributor Author

@SML0127 SML0127 Nov 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I revised it more clearly as shown below.

if (useLegacyJsonFormat) {
    // removed whitespace before value and after comma in json format string value
    Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES))
            .containsExactly(
                    DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
                    BinaryStringData.fromString("{\"key1\":\"value1\"}"),
                    BinaryStringData.fromString(
                            "{\"key1\":\"value1\",\"key2\":\"value2\"}"),
                    BinaryStringData.fromString(
                            "[{\"key1\":\"value1\",\"key2\":{\"key2_1\":\"value2_1\",\"key2_2\":\"value2_2\"},\"key3\":[\"value3\"],\"key4\":[\"value4_1\",\"value4_2\"]},{\"key5\":\"value5\"}]"),
                    null);
} else {
    Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, JSON_TYPES))
            .containsExactly(expectedStreamRecord);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since DataStream / Table source connectors' behavior also changed, corresponding integrated test cases might be necessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add integrated test cases if necessary.

@SML0127
Copy link
Contributor Author

SML0127 commented Jan 3, 2025

@ruanhang1993 @leonardBang PTAL

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants