source-mysql: Implicit delete on PK updates
This is fairly straightforward: if the primary key is changed by
a CDC update event, then instead of an update document we should
capture a synthetic deletion of the old row-state and an insert
of the new row-state.

Interestingly, because the implicit-delete decision is based on
the serialized row key, which comes from a binding's configured
key columns (typically taken from the collection spec), this
should work correctly even if the output collection key differs
from the source DB primary key. I haven't tested that case,
though; I was just thinking through the obvious weird scenarios,
and I believe that reasoning holds.
willdonnelly committed Dec 10, 2024
1 parent 47abc03 commit f6b2444
Showing 3 changed files with 63 additions and 20 deletions.
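
For orientation before reading the diff, here is a minimal sketch of the decision the commit message describes. This is illustration only, not code from the commit: changeEvent and splitUpdate are hypothetical stand-ins for sqlcapture.ChangeEvent and the logic added to replication.go, and real events also carry per-event source metadata (cursor, txid) that is omitted here.

```go
package main

import (
	"bytes"
	"fmt"
)

// changeEvent is a simplified stand-in for sqlcapture.ChangeEvent.
type changeEvent struct {
	Op            string // "c" (create), "u" (update), or "d" (delete)
	RowKey        []byte
	Before, After map[string]any
}

// splitUpdate captures the core idea: if an update changes the serialized
// row key, emit a delete of the old row-state plus an insert of the new
// row-state; otherwise emit an ordinary update.
func splitUpdate(keyBefore, keyAfter []byte, before, after map[string]any) []changeEvent {
	if !bytes.Equal(keyBefore, keyAfter) {
		return []changeEvent{
			{Op: "d", RowKey: keyBefore, Before: before},
			{Op: "c", RowKey: keyAfter, After: after},
		}
	}
	return []changeEvent{{Op: "u", RowKey: keyAfter, Before: before, After: after}}
}

func main() {
	// Mirrors the test scenario in the snapshot below, where the row with
	// id 1 has its primary key updated to id 6.
	before := map[string]any{"id": 1, "data": "one"}
	after := map[string]any{"id": 6, "data": "one"}
	for _, ev := range splitUpdate([]byte{0x01}, []byte{0x06}, before, after) {
		fmt.Printf("op=%s before=%v after=%v\n", ev.Op, ev.Before, ev.After)
	}
}
```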
8 changes: 5 additions & 3 deletions source-mysql/.snapshots/TestPrimaryKeyUpdate
@@ -1,14 +1,16 @@
# ================================
# Collection "acmeCo/test/test_primarykeyupdate_63510878": 8 Documents
# Collection "acmeCo/test/test_primarykeyupdate_63510878": 10 Documents
# ================================
{"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"PrimaryKeyUpdate_63510878","cursor":"backfill:0"}},"data":"zero","id":0}
{"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"PrimaryKeyUpdate_63510878","cursor":"backfill:1"}},"data":"one","id":1}
{"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"PrimaryKeyUpdate_63510878","cursor":"backfill:2"}},"data":"two","id":2}
{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"three","id":3}
{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"four","id":4}
{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"five","id":5}
{"_meta":{"op":"u","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"},"before":{"data":"one","id":1}},"data":"one","id":6}
{"_meta":{"op":"u","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"},"before":{"data":"four","id":4}},"data":"four","id":7}
{"_meta":{"op":"d","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"one","id":1}
{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"one","id":6}
{"_meta":{"op":"d","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"four","id":4}
{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"four","id":7}
# ================================
# Final State Checkpoint
# ================================
4 changes: 2 additions & 2 deletions source-mysql/main_test.go
@@ -115,8 +115,8 @@ func (tb *testBackend) lowerTuningParameters(t testing.TB) {
func (tb *testBackend) CaptureSpec(ctx context.Context, t testing.TB, streamMatchers ...*regexp.Regexp) *st.CaptureSpec {
var sanitizers = make(map[string]*regexp.Regexp)
sanitizers[`"<TIMESTAMP>"`] = regexp.MustCompile(`"[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.[0-9]+)?(Z|-[0-9]+:[0-9]+)"`)
sanitizers[`"cursor":"binlog.000123:56789:123"`] = regexp.MustCompile(`"cursor":".+\.[0-9]+:[0-9]+:[0-9]+"`)
sanitizers[`"cursor":"binlog.000123:56789"`] = regexp.MustCompile(`"cursor":".+\.[0-9]+:[0-9]+"`)
sanitizers[`"cursor":"binlog.000123:56789:123"`] = regexp.MustCompile(`"cursor":"[^"]+\.[0-9]+:[0-9]+:[0-9]+"`)
sanitizers[`"cursor":"binlog.000123:56789"`] = regexp.MustCompile(`"cursor":"[^"]+\.[0-9]+:[0-9]+"`)
sanitizers[`"ts_ms":1111111111111`] = regexp.MustCompile(`"ts_ms":[0-9]+`)
sanitizers[`"txid":"11111111-1111-1111-1111-111111111111:111"`] = regexp.MustCompile(`"txid":"[0-9a-f-]+:[0-9]+"`)

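A note on the sanitizer tightening in main_test.go above: `[^"]+` cannot cross the closing quote of the cursor value, whereas the old greedy `.+` could in principle run past it and re-anchor on digits later in the document line. The snippet below is my own quick check with a made-up sample line, not part of the commit; it applies the new pattern as a replacement, mirroring how the sanitizer map (replacement string keyed to a pattern) appears to be used.

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// The tightened cursor pattern from main_test.go: [^"]+ stops at the
	// cursor value's closing quote, unlike the previous greedy .+ match.
	cursorPattern := regexp.MustCompile(`"cursor":"[^"]+\.[0-9]+:[0-9]+:[0-9]+"`)

	// A made-up capture document line, shaped like the snapshot output above.
	line := `{"_meta":{"op":"d","source":{"cursor":"mysql-bin.000042:1234:5","txid":"8f6d:42"}},"id":1}`

	// Replace the real cursor with the stable placeholder used in snapshots.
	fmt.Println(cursorPattern.ReplaceAllString(line, `"cursor":"binlog.000123:56789:123"`))
}
```
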
71 changes: 56 additions & 15 deletions source-mysql/replication.go
@@ -375,31 +375,72 @@ func (rs *mysqlReplicationStream) run(ctx context.Context, startCursor mysql.Pos
return fmt.Errorf("error decoding row values: %w", err)
}
after = mergePreimage(after, before)
-rowKey, err := sqlcapture.EncodeRowKey(keyColumns, after, columnTypes, encodeKeyFDB)
+rowKeyBefore, err := sqlcapture.EncodeRowKey(keyColumns, before, columnTypes, encodeKeyFDB)
if err != nil {
return fmt.Errorf("error encoding row key for %q: %w", streamID, err)
return fmt.Errorf("error encoding 'before' row key for %q: %w", streamID, err)
}
+rowKeyAfter, err := sqlcapture.EncodeRowKey(keyColumns, after, columnTypes, encodeKeyFDB)
+if err != nil {
+return fmt.Errorf("error encoding 'after' row key for %q: %w", streamID, err)
+}
if err := rs.db.translateRecordFields(false, columnTypes, before); err != nil {
return fmt.Errorf("error translating 'before' of %q UpdateOp: %w", streamID, err)
}
if err := rs.db.translateRecordFields(false, columnTypes, after); err != nil {
return fmt.Errorf("error translating 'after' of %q UpdateOp: %w", streamID, err)
}
-var sourceInfo = &mysqlSourceInfo{
-SourceCommon: sourceCommon,
-EventCursor: fmt.Sprintf("%s:%d:%d", cursor.Name, binlogEstimatedOffset, rowIdx/2),
-}
+
+var events []sqlcapture.DatabaseEvent
+var eventTxID string
if rs.db.includeTxIDs[streamID] {
-sourceInfo.TxID = rs.gtidString
+eventTxID = rs.gtidString
}
+if !bytes.Equal(rowKeyBefore, rowKeyAfter) {
+// When the row key is changed by an update, translate it into a synthetic pair: a delete
+// event of the old row-state, plus an insert event of the new row-state.
+events = append(events, &sqlcapture.ChangeEvent{
+Operation: sqlcapture.DeleteOp,
+RowKey: rowKeyBefore,
+Before: before,
+Source: &mysqlSourceInfo{
+SourceCommon: sourceCommon,
+// Since updates consist of paired row-states (before, then after) which we iterate
+// over two-at-a-time, it is consistent to have the row-index portion of the event
+// cursor be the before-state index for this deletion and the after-state index for
+// the insert.
+EventCursor: fmt.Sprintf("%s:%d:%d", cursor.Name, binlogEstimatedOffset, rowIdx-1),
+TxID: eventTxID,
+},
+}, &sqlcapture.ChangeEvent{
+Operation: sqlcapture.InsertOp,
+RowKey: rowKeyAfter,
+After: after,
+Source: &mysqlSourceInfo{
+SourceCommon: sourceCommon,
+EventCursor: fmt.Sprintf("%s:%d:%d", cursor.Name, binlogEstimatedOffset, rowIdx),
+TxID: eventTxID,
+},
+})
+} else {
+events = append(events, &sqlcapture.ChangeEvent{
+Operation: sqlcapture.UpdateOp,
+RowKey: rowKeyAfter,
+Before: before,
+After: after,
+Source: &mysqlSourceInfo{
+SourceCommon: sourceCommon,
+// For updates the row-index part of the event cursor has to increment by two here
+// so that there's room for synthetic delete/insert pairs. Since this value really
+// just needs to be unique and properly ordered this is fine.
+EventCursor: fmt.Sprintf("%s:%d:%d", cursor.Name, binlogEstimatedOffset, rowIdx),
+TxID: eventTxID,
+},
+})
+}
-if err := rs.emitEvent(ctx, &sqlcapture.ChangeEvent{
-Operation: sqlcapture.UpdateOp,
-RowKey: rowKey,
-Before: before,
-After: after,
-Source: sourceInfo,
-}); err != nil {
-return err
+for _, event := range events {
+if err := rs.emitEvent(ctx, event); err != nil {
+return err
+}
}
rs.uncommittedChanges++ // Keep a count of all uncommitted changes
if nonTransactionalTable {
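
Finally, a small worked example to make the event-cursor comments in the replication.go hunk concrete. This is my own reading of the diff rather than anything the commit states directly: binlog update events carry before/after row images pairwise, so rowIdx is assumed here to index the flat row list, with the after-image of the second updated row at index 3 and its before-image at index 2.

```go
package main

import "fmt"

func main() {
	// Assumed values for the second updated row of a binlog event: the
	// before-image at flat index 2, the after-image (rowIdx) at index 3.
	const binlogName, offset, rowIdx = "binlog.000123", 56789, 3

	// Key-changing update: synthetic delete at the before-image index,
	// followed by an insert at the after-image index.
	fmt.Printf("delete cursor: %s:%d:%d\n", binlogName, offset, rowIdx-1) // binlog.000123:56789:2
	fmt.Printf("insert cursor: %s:%d:%d\n", binlogName, offset, rowIdx)   // binlog.000123:56789:3

	// Ordinary update: a single event at the after-image index. Before this
	// commit the index was rowIdx/2, i.e. the pair number rather than the
	// image position.
	fmt.Printf("update cursor: %s:%d:%d\n", binlogName, offset, rowIdx) // binlog.000123:56789:3
}
```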