diff --git a/source-mysql/.snapshots/TestPrimaryKeyUpdate b/source-mysql/.snapshots/TestPrimaryKeyUpdate index 07b557a8a..353d85756 100644 --- a/source-mysql/.snapshots/TestPrimaryKeyUpdate +++ b/source-mysql/.snapshots/TestPrimaryKeyUpdate @@ -1,5 +1,5 @@ # ================================ -# Collection "acmeCo/test/test_primarykeyupdate_63510878": 8 Documents +# Collection "acmeCo/test/test_primarykeyupdate_63510878": 10 Documents # ================================ {"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"PrimaryKeyUpdate_63510878","cursor":"backfill:0"}},"data":"zero","id":0} {"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"PrimaryKeyUpdate_63510878","cursor":"backfill:1"}},"data":"one","id":1} @@ -7,8 +7,10 @@ {"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"three","id":3} {"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"four","id":4} {"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"five","id":5} -{"_meta":{"op":"u","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"},"before":{"data":"one","id":1}},"data":"one","id":6} -{"_meta":{"op":"u","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"},"before":{"data":"four","id":4}},"data":"four","id":7} +{"_meta":{"op":"d","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"one","id":1} +{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"one","id":6} +{"_meta":{"op":"d","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"four","id":4} +{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"four","id":7} # ================================ # Final State Checkpoint # ================================ diff --git a/source-mysql/main_test.go b/source-mysql/main_test.go index 8bcfb4157..c0af27db8 100644 --- a/source-mysql/main_test.go +++ b/source-mysql/main_test.go @@ -115,8 +115,8 @@ func (tb *testBackend) lowerTuningParameters(t testing.TB) { func (tb *testBackend) CaptureSpec(ctx context.Context, t testing.TB, streamMatchers ...*regexp.Regexp) *st.CaptureSpec { var sanitizers = make(map[string]*regexp.Regexp) sanitizers[`""`] = regexp.MustCompile(`"[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.[0-9]+)?(Z|-[0-9]+:[0-9]+)"`) - sanitizers[`"cursor":"binlog.000123:56789:123"`] = regexp.MustCompile(`"cursor":".+\.[0-9]+:[0-9]+:[0-9]+"`) - sanitizers[`"cursor":"binlog.000123:56789"`] = regexp.MustCompile(`"cursor":".+\.[0-9]+:[0-9]+"`) + sanitizers[`"cursor":"binlog.000123:56789:123"`] = regexp.MustCompile(`"cursor":"[^"]+\.[0-9]+:[0-9]+:[0-9]+"`) + sanitizers[`"cursor":"binlog.000123:56789"`] = regexp.MustCompile(`"cursor":"[^"]+\.[0-9]+:[0-9]+"`) sanitizers[`"ts_ms":1111111111111`] = regexp.MustCompile(`"ts_ms":[0-9]+`) sanitizers[`"txid":"11111111-1111-1111-1111-111111111111:111"`] = regexp.MustCompile(`"txid":"[0-9a-f-]+:[0-9]+"`) diff --git a/source-mysql/replication.go b/source-mysql/replication.go index 824608d7c..244cc0259 100644 --- a/source-mysql/replication.go +++ b/source-mysql/replication.go @@ -375,9 +375,13 @@ func (rs *mysqlReplicationStream) run(ctx context.Context, startCursor mysql.Pos return fmt.Errorf("error decoding row values: %w", err) } after = mergePreimage(after, before) - rowKey, err := sqlcapture.EncodeRowKey(keyColumns, after, columnTypes, encodeKeyFDB) + rowKeyBefore, err := sqlcapture.EncodeRowKey(keyColumns, before, columnTypes, encodeKeyFDB) if err != nil { - return fmt.Errorf("error encoding row key for %q: %w", streamID, err) + return fmt.Errorf("error encoding 'before' row key for %q: %w", streamID, err) + } + rowKeyAfter, err := sqlcapture.EncodeRowKey(keyColumns, after, columnTypes, encodeKeyFDB) + if err != nil { + return fmt.Errorf("error encoding 'after' row key for %q: %w", streamID, err) } if err := rs.db.translateRecordFields(false, columnTypes, before); err != nil { return fmt.Errorf("error translating 'before' of %q UpdateOp: %w", streamID, err) @@ -385,21 +389,58 @@ func (rs *mysqlReplicationStream) run(ctx context.Context, startCursor mysql.Pos if err := rs.db.translateRecordFields(false, columnTypes, after); err != nil { return fmt.Errorf("error translating 'after' of %q UpdateOp: %w", streamID, err) } - var sourceInfo = &mysqlSourceInfo{ - SourceCommon: sourceCommon, - EventCursor: fmt.Sprintf("%s:%d:%d", cursor.Name, binlogEstimatedOffset, rowIdx/2), - } + + var events []sqlcapture.DatabaseEvent + var eventTxID string if rs.db.includeTxIDs[streamID] { - sourceInfo.TxID = rs.gtidString + eventTxID = rs.gtidString + } + if !bytes.Equal(rowKeyBefore, rowKeyAfter) { + // When the row key is changed by an update, translate it into a synthetic pair: a delete + // event of the old row-state, plus an insert event of the new row-state. + events = append(events, &sqlcapture.ChangeEvent{ + Operation: sqlcapture.DeleteOp, + RowKey: rowKeyBefore, + Before: before, + Source: &mysqlSourceInfo{ + SourceCommon: sourceCommon, + // Since updates consist of paired row-states (before, then after) which we iterate + // over two-at-a-time, it is consistent to have the row-index portion of the event + // cursor be the before-state index for this deletion and the after-state index for + // the insert. + EventCursor: fmt.Sprintf("%s:%d:%d", cursor.Name, binlogEstimatedOffset, rowIdx-1), + TxID: eventTxID, + }, + }, &sqlcapture.ChangeEvent{ + Operation: sqlcapture.InsertOp, + RowKey: rowKeyAfter, + After: after, + Source: &mysqlSourceInfo{ + SourceCommon: sourceCommon, + EventCursor: fmt.Sprintf("%s:%d:%d", cursor.Name, binlogEstimatedOffset, rowIdx), + TxID: eventTxID, + }, + }) + } else { + events = append(events, &sqlcapture.ChangeEvent{ + Operation: sqlcapture.UpdateOp, + RowKey: rowKeyAfter, + Before: before, + After: after, + Source: &mysqlSourceInfo{ + SourceCommon: sourceCommon, + // For updates the row-index part of the event cursor has to increment by two here + // so that there's room for synthetic delete/insert pairs. Since this value really + // just needs to be unique and properly ordered this is fine. + EventCursor: fmt.Sprintf("%s:%d:%d", cursor.Name, binlogEstimatedOffset, rowIdx), + TxID: eventTxID, + }, + }) } - if err := rs.emitEvent(ctx, &sqlcapture.ChangeEvent{ - Operation: sqlcapture.UpdateOp, - RowKey: rowKey, - Before: before, - After: after, - Source: sourceInfo, - }); err != nil { - return err + for _, event := range events { + if err := rs.emitEvent(ctx, event); err != nil { + return err + } } rs.uncommittedChanges++ // Keep a count of all uncommitted changes if nonTransactionalTable {