From addeb1d1a0107ab1e108ab03fe89bd5acdc8385c Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Sat, 17 Aug 2024 21:07:00 +0800 Subject: [PATCH] Refactor MySQLBaseRowsBinlogEvent (#32566) * Refactor MySQLBaseRowsBinlogEvent * Refactor MySQLBaseRowsBinlogEvent --- .../event/rows/MySQLBaseRowsBinlogEvent.java | 8 ++-- .../rows/MySQLDeleteRowsBinlogEvent.java | 7 +++- .../rows/MySQLUpdateRowsBinlogEvent.java | 8 +++- .../event/rows/MySQLWriteRowsBinlogEvent.java | 7 +++- .../netty/MySQLBinlogEventPacketDecoder.java | 25 +++++++------ .../dumper/MySQLIncrementalDumperTest.java | 37 +++---------------- 6 files changed, 38 insertions(+), 54 deletions(-) diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java index 8bb91ffd8332d..0c9d39784beba 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java @@ -18,17 +18,17 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows; import lombok.Getter; -import lombok.Setter; +import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent; /** * MySQL rows base event. */ +@RequiredArgsConstructor @Getter -@Setter public abstract class MySQLBaseRowsBinlogEvent extends MySQLBaseBinlogEvent { - private String databaseName; + private final String databaseName; - private String tableName; + private final String tableName; } diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java index e403d6d643c27..50e3d14d6b49f 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java @@ -18,7 +18,6 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows; import lombok.Getter; -import lombok.RequiredArgsConstructor; import java.io.Serializable; import java.util.List; @@ -26,9 +25,13 @@ /** * MySQL delete rows binlog event. */ -@RequiredArgsConstructor @Getter public final class MySQLDeleteRowsBinlogEvent extends MySQLBaseRowsBinlogEvent { private final List beforeRows; + + public MySQLDeleteRowsBinlogEvent(final String databaseName, final String tableName, final List beforeRows) { + super(databaseName, tableName); + this.beforeRows = beforeRows; + } } diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java index e8b993623685b..8c3379ea7860d 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java @@ -18,7 +18,6 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows; import lombok.Getter; -import lombok.RequiredArgsConstructor; import java.io.Serializable; import java.util.List; @@ -26,11 +25,16 @@ /** * MySQL update rows binlog event. */ -@RequiredArgsConstructor @Getter public final class MySQLUpdateRowsBinlogEvent extends MySQLBaseRowsBinlogEvent { private final List beforeRows; private final List afterRows; + + public MySQLUpdateRowsBinlogEvent(final String databaseName, final String tableName, final List beforeRows, final List afterRows) { + super(databaseName, tableName); + this.beforeRows = beforeRows; + this.afterRows = afterRows; + } } diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java index 912d54e14d1a1..db1029bfac079 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java @@ -18,7 +18,6 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows; import lombok.Getter; -import lombok.RequiredArgsConstructor; import java.io.Serializable; import java.util.List; @@ -26,9 +25,13 @@ /** * MySQL write rows binlog event. */ -@RequiredArgsConstructor @Getter public final class MySQLWriteRowsBinlogEvent extends MySQLBaseRowsBinlogEvent { private final List afterRows; + + public MySQLWriteRowsBinlogEvent(final String databaseName, final String tableName, final List afterRows) { + super(databaseName, tableName); + this.afterRows = afterRows; + } } diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java index 223c7ed08ea00..25f7919687c30 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java @@ -199,31 +199,32 @@ private void decodeTableMapEvent(final MySQLBinlogEventHeader binlogEventHeader, private MySQLWriteRowsBinlogEvent decodeWriteRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) { MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload); - packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()), payload); - MySQLWriteRowsBinlogEvent result = new MySQLWriteRowsBinlogEvent(packet.getRows()); - initRowsEvent(result, binlogEventHeader, packet.getTableId()); + MySQLBinlogTableMapEventPacket tableMapEventPacket = binlogContext.getTableMapEvent(packet.getTableId()); + packet.readRows(tableMapEventPacket, payload); + MySQLWriteRowsBinlogEvent result = new MySQLWriteRowsBinlogEvent(tableMapEventPacket.getSchemaName(), tableMapEventPacket.getTableName(), packet.getRows()); + initRowsEvent(result, binlogEventHeader); return result; } private MySQLUpdateRowsBinlogEvent decodeUpdateRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) { MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload); - packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()), payload); - MySQLUpdateRowsBinlogEvent result = new MySQLUpdateRowsBinlogEvent(packet.getRows(), packet.getRows2()); - initRowsEvent(result, binlogEventHeader, packet.getTableId()); + MySQLBinlogTableMapEventPacket tableMapEventPacket = binlogContext.getTableMapEvent(packet.getTableId()); + packet.readRows(tableMapEventPacket, payload); + MySQLUpdateRowsBinlogEvent result = new MySQLUpdateRowsBinlogEvent(tableMapEventPacket.getSchemaName(), tableMapEventPacket.getTableName(), packet.getRows(), packet.getRows2()); + initRowsEvent(result, binlogEventHeader); return result; } private MySQLDeleteRowsBinlogEvent decodeDeleteRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) { MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload); - packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()), payload); - MySQLDeleteRowsBinlogEvent result = new MySQLDeleteRowsBinlogEvent(packet.getRows()); - initRowsEvent(result, binlogEventHeader, packet.getTableId()); + MySQLBinlogTableMapEventPacket tableMapEventPacket = binlogContext.getTableMapEvent(packet.getTableId()); + packet.readRows(tableMapEventPacket, payload); + MySQLDeleteRowsBinlogEvent result = new MySQLDeleteRowsBinlogEvent(tableMapEventPacket.getSchemaName(), tableMapEventPacket.getTableName(), packet.getRows()); + initRowsEvent(result, binlogEventHeader); return result; } - private void initRowsEvent(final MySQLBaseRowsBinlogEvent rowsEvent, final MySQLBinlogEventHeader binlogEventHeader, final long tableId) { - rowsEvent.setDatabaseName(binlogContext.getTableMapEvent(tableId).getSchemaName()); - rowsEvent.setTableName(binlogContext.getTableMapEvent(tableId).getTableName()); + private void initRowsEvent(final MySQLBaseRowsBinlogEvent rowsEvent, final MySQLBinlogEventHeader binlogEventHeader) { rowsEvent.setFileName(binlogContext.getFileName()); rowsEvent.setPosition(binlogEventHeader.getLogPos()); rowsEvent.setTimestamp(binlogEventHeader.getTimestamp()); diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java index cae3e3bb80c33..e7926fe549934 100644 --- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java +++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java @@ -134,20 +134,13 @@ private List mockOrderColumnsMetaDataList() { @Test void assertWriteRowsEvent() throws ReflectiveOperationException { - List actual = getRecordsByWriteRowsEvent(createWriteRowsEvent()); + List actual = getRecordsByWriteRowsEvent(new MySQLWriteRowsBinlogEvent("", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"}))); assertThat(actual.size(), is(1)); assertThat(actual.get(0), instanceOf(DataRecord.class)); assertThat(((DataRecord) actual.get(0)).getType(), is(PipelineSQLOperationType.INSERT)); assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3)); } - private MySQLWriteRowsBinlogEvent createWriteRowsEvent() { - MySQLWriteRowsBinlogEvent result = new MySQLWriteRowsBinlogEvent(Collections.singletonList(new Serializable[]{101, 1, "OK"})); - result.setDatabaseName(""); - result.setTableName("t_order"); - return result; - } - private List getRecordsByWriteRowsEvent(final MySQLWriteRowsBinlogEvent rowsEvent) throws ReflectiveOperationException { Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleWriteRowsEvent", MySQLWriteRowsBinlogEvent.class, PipelineTableMetaData.class); return (List) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData); @@ -155,20 +148,14 @@ private List getRecordsByWriteRowsEvent(final MySQLWriteRowsBinlogEvent @Test void assertUpdateRowsEvent() throws ReflectiveOperationException { - List actual = getRecordsByUpdateRowsEvent(createUpdateRowsEvent()); + List actual = getRecordsByUpdateRowsEvent( + new MySQLUpdateRowsBinlogEvent("test", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"}), Collections.singletonList(new Serializable[]{101, 1, "OK2"}))); assertThat(actual.size(), is(1)); assertThat(actual.get(0), instanceOf(DataRecord.class)); assertThat(((DataRecord) actual.get(0)).getType(), is(PipelineSQLOperationType.UPDATE)); assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3)); } - private MySQLUpdateRowsBinlogEvent createUpdateRowsEvent() { - MySQLUpdateRowsBinlogEvent result = new MySQLUpdateRowsBinlogEvent(Collections.singletonList(new Serializable[]{101, 1, "OK"}), Collections.singletonList(new Serializable[]{101, 1, "OK2"})); - result.setDatabaseName("test"); - result.setTableName("t_order"); - return result; - } - private List getRecordsByUpdateRowsEvent(final MySQLUpdateRowsBinlogEvent rowsEvent) throws ReflectiveOperationException { Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleUpdateRowsEvent", MySQLUpdateRowsBinlogEvent.class, PipelineTableMetaData.class); return (List) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData); @@ -176,20 +163,13 @@ private List getRecordsByUpdateRowsEvent(final MySQLUpdateRowsBinlogEven @Test void assertDeleteRowsEvent() throws ReflectiveOperationException { - List actual = getRecordsByDeleteRowsEvent(createDeleteRowsEvent()); + List actual = getRecordsByDeleteRowsEvent(new MySQLDeleteRowsBinlogEvent("", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"}))); assertThat(actual.size(), is(1)); assertThat(actual.get(0), instanceOf(DataRecord.class)); assertThat(((DataRecord) actual.get(0)).getType(), is(PipelineSQLOperationType.DELETE)); assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3)); } - private MySQLDeleteRowsBinlogEvent createDeleteRowsEvent() { - MySQLDeleteRowsBinlogEvent result = new MySQLDeleteRowsBinlogEvent(Collections.singletonList(new Serializable[]{101, 1, "OK"})); - result.setDatabaseName(""); - result.setTableName("t_order"); - return result; - } - private List getRecordsByDeleteRowsEvent(final MySQLDeleteRowsBinlogEvent rowsEvent) throws ReflectiveOperationException { Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleDeleteRowsEvent", MySQLDeleteRowsBinlogEvent.class, PipelineTableMetaData.class); return (List) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData); @@ -205,15 +185,8 @@ void assertPlaceholderEvent() throws ReflectiveOperationException { @Test void assertRowsEventFiltered() throws ReflectiveOperationException { List actual = (List) Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent", MySQLBaseBinlogEvent.class), - incrementalDumper, getFilteredWriteRowsEvent()); + incrementalDumper, new MySQLWriteRowsBinlogEvent("test", "t_order", Collections.singletonList(new Serializable[]{1}))); assertThat(actual.size(), is(1)); assertThat(actual.get(0), instanceOf(DataRecord.class)); } - - private MySQLWriteRowsBinlogEvent getFilteredWriteRowsEvent() { - MySQLWriteRowsBinlogEvent result = new MySQLWriteRowsBinlogEvent(Collections.singletonList(new Serializable[]{1})); - result.setDatabaseName("test"); - result.setTableName("t_order"); - return result; - } }