Skip to content

Commit

Permalink
Refactor MySQLBaseBinlogEvent (#32567)
Browse files Browse the repository at this point in the history
* Refactor MySQLBaseBinlogEvent

* Refactor MySQLBaseBinlogEvent
  • Loading branch information
terrymanu authored Aug 17, 2024
1 parent addeb1d commit cf9e6a9
Show file tree
Hide file tree
Showing 11 changed files with 54 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@
package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;

import lombok.Getter;
import lombok.Setter;
import lombok.RequiredArgsConstructor;

/**
* MySQL base binlog event.
*/
@RequiredArgsConstructor
@Getter
@Setter
public abstract class MySQLBaseBinlogEvent {

private String fileName;
private final String fileName;

private long position;
private final long position;

private long timestamp;
private final long timestamp;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,8 @@
* Placeholder binlog event, unsupported binlog event will replace it into this class.
*/
public final class PlaceholderBinlogEvent extends MySQLBaseBinlogEvent {

public PlaceholderBinlogEvent(final String fileName, final long position, final long timestamp) {
super(fileName, position, timestamp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.query;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;

/**
Expand All @@ -29,7 +28,6 @@
*
* @see <a href="https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication_binlog_event.html#sect_protocol_replication_event_query">QUERY_EVENT</a>
*/
@RequiredArgsConstructor
@Getter
public final class MySQLQueryBinlogEvent extends MySQLBaseBinlogEvent {

Expand All @@ -42,4 +40,14 @@ public final class MySQLQueryBinlogEvent extends MySQLBaseBinlogEvent {
private final String databaseName;

private final String sql;

public MySQLQueryBinlogEvent(final String fileName, final long position, final long timestamp,
final long threadId, final long executionTime, final int errorCode, final String databaseName, final String sql) {
super(fileName, position, timestamp);
this.threadId = threadId;
this.executionTime = executionTime;
this.errorCode = errorCode;
this.databaseName = databaseName;
this.sql = sql;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,21 @@
package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;

/**
* MySQL rows base event.
*/
@RequiredArgsConstructor
@Getter
public abstract class MySQLBaseRowsBinlogEvent extends MySQLBaseBinlogEvent {

private final String databaseName;

private final String tableName;

public MySQLBaseRowsBinlogEvent(final String fileName, final long position, final long timestamp, final String databaseName, final String tableName) {
super(fileName, position, timestamp);
this.databaseName = databaseName;
this.tableName = tableName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ public final class MySQLDeleteRowsBinlogEvent extends MySQLBaseRowsBinlogEvent {

private final List<Serializable[]> beforeRows;

public MySQLDeleteRowsBinlogEvent(final String databaseName, final String tableName, final List<Serializable[]> beforeRows) {
super(databaseName, tableName);
public MySQLDeleteRowsBinlogEvent(final String fileName, final long position, final long timestamp, final String databaseName, final String tableName, final List<Serializable[]> beforeRows) {
super(fileName, position, timestamp, databaseName, tableName);
this.beforeRows = beforeRows;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ public final class MySQLUpdateRowsBinlogEvent extends MySQLBaseRowsBinlogEvent {

private final List<Serializable[]> afterRows;

public MySQLUpdateRowsBinlogEvent(final String databaseName, final String tableName, final List<Serializable[]> beforeRows, final List<Serializable[]> afterRows) {
super(databaseName, tableName);
public MySQLUpdateRowsBinlogEvent(final String fileName, final long position, final long timestamp,
final String databaseName, final String tableName, final List<Serializable[]> beforeRows, final List<Serializable[]> afterRows) {
super(fileName, position, timestamp, databaseName, tableName);
this.beforeRows = beforeRows;
this.afterRows = afterRows;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ public final class MySQLWriteRowsBinlogEvent extends MySQLBaseRowsBinlogEvent {

private final List<Serializable[]> afterRows;

public MySQLWriteRowsBinlogEvent(final String databaseName, final String tableName, final List<Serializable[]> afterRows) {
super(databaseName, tableName);
public MySQLWriteRowsBinlogEvent(final String fileName, final long position, final long timestamp, final String databaseName, final String tableName, final List<Serializable[]> afterRows) {
super(fileName, position, timestamp, databaseName, tableName);
this.afterRows = afterRows;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@
package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.transaction;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;

/**
* XID event is generated for a COMMIT of a transaction that modifies one or more tables of an XA-capable storage engine.
*
* @see <a href="https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication_binlog_event.html#sect_protocol_replication_event_xid">XID_EVENT</a>
*/
@RequiredArgsConstructor
@Getter
public final class MySQLXidBinlogEvent extends MySQLBaseBinlogEvent {

private final long xid;

public MySQLXidBinlogEvent(final String fileName, final long position, final long timestamp, final long xid) {
super(fileName, position, timestamp);
this.xid = xid;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,18 +221,11 @@ private void dumpBinlog(final String binlogFileName, final long binlogPosition,
channel.pipeline().remove(MySQLCommandResponseHandler.class);
String tableKey = String.join(":", connectInfo.getHost(), String.valueOf(connectInfo.getPort()));
channel.pipeline().addLast(new MySQLBinlogEventPacketDecoder(checksumLength, GlobalTableMapEventMapping.getTableMapEventMap(tableKey), decodeWithTX));
channel.pipeline().addLast(new MySQLBinlogEventHandler(getLastBinlogEvent(binlogFileName, binlogPosition)));
channel.pipeline().addLast(new MySQLBinlogEventHandler(new PlaceholderBinlogEvent(binlogFileName, binlogPosition, 0L)));
resetSequenceID();
channel.writeAndFlush(new MySQLComBinlogDumpCommandPacket((int) binlogPosition, connectInfo.getServerId(), binlogFileName));
}

private MySQLBaseBinlogEvent getLastBinlogEvent(final String binlogFileName, final long binlogPosition) {
PlaceholderBinlogEvent result = new PlaceholderBinlogEvent();
result.setFileName(binlogFileName);
result.setPosition(binlogPosition);
return result;
}

private void resetSequenceID() {
channel.attr(MySQLConstants.MYSQL_SEQUENCE_ID).get().set(0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.MySQLBinlogContext;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLBaseRowsBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLDeleteRowsBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.query.MySQLQueryBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLDeleteRowsBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLUpdateRowsBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLWriteRowsBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.transaction.MySQLXidBinlogEvent;
Expand Down Expand Up @@ -201,37 +200,28 @@ private MySQLWriteRowsBinlogEvent decodeWriteRowsEventV2(final MySQLBinlogEventH
MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
MySQLBinlogTableMapEventPacket tableMapEventPacket = binlogContext.getTableMapEvent(packet.getTableId());
packet.readRows(tableMapEventPacket, payload);
MySQLWriteRowsBinlogEvent result = new MySQLWriteRowsBinlogEvent(tableMapEventPacket.getSchemaName(), tableMapEventPacket.getTableName(), packet.getRows());
initRowsEvent(result, binlogEventHeader);
return result;
return new MySQLWriteRowsBinlogEvent(binlogContext.getFileName(),
binlogEventHeader.getLogPos(), binlogEventHeader.getTimestamp(), tableMapEventPacket.getSchemaName(), tableMapEventPacket.getTableName(), packet.getRows());
}

private MySQLUpdateRowsBinlogEvent decodeUpdateRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
MySQLBinlogTableMapEventPacket tableMapEventPacket = binlogContext.getTableMapEvent(packet.getTableId());
packet.readRows(tableMapEventPacket, payload);
MySQLUpdateRowsBinlogEvent result = new MySQLUpdateRowsBinlogEvent(tableMapEventPacket.getSchemaName(), tableMapEventPacket.getTableName(), packet.getRows(), packet.getRows2());
initRowsEvent(result, binlogEventHeader);
return result;
return new MySQLUpdateRowsBinlogEvent(binlogContext.getFileName(),
binlogEventHeader.getLogPos(), binlogEventHeader.getTimestamp(), tableMapEventPacket.getSchemaName(), tableMapEventPacket.getTableName(), packet.getRows(), packet.getRows2());
}

private MySQLDeleteRowsBinlogEvent decodeDeleteRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
MySQLBinlogTableMapEventPacket tableMapEventPacket = binlogContext.getTableMapEvent(packet.getTableId());
packet.readRows(tableMapEventPacket, payload);
MySQLDeleteRowsBinlogEvent result = new MySQLDeleteRowsBinlogEvent(tableMapEventPacket.getSchemaName(), tableMapEventPacket.getTableName(), packet.getRows());
initRowsEvent(result, binlogEventHeader);
return result;
}

private void initRowsEvent(final MySQLBaseRowsBinlogEvent rowsEvent, final MySQLBinlogEventHeader binlogEventHeader) {
rowsEvent.setFileName(binlogContext.getFileName());
rowsEvent.setPosition(binlogEventHeader.getLogPos());
rowsEvent.setTimestamp(binlogEventHeader.getTimestamp());
return new MySQLDeleteRowsBinlogEvent(binlogContext.getFileName(),
binlogEventHeader.getLogPos(), binlogEventHeader.getTimestamp(), tableMapEventPacket.getSchemaName(), tableMapEventPacket.getTableName(), packet.getRows());
}

private PlaceholderBinlogEvent decodePlaceholderEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
PlaceholderBinlogEvent result = createPlaceholderEvent(binlogEventHeader);
PlaceholderBinlogEvent result = new PlaceholderBinlogEvent(binlogContext.getFileName(), binlogEventHeader.getLogPos(), binlogEventHeader.getTimestamp());
int remainDataLength = binlogEventHeader.getEventSize() + 1 - binlogEventHeader.getChecksumLength() - payload.getByteBuf().readerIndex();
if (remainDataLength > 0) {
payload.skipReserved(remainDataLength);
Expand All @@ -247,28 +237,11 @@ private MySQLQueryBinlogEvent decodeQueryEvent(final MySQLBinlogEventHeader binl
payload.skipReserved(payload.readInt2());
String databaseName = payload.readStringNul();
String sql = payload.readStringFix(payload.getByteBuf().readableBytes() - binlogEventHeader.getChecksumLength());
MySQLQueryBinlogEvent result = new MySQLQueryBinlogEvent(threadId, executionTime, errorCode, databaseName, sql);
result.setFileName(binlogContext.getFileName());
result.setPosition(binlogEventHeader.getLogPos());
result.setTimestamp(binlogEventHeader.getTimestamp());
return result;
return new MySQLQueryBinlogEvent(binlogContext.getFileName(), binlogEventHeader.getLogPos(), binlogEventHeader.getTimestamp(), threadId, executionTime, errorCode, databaseName, sql);
}

private MySQLXidBinlogEvent decodeXidEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
MySQLXidBinlogEvent result = new MySQLXidBinlogEvent(payload.readInt8());
result.setFileName(binlogContext.getFileName());
result.setPosition(binlogEventHeader.getLogPos());
result.setTimestamp(binlogEventHeader.getTimestamp());
return result;
}

// TODO May be used again later, keep this method first.
private PlaceholderBinlogEvent createPlaceholderEvent(final MySQLBinlogEventHeader binlogEventHeader) {
PlaceholderBinlogEvent result = new PlaceholderBinlogEvent();
result.setFileName(binlogContext.getFileName());
result.setPosition(binlogEventHeader.getLogPos());
result.setTimestamp(binlogEventHeader.getTimestamp());
return result;
return new MySQLXidBinlogEvent(binlogContext.getFileName(), binlogEventHeader.getLogPos(), binlogEventHeader.getTimestamp(), payload.readInt8());
}

private void skipChecksum(final int eventType, final ByteBuf in) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ private List<PipelineColumnMetaData> mockOrderColumnsMetaDataList() {

@Test
void assertWriteRowsEvent() throws ReflectiveOperationException {
List<Record> actual = getRecordsByWriteRowsEvent(new MySQLWriteRowsBinlogEvent("", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"})));
List<Record> actual = getRecordsByWriteRowsEvent(new MySQLWriteRowsBinlogEvent("", 0, 0L, "", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"})));
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
assertThat(((DataRecord) actual.get(0)).getType(), is(PipelineSQLOperationType.INSERT));
Expand All @@ -148,8 +148,8 @@ private List<Record> getRecordsByWriteRowsEvent(final MySQLWriteRowsBinlogEvent

@Test
void assertUpdateRowsEvent() throws ReflectiveOperationException {
List<Record> actual = getRecordsByUpdateRowsEvent(
new MySQLUpdateRowsBinlogEvent("test", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"}), Collections.singletonList(new Serializable[]{101, 1, "OK2"})));
List<Record> actual = getRecordsByUpdateRowsEvent(new MySQLUpdateRowsBinlogEvent(
"", 0, 0L, "test", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"}), Collections.singletonList(new Serializable[]{101, 1, "OK2"})));
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
assertThat(((DataRecord) actual.get(0)).getType(), is(PipelineSQLOperationType.UPDATE));
Expand All @@ -163,7 +163,7 @@ private List<Record> getRecordsByUpdateRowsEvent(final MySQLUpdateRowsBinlogEven

@Test
void assertDeleteRowsEvent() throws ReflectiveOperationException {
List<Record> actual = getRecordsByDeleteRowsEvent(new MySQLDeleteRowsBinlogEvent("", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"})));
List<Record> actual = getRecordsByDeleteRowsEvent(new MySQLDeleteRowsBinlogEvent("", 0, 0L, "", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"})));
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
assertThat(((DataRecord) actual.get(0)).getType(), is(PipelineSQLOperationType.DELETE));
Expand All @@ -178,14 +178,14 @@ private List<Record> getRecordsByDeleteRowsEvent(final MySQLDeleteRowsBinlogEven
@Test
void assertPlaceholderEvent() throws ReflectiveOperationException {
List<Record> actual = (List<Record>) Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent", MySQLBaseBinlogEvent.class),
incrementalDumper, new PlaceholderBinlogEvent());
incrementalDumper, new PlaceholderBinlogEvent("", 0, 0L));
assertThat(actual.size(), is(1));
}

@Test
void assertRowsEventFiltered() throws ReflectiveOperationException {
List<Record> actual = (List<Record>) Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent", MySQLBaseBinlogEvent.class),
incrementalDumper, new MySQLWriteRowsBinlogEvent("test", "t_order", Collections.singletonList(new Serializable[]{1})));
incrementalDumper, new MySQLWriteRowsBinlogEvent("", 0, 0L, "test", "t_order", Collections.singletonList(new Serializable[]{1})));
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
}
Expand Down

0 comments on commit cf9e6a9

Please sign in to comment.