From f6b244458d3ff2777b8ae64e0a4aae992e2cdd0d Mon Sep 17 00:00:00 2001 From: Will Donnelly Date: Thu, 5 Dec 2024 14:37:58 -0600 Subject: [PATCH] source-mysql: Implicit delete on PK updates This is fairly straightforward: if the primary key is changed by a CDC update event, then instead of an update document we should capture a synthetic deletion of the old row-state and an insert of the new row-state. Interestingly, since we base the implicit-delete decision on the serialized row key, which is based on the configured key columns of a binding, which typically come from the collection spec, this should work correctly even if the output collection key differs from the source DB primary key. I haven't tested that, though; I was just thinking through the obvious weird cases, and I believe that's accurate. --- source-mysql/.snapshots/TestPrimaryKeyUpdate | 8 ++- source-mysql/main_test.go | 4 +- source-mysql/replication.go | 71 +++++++++++++++----- 3 files changed, 63 insertions(+), 20 deletions(-) diff --git a/source-mysql/.snapshots/TestPrimaryKeyUpdate b/source-mysql/.snapshots/TestPrimaryKeyUpdate index 07b557a8af..353d857565 100644 --- a/source-mysql/.snapshots/TestPrimaryKeyUpdate +++ b/source-mysql/.snapshots/TestPrimaryKeyUpdate @@ -1,5 +1,5 @@ # ================================ -# Collection "acmeCo/test/test_primarykeyupdate_63510878": 8 Documents +# Collection "acmeCo/test/test_primarykeyupdate_63510878": 10 Documents # ================================ {"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"PrimaryKeyUpdate_63510878","cursor":"backfill:0"}},"data":"zero","id":0} {"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"PrimaryKeyUpdate_63510878","cursor":"backfill:1"}},"data":"one","id":1} @@ -7,8 +7,10 @@ {"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"three","id":3} 
{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"four","id":4} {"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"five","id":5} -{"_meta":{"op":"u","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"},"before":{"data":"one","id":1}},"data":"one","id":6} -{"_meta":{"op":"u","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"},"before":{"data":"four","id":4}},"data":"four","id":7} +{"_meta":{"op":"d","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"one","id":1} +{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"one","id":6} +{"_meta":{"op":"d","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"four","id":4} +{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"four","id":7} # ================================ # Final State Checkpoint # ================================ diff --git a/source-mysql/main_test.go b/source-mysql/main_test.go index 8bcfb4157a..c0af27db82 100644 --- a/source-mysql/main_test.go +++ 
b/source-mysql/main_test.go @@ -115,8 +115,8 @@ func (tb *testBackend) lowerTuningParameters(t testing.TB) { func (tb *testBackend) CaptureSpec(ctx context.Context, t testing.TB, streamMatchers ...*regexp.Regexp) *st.CaptureSpec { var sanitizers = make(map[string]*regexp.Regexp) sanitizers[`""`] = regexp.MustCompile(`"[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.[0-9]+)?(Z|-[0-9]+:[0-9]+)"`) - sanitizers[`"cursor":"binlog.000123:56789:123"`] = regexp.MustCompile(`"cursor":".+\.[0-9]+:[0-9]+:[0-9]+"`) - sanitizers[`"cursor":"binlog.000123:56789"`] = regexp.MustCompile(`"cursor":".+\.[0-9]+:[0-9]+"`) + sanitizers[`"cursor":"binlog.000123:56789:123"`] = regexp.MustCompile(`"cursor":"[^"]+\.[0-9]+:[0-9]+:[0-9]+"`) + sanitizers[`"cursor":"binlog.000123:56789"`] = regexp.MustCompile(`"cursor":"[^"]+\.[0-9]+:[0-9]+"`) sanitizers[`"ts_ms":1111111111111`] = regexp.MustCompile(`"ts_ms":[0-9]+`) sanitizers[`"txid":"11111111-1111-1111-1111-111111111111:111"`] = regexp.MustCompile(`"txid":"[0-9a-f-]+:[0-9]+"`) diff --git a/source-mysql/replication.go b/source-mysql/replication.go index 824608d7cb..244cc0259f 100644 --- a/source-mysql/replication.go +++ b/source-mysql/replication.go @@ -375,9 +375,13 @@ func (rs *mysqlReplicationStream) run(ctx context.Context, startCursor mysql.Pos return fmt.Errorf("error decoding row values: %w", err) } after = mergePreimage(after, before) - rowKey, err := sqlcapture.EncodeRowKey(keyColumns, after, columnTypes, encodeKeyFDB) + rowKeyBefore, err := sqlcapture.EncodeRowKey(keyColumns, before, columnTypes, encodeKeyFDB) if err != nil { - return fmt.Errorf("error encoding row key for %q: %w", streamID, err) + return fmt.Errorf("error encoding 'before' row key for %q: %w", streamID, err) + } + rowKeyAfter, err := sqlcapture.EncodeRowKey(keyColumns, after, columnTypes, encodeKeyFDB) + if err != nil { + return fmt.Errorf("error encoding 'after' row key for %q: %w", streamID, err) } if err := rs.db.translateRecordFields(false, columnTypes, 
before); err != nil { return fmt.Errorf("error translating 'before' of %q UpdateOp: %w", streamID, err) @@ -385,21 +389,58 @@ func (rs *mysqlReplicationStream) run(ctx context.Context, startCursor mysql.Pos if err := rs.db.translateRecordFields(false, columnTypes, after); err != nil { return fmt.Errorf("error translating 'after' of %q UpdateOp: %w", streamID, err) } - var sourceInfo = &mysqlSourceInfo{ - SourceCommon: sourceCommon, - EventCursor: fmt.Sprintf("%s:%d:%d", cursor.Name, binlogEstimatedOffset, rowIdx/2), - } + + var events []sqlcapture.DatabaseEvent + var eventTxID string if rs.db.includeTxIDs[streamID] { - sourceInfo.TxID = rs.gtidString + eventTxID = rs.gtidString + } + if !bytes.Equal(rowKeyBefore, rowKeyAfter) { + // When the row key is changed by an update, translate it into a synthetic pair: a delete + // event of the old row-state, plus an insert event of the new row-state. + events = append(events, &sqlcapture.ChangeEvent{ + Operation: sqlcapture.DeleteOp, + RowKey: rowKeyBefore, + Before: before, + Source: &mysqlSourceInfo{ + SourceCommon: sourceCommon, + // Since updates consist of paired row-states (before, then after) which we iterate + // over two-at-a-time, it is consistent to have the row-index portion of the event + // cursor be the before-state index for this deletion and the after-state index for + // the insert. 
+ EventCursor: fmt.Sprintf("%s:%d:%d", cursor.Name, binlogEstimatedOffset, rowIdx-1), + TxID: eventTxID, + }, + }, &sqlcapture.ChangeEvent{ + Operation: sqlcapture.InsertOp, + RowKey: rowKeyAfter, + After: after, + Source: &mysqlSourceInfo{ + SourceCommon: sourceCommon, + EventCursor: fmt.Sprintf("%s:%d:%d", cursor.Name, binlogEstimatedOffset, rowIdx), + TxID: eventTxID, + }, + }) + } else { + events = append(events, &sqlcapture.ChangeEvent{ + Operation: sqlcapture.UpdateOp, + RowKey: rowKeyAfter, + Before: before, + After: after, + Source: &mysqlSourceInfo{ + SourceCommon: sourceCommon, + // For updates the row-index part of the event cursor has to increment by two here + // so that there's room for synthetic delete/insert pairs. Since this value really + // just needs to be unique and properly ordered this is fine. + EventCursor: fmt.Sprintf("%s:%d:%d", cursor.Name, binlogEstimatedOffset, rowIdx), + TxID: eventTxID, + }, + }) } - if err := rs.emitEvent(ctx, &sqlcapture.ChangeEvent{ - Operation: sqlcapture.UpdateOp, - RowKey: rowKey, - Before: before, - After: after, - Source: sourceInfo, - }); err != nil { - return err + for _, event := range events { + if err := rs.emitEvent(ctx, event); err != nil { + return err + } } rs.uncommittedChanges++ // Keep a count of all uncommitted changes if nonTransactionalTable {