Skip to content

Commit

Permalink
Refactor MySQLBaseRowsBinlogEvent (#32566)
Browse files Browse the repository at this point in the history
* Refactor MySQLBaseRowsBinlogEvent

* Refactor MySQLBaseRowsBinlogEvent
  • Loading branch information
terrymanu authored Aug 17, 2024
1 parent f0f1c2a commit addeb1d
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@
package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;

import lombok.Getter;
import lombok.Setter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;

/**
* MySQL rows base event.
*/
@RequiredArgsConstructor
@Getter
@Setter
public abstract class MySQLBaseRowsBinlogEvent extends MySQLBaseBinlogEvent {

private String databaseName;
private final String databaseName;

private String tableName;
private final String tableName;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@
package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

import java.io.Serializable;
import java.util.List;

/**
* MySQL delete rows binlog event.
*/
@RequiredArgsConstructor
@Getter
public final class MySQLDeleteRowsBinlogEvent extends MySQLBaseRowsBinlogEvent {

private final List<Serializable[]> beforeRows;

public MySQLDeleteRowsBinlogEvent(final String databaseName, final String tableName, final List<Serializable[]> beforeRows) {
super(databaseName, tableName);
this.beforeRows = beforeRows;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,23 @@
package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

import java.io.Serializable;
import java.util.List;

/**
* MySQL update rows binlog event.
*/
@RequiredArgsConstructor
@Getter
public final class MySQLUpdateRowsBinlogEvent extends MySQLBaseRowsBinlogEvent {

private final List<Serializable[]> beforeRows;

private final List<Serializable[]> afterRows;

public MySQLUpdateRowsBinlogEvent(final String databaseName, final String tableName, final List<Serializable[]> beforeRows, final List<Serializable[]> afterRows) {
super(databaseName, tableName);
this.beforeRows = beforeRows;
this.afterRows = afterRows;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@
package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

import java.io.Serializable;
import java.util.List;

/**
* MySQL write rows binlog event.
*/
@RequiredArgsConstructor
@Getter
public final class MySQLWriteRowsBinlogEvent extends MySQLBaseRowsBinlogEvent {

private final List<Serializable[]> afterRows;

public MySQLWriteRowsBinlogEvent(final String databaseName, final String tableName, final List<Serializable[]> afterRows) {
super(databaseName, tableName);
this.afterRows = afterRows;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,31 +199,32 @@ private void decodeTableMapEvent(final MySQLBinlogEventHeader binlogEventHeader,

private MySQLWriteRowsBinlogEvent decodeWriteRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()), payload);
MySQLWriteRowsBinlogEvent result = new MySQLWriteRowsBinlogEvent(packet.getRows());
initRowsEvent(result, binlogEventHeader, packet.getTableId());
MySQLBinlogTableMapEventPacket tableMapEventPacket = binlogContext.getTableMapEvent(packet.getTableId());
packet.readRows(tableMapEventPacket, payload);
MySQLWriteRowsBinlogEvent result = new MySQLWriteRowsBinlogEvent(tableMapEventPacket.getSchemaName(), tableMapEventPacket.getTableName(), packet.getRows());
initRowsEvent(result, binlogEventHeader);
return result;
}

private MySQLUpdateRowsBinlogEvent decodeUpdateRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()), payload);
MySQLUpdateRowsBinlogEvent result = new MySQLUpdateRowsBinlogEvent(packet.getRows(), packet.getRows2());
initRowsEvent(result, binlogEventHeader, packet.getTableId());
MySQLBinlogTableMapEventPacket tableMapEventPacket = binlogContext.getTableMapEvent(packet.getTableId());
packet.readRows(tableMapEventPacket, payload);
MySQLUpdateRowsBinlogEvent result = new MySQLUpdateRowsBinlogEvent(tableMapEventPacket.getSchemaName(), tableMapEventPacket.getTableName(), packet.getRows(), packet.getRows2());
initRowsEvent(result, binlogEventHeader);
return result;
}

private MySQLDeleteRowsBinlogEvent decodeDeleteRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()), payload);
MySQLDeleteRowsBinlogEvent result = new MySQLDeleteRowsBinlogEvent(packet.getRows());
initRowsEvent(result, binlogEventHeader, packet.getTableId());
MySQLBinlogTableMapEventPacket tableMapEventPacket = binlogContext.getTableMapEvent(packet.getTableId());
packet.readRows(tableMapEventPacket, payload);
MySQLDeleteRowsBinlogEvent result = new MySQLDeleteRowsBinlogEvent(tableMapEventPacket.getSchemaName(), tableMapEventPacket.getTableName(), packet.getRows());
initRowsEvent(result, binlogEventHeader);
return result;
}

private void initRowsEvent(final MySQLBaseRowsBinlogEvent rowsEvent, final MySQLBinlogEventHeader binlogEventHeader, final long tableId) {
rowsEvent.setDatabaseName(binlogContext.getTableMapEvent(tableId).getSchemaName());
rowsEvent.setTableName(binlogContext.getTableMapEvent(tableId).getTableName());
private void initRowsEvent(final MySQLBaseRowsBinlogEvent rowsEvent, final MySQLBinlogEventHeader binlogEventHeader) {
rowsEvent.setFileName(binlogContext.getFileName());
rowsEvent.setPosition(binlogEventHeader.getLogPos());
rowsEvent.setTimestamp(binlogEventHeader.getTimestamp());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,62 +134,42 @@ private List<PipelineColumnMetaData> mockOrderColumnsMetaDataList() {

@Test
void assertWriteRowsEvent() throws ReflectiveOperationException {
List<Record> actual = getRecordsByWriteRowsEvent(createWriteRowsEvent());
List<Record> actual = getRecordsByWriteRowsEvent(new MySQLWriteRowsBinlogEvent("", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"})));
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
assertThat(((DataRecord) actual.get(0)).getType(), is(PipelineSQLOperationType.INSERT));
assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
}

private MySQLWriteRowsBinlogEvent createWriteRowsEvent() {
MySQLWriteRowsBinlogEvent result = new MySQLWriteRowsBinlogEvent(Collections.singletonList(new Serializable[]{101, 1, "OK"}));
result.setDatabaseName("");
result.setTableName("t_order");
return result;
}

private List<Record> getRecordsByWriteRowsEvent(final MySQLWriteRowsBinlogEvent rowsEvent) throws ReflectiveOperationException {
Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleWriteRowsEvent", MySQLWriteRowsBinlogEvent.class, PipelineTableMetaData.class);
return (List<Record>) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData);
}

@Test
void assertUpdateRowsEvent() throws ReflectiveOperationException {
List<Record> actual = getRecordsByUpdateRowsEvent(createUpdateRowsEvent());
List<Record> actual = getRecordsByUpdateRowsEvent(
new MySQLUpdateRowsBinlogEvent("test", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"}), Collections.singletonList(new Serializable[]{101, 1, "OK2"})));
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
assertThat(((DataRecord) actual.get(0)).getType(), is(PipelineSQLOperationType.UPDATE));
assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
}

private MySQLUpdateRowsBinlogEvent createUpdateRowsEvent() {
MySQLUpdateRowsBinlogEvent result = new MySQLUpdateRowsBinlogEvent(Collections.singletonList(new Serializable[]{101, 1, "OK"}), Collections.singletonList(new Serializable[]{101, 1, "OK2"}));
result.setDatabaseName("test");
result.setTableName("t_order");
return result;
}

private List<Record> getRecordsByUpdateRowsEvent(final MySQLUpdateRowsBinlogEvent rowsEvent) throws ReflectiveOperationException {
Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleUpdateRowsEvent", MySQLUpdateRowsBinlogEvent.class, PipelineTableMetaData.class);
return (List<Record>) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData);
}

@Test
void assertDeleteRowsEvent() throws ReflectiveOperationException {
List<Record> actual = getRecordsByDeleteRowsEvent(createDeleteRowsEvent());
List<Record> actual = getRecordsByDeleteRowsEvent(new MySQLDeleteRowsBinlogEvent("", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"})));
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
assertThat(((DataRecord) actual.get(0)).getType(), is(PipelineSQLOperationType.DELETE));
assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
}

private MySQLDeleteRowsBinlogEvent createDeleteRowsEvent() {
MySQLDeleteRowsBinlogEvent result = new MySQLDeleteRowsBinlogEvent(Collections.singletonList(new Serializable[]{101, 1, "OK"}));
result.setDatabaseName("");
result.setTableName("t_order");
return result;
}

private List<Record> getRecordsByDeleteRowsEvent(final MySQLDeleteRowsBinlogEvent rowsEvent) throws ReflectiveOperationException {
Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleDeleteRowsEvent", MySQLDeleteRowsBinlogEvent.class, PipelineTableMetaData.class);
return (List<Record>) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData);
Expand All @@ -205,15 +185,8 @@ void assertPlaceholderEvent() throws ReflectiveOperationException {
@Test
void assertRowsEventFiltered() throws ReflectiveOperationException {
List<Record> actual = (List<Record>) Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent", MySQLBaseBinlogEvent.class),
incrementalDumper, getFilteredWriteRowsEvent());
incrementalDumper, new MySQLWriteRowsBinlogEvent("test", "t_order", Collections.singletonList(new Serializable[]{1})));
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
}

private MySQLWriteRowsBinlogEvent getFilteredWriteRowsEvent() {
MySQLWriteRowsBinlogEvent result = new MySQLWriteRowsBinlogEvent(Collections.singletonList(new Serializable[]{1}));
result.setDatabaseName("test");
result.setTableName("t_order");
return result;
}
}

0 comments on commit addeb1d

Please sign in to comment.