Skip to content

Commit

Permalink
source-mysql: Refine PK-update behavior
Browse files Browse the repository at this point in the history
Now the insert half (of a CDC update which modifies the primary
key of a row) will be treated as operation type `'c'` instead of
`'u'` and will omit the `before` state. So basically it looks
just like a normal insert which happens to occur immediately
after a normal delete of the old row-state.
  • Loading branch information
willdonnelly committed Dec 6, 2024
1 parent cf83676 commit ee46b90
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 18 deletions.
8 changes: 4 additions & 4 deletions source-mysql/.snapshots/TestPrimaryKeyUpdate
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"three","id":3}
{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"four","id":4}
{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"five","id":5}
{"_meta":{"op":"d","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123-D","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"one","id":1}
{"_meta":{"op":"u","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"},"before":{"data":"one","id":1}},"data":"one","id":6}
{"_meta":{"op":"d","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123-D","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"four","id":4}
{"_meta":{"op":"u","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"},"before":{"data":"four","id":4}},"data":"four","id":7}
{"_meta":{"op":"d","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"one","id":1}
{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"one","id":6}
{"_meta":{"op":"d","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"four","id":4}
{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"four","id":7}
# ================================
# Final State Checkpoint
# ================================
Expand Down
1 change: 0 additions & 1 deletion source-mysql/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ func (tb *testBackend) lowerTuningParameters(t testing.TB) {
func (tb *testBackend) CaptureSpec(ctx context.Context, t testing.TB, streamMatchers ...*regexp.Regexp) *st.CaptureSpec {
var sanitizers = make(map[string]*regexp.Regexp)
sanitizers[`"<TIMESTAMP>"`] = regexp.MustCompile(`"[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.[0-9]+)?(Z|-[0-9]+:[0-9]+)"`)
sanitizers[`"cursor":"binlog.000123:56789:123-D"`] = regexp.MustCompile(`"cursor":"[^"]+\.[0-9]+:[0-9]+:[0-9]+-D"`)
sanitizers[`"cursor":"binlog.000123:56789:123"`] = regexp.MustCompile(`"cursor":"[^"]+\.[0-9]+:[0-9]+:[0-9]+"`)
sanitizers[`"cursor":"binlog.000123:56789"`] = regexp.MustCompile(`"cursor":"[^"]+\.[0-9]+:[0-9]+"`)
sanitizers[`"ts_ms":1111111111111`] = regexp.MustCompile(`"ts_ms":[0-9]+`)
Expand Down
44 changes: 31 additions & 13 deletions source-mysql/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,29 +396,47 @@ func (rs *mysqlReplicationStream) run(ctx context.Context, startCursor mysql.Pos
eventTxID = rs.gtidString
}
if !bytes.Equal(rowKeyBefore, rowKeyAfter) {
// Emit a synthetic delete event of the old row-state
// When the row key is changed by an update, translate it into a synthetic pair: a delete
// event of the old row-state, plus an insert event of the new row-state.
events = append(events, &sqlcapture.ChangeEvent{
Operation: sqlcapture.DeleteOp,
RowKey: rowKeyBefore,
Before: before,
Source: &mysqlSourceInfo{
SourceCommon: sourceCommon,
EventCursor: fmt.Sprintf("%s:%d:%d-D", cursor.Name, binlogEstimatedOffset, rowIdx/2),
// Since updates consist of paired row-states (before, then after) which we iterate
// over two-at-a-time, it is consistent to have the row-index portion of the event
// cursor be the before-state index for this deletion and the after-state index for
// the insert.
EventCursor: fmt.Sprintf("%s:%d:%d", cursor.Name, binlogEstimatedOffset, rowIdx-1),
TxID: eventTxID,
},
}, &sqlcapture.ChangeEvent{
Operation: sqlcapture.InsertOp,
RowKey: rowKeyAfter,
After: after,
Source: &mysqlSourceInfo{
SourceCommon: sourceCommon,
EventCursor: fmt.Sprintf("%s:%d:%d", cursor.Name, binlogEstimatedOffset, rowIdx),
TxID: eventTxID,
},
})
} else {
events = append(events, &sqlcapture.ChangeEvent{
Operation: sqlcapture.UpdateOp,
RowKey: rowKeyAfter,
Before: before,
After: after,
Source: &mysqlSourceInfo{
SourceCommon: sourceCommon,
// For updates the row-index part of the event cursor has to increment by two here
// so that there's room for synthetic delete/insert pairs. Since this value really
// just needs to be unique and properly ordered this is fine.
EventCursor: fmt.Sprintf("%s:%d:%d", cursor.Name, binlogEstimatedOffset, rowIdx),
TxID: eventTxID,
},
})
}
events = append(events, &sqlcapture.ChangeEvent{
Operation: sqlcapture.UpdateOp,
RowKey: rowKeyAfter,
Before: before,
After: after,
Source: &mysqlSourceInfo{
SourceCommon: sourceCommon,
EventCursor: fmt.Sprintf("%s:%d:%d", cursor.Name, binlogEstimatedOffset, rowIdx/2),
TxID: eventTxID,
},
})
for _, event := range events {
if err := rs.emitEvent(ctx, event); err != nil {
return err
Expand Down

0 comments on commit ee46b90

Please sign in to comment.