Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

source-mysql: Treat CDC updates which change the primary key as implicit deletions #2185

Merged
merged 4 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions source-mysql/.snapshots/TestPrimaryKeyUpdate
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# ================================
# Collection "acmeCo/test/test_primarykeyupdate_63510878": 10 Documents
# ================================
{"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"PrimaryKeyUpdate_63510878","cursor":"backfill:0"}},"data":"zero","id":0}
{"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"PrimaryKeyUpdate_63510878","cursor":"backfill:1"}},"data":"one","id":1}
{"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"PrimaryKeyUpdate_63510878","cursor":"backfill:2"}},"data":"two","id":2}
{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"three","id":3}
{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"four","id":4}
{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"five","id":5}
{"_meta":{"op":"d","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"one","id":1}
{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"one","id":6}
{"_meta":{"op":"d","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"four","id":4}
{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"four","id":7}
# ================================
# Final State Checkpoint
# ================================
{"bindingStateV1":{"test%2FPrimaryKeyUpdate_63510878":{"backfilled":3,"key_columns":["id"],"metadata":{"charset":"utf8mb4","schema":{"columns":["id","data"],"types":{"data":{"charset":"utf8mb4","type":"text"},"id":{"type":"int"}}}},"mode":"Active"}},"cursor":"binlog.000123:56789"}

27 changes: 27 additions & 0 deletions source-mysql/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,3 +509,30 @@ func TestDroppedAndRecreatedTable(t *testing.T) {

cupaloy.SnapshotT(t, cs.Summary())
}

func TestPrimaryKeyUpdate(t *testing.T) {
var tb, ctx = mysqlTestBackend(t), context.Background()
var uniqueID = "63510878"
var tableDef = "(id INTEGER PRIMARY KEY, data TEXT)"
var tableName = tb.CreateTable(ctx, t, uniqueID, tableDef)

var cs = tb.CaptureSpec(ctx, t, regexp.MustCompile(uniqueID))
cs.Validator = &st.OrderedCaptureValidator{}
sqlcapture.TestShutdownAfterCaughtUp = true
t.Cleanup(func() { sqlcapture.TestShutdownAfterCaughtUp = false })

// Initial backfill
tb.Insert(ctx, t, tableName, [][]any{{0, "zero"}, {1, "one"}, {2, "two"}})
cs.Capture(ctx, t, nil)

// Some replication
tb.Insert(ctx, t, tableName, [][]any{{3, "three"}, {4, "four"}, {5, "five"}})
cs.Capture(ctx, t, nil)

// Primary key updates
tb.Update(ctx, t, tableName, "id", 1, "id", 6)
tb.Update(ctx, t, tableName, "id", 4, "id", 7)
cs.Capture(ctx, t, nil)

cupaloy.SnapshotT(t, cs.Summary())
}
4 changes: 2 additions & 2 deletions source-mysql/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ func (tb *testBackend) lowerTuningParameters(t testing.TB) {
func (tb *testBackend) CaptureSpec(ctx context.Context, t testing.TB, streamMatchers ...*regexp.Regexp) *st.CaptureSpec {
var sanitizers = make(map[string]*regexp.Regexp)
sanitizers[`"<TIMESTAMP>"`] = regexp.MustCompile(`"[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.[0-9]+)?(Z|-[0-9]+:[0-9]+)"`)
sanitizers[`"cursor":"binlog.000123:56789:123"`] = regexp.MustCompile(`"cursor":".+\.[0-9]+:[0-9]+:[0-9]+"`)
sanitizers[`"cursor":"binlog.000123:56789"`] = regexp.MustCompile(`"cursor":".+\.[0-9]+:[0-9]+"`)
sanitizers[`"cursor":"binlog.000123:56789:123"`] = regexp.MustCompile(`"cursor":"[^"]+\.[0-9]+:[0-9]+:[0-9]+"`)
sanitizers[`"cursor":"binlog.000123:56789"`] = regexp.MustCompile(`"cursor":"[^"]+\.[0-9]+:[0-9]+"`)
sanitizers[`"ts_ms":1111111111111`] = regexp.MustCompile(`"ts_ms":[0-9]+`)
sanitizers[`"txid":"11111111-1111-1111-1111-111111111111:111"`] = regexp.MustCompile(`"txid":"[0-9a-f-]+:[0-9]+"`)

Expand Down
71 changes: 56 additions & 15 deletions source-mysql/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,31 +375,72 @@ func (rs *mysqlReplicationStream) run(ctx context.Context, startCursor mysql.Pos
return fmt.Errorf("error decoding row values: %w", err)
}
after = mergePreimage(after, before)
rowKey, err := sqlcapture.EncodeRowKey(keyColumns, after, columnTypes, encodeKeyFDB)
rowKeyBefore, err := sqlcapture.EncodeRowKey(keyColumns, before, columnTypes, encodeKeyFDB)
if err != nil {
return fmt.Errorf("error encoding row key for %q: %w", streamID, err)
return fmt.Errorf("error encoding 'before' row key for %q: %w", streamID, err)
}
rowKeyAfter, err := sqlcapture.EncodeRowKey(keyColumns, after, columnTypes, encodeKeyFDB)
if err != nil {
return fmt.Errorf("error encoding 'after' row key for %q: %w", streamID, err)
}
if err := rs.db.translateRecordFields(false, columnTypes, before); err != nil {
return fmt.Errorf("error translating 'before' of %q UpdateOp: %w", streamID, err)
}
if err := rs.db.translateRecordFields(false, columnTypes, after); err != nil {
return fmt.Errorf("error translating 'after' of %q UpdateOp: %w", streamID, err)
}
var sourceInfo = &mysqlSourceInfo{
SourceCommon: sourceCommon,
EventCursor: fmt.Sprintf("%s:%d:%d", cursor.Name, binlogEstimatedOffset, rowIdx/2),
}

var events []sqlcapture.DatabaseEvent
var eventTxID string
if rs.db.includeTxIDs[streamID] {
sourceInfo.TxID = rs.gtidString
eventTxID = rs.gtidString
}
if !bytes.Equal(rowKeyBefore, rowKeyAfter) {
// When the row key is changed by an update, translate it into a synthetic pair: a delete
// event of the old row-state, plus an insert event of the new row-state.
events = append(events, &sqlcapture.ChangeEvent{
Operation: sqlcapture.DeleteOp,
RowKey: rowKeyBefore,
Before: before,
Source: &mysqlSourceInfo{
SourceCommon: sourceCommon,
// Since updates consist of paired row-states (before, then after) which we iterate
// over two-at-a-time, it is consistent to have the row-index portion of the event
// cursor be the before-state index for this deletion and the after-state index for
// the insert.
EventCursor: fmt.Sprintf("%s:%d:%d", cursor.Name, binlogEstimatedOffset, rowIdx-1),
TxID: eventTxID,
},
}, &sqlcapture.ChangeEvent{
Operation: sqlcapture.InsertOp,
RowKey: rowKeyAfter,
After: after,
Source: &mysqlSourceInfo{
SourceCommon: sourceCommon,
EventCursor: fmt.Sprintf("%s:%d:%d", cursor.Name, binlogEstimatedOffset, rowIdx),
TxID: eventTxID,
},
})
} else {
events = append(events, &sqlcapture.ChangeEvent{
Operation: sqlcapture.UpdateOp,
RowKey: rowKeyAfter,
Before: before,
After: after,
Source: &mysqlSourceInfo{
SourceCommon: sourceCommon,
// For updates the row-index part of the event cursor has to increment by two here
// so that there's room for synthetic delete/insert pairs. Since this value really
// just needs to be unique and properly ordered this is fine.
EventCursor: fmt.Sprintf("%s:%d:%d", cursor.Name, binlogEstimatedOffset, rowIdx),
TxID: eventTxID,
},
})
}
if err := rs.emitEvent(ctx, &sqlcapture.ChangeEvent{
Operation: sqlcapture.UpdateOp,
RowKey: rowKey,
Before: before,
After: after,
Source: sourceInfo,
}); err != nil {
return err
for _, event := range events {
if err := rs.emitEvent(ctx, event); err != nil {
return err
}
}
rs.uncommittedChanges++ // Keep a count of all uncommitted changes
if nonTransactionalTable {
Expand Down
16 changes: 16 additions & 0 deletions source-postgres/.snapshots/TestPrimaryKeyUpdate
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# ================================
# Collection "acmeCo/test/test_primarykeyupdate_63510878": 8 Documents
# ================================
{"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"primarykeyupdate_63510878","loc":[11111111,11111111,11111111]}},"data":"zero","id":0}
{"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"primarykeyupdate_63510878","loc":[11111111,11111111,11111111]}},"data":"one","id":1}
{"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"primarykeyupdate_63510878","loc":[11111111,11111111,11111111]}},"data":"two","id":2}
{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"primarykeyupdate_63510878","loc":[11111111,11111111,11111111],"txid":111111}},"data":"three","id":3}
{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"primarykeyupdate_63510878","loc":[11111111,11111111,11111111],"txid":111111}},"data":"four","id":4}
{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"primarykeyupdate_63510878","loc":[11111111,11111111,11111111],"txid":111111}},"data":"five","id":5}
{"_meta":{"op":"u","source":{"ts_ms":1111111111111,"schema":"test","table":"primarykeyupdate_63510878","loc":[11111111,11111111,11111111],"txid":111111},"before":{"id":1}},"data":"one","id":6}
{"_meta":{"op":"u","source":{"ts_ms":1111111111111,"schema":"test","table":"primarykeyupdate_63510878","loc":[11111111,11111111,11111111],"txid":111111},"before":{"id":4}},"data":"four","id":7}
# ================================
# Final State Checkpoint
# ================================
{"bindingStateV1":{"test%2Fprimarykeyupdate_63510878":{"backfilled":3,"key_columns":["id"],"mode":"Active"}},"cursor":"0/1111111"}

27 changes: 27 additions & 0 deletions source-postgres/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,3 +614,30 @@ func TestCIText(t *testing.T) {
cupaloy.SnapshotT(t, cs.Summary())
})
}

func TestPrimaryKeyUpdate(t *testing.T) {
var tb, ctx = postgresTestBackend(t), context.Background()
var uniqueID = "63510878"
var tableDef = "(id INTEGER PRIMARY KEY, data TEXT)"
var tableName = tb.CreateTable(ctx, t, uniqueID, tableDef)

var cs = tb.CaptureSpec(ctx, t, regexp.MustCompile(uniqueID))
cs.Validator = &st.OrderedCaptureValidator{}
sqlcapture.TestShutdownAfterCaughtUp = true
t.Cleanup(func() { sqlcapture.TestShutdownAfterCaughtUp = false })

// Initial backfill
tb.Insert(ctx, t, tableName, [][]any{{0, "zero"}, {1, "one"}, {2, "two"}})
cs.Capture(ctx, t, nil)

// Some replication
tb.Insert(ctx, t, tableName, [][]any{{3, "three"}, {4, "four"}, {5, "five"}})
cs.Capture(ctx, t, nil)

// Primary key updates
tb.Update(ctx, t, tableName, "id", 1, "id", 6)
tb.Update(ctx, t, tableName, "id", 4, "id", 7)
cs.Capture(ctx, t, nil)

cupaloy.SnapshotT(t, cs.Summary())
}
18 changes: 18 additions & 0 deletions source-sqlserver/.snapshots/TestPrimaryKeyUpdate
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# ================================
# Collection "acmeCo/test/test_primarykeyupdate_63510878": 10 Documents
# ================================
{"_meta":{"op":"c","source":{"schema":"dbo","snapshot":true,"table":"test_PrimaryKeyUpdate_63510878","lsn":"","seqval":"AAAAAAAAAAAAAA=="}},"data":"zero","id":0}
{"_meta":{"op":"c","source":{"schema":"dbo","snapshot":true,"table":"test_PrimaryKeyUpdate_63510878","lsn":"","seqval":"AAAAAAAAAAAAAA=="}},"data":"one","id":1}
{"_meta":{"op":"c","source":{"schema":"dbo","snapshot":true,"table":"test_PrimaryKeyUpdate_63510878","lsn":"","seqval":"AAAAAAAAAAAAAA=="}},"data":"two","id":2}
{"_meta":{"op":"c","source":{"schema":"dbo","table":"test_PrimaryKeyUpdate_63510878","lsn":"AAAAAAAAAAAAAA==","seqval":"AAAAAAAAAAAAAA==","updateMask":"Aw=="}},"data":"three","id":3}
{"_meta":{"op":"c","source":{"schema":"dbo","table":"test_PrimaryKeyUpdate_63510878","lsn":"AAAAAAAAAAAAAA==","seqval":"AAAAAAAAAAAAAA==","updateMask":"Aw=="}},"data":"four","id":4}
{"_meta":{"op":"c","source":{"schema":"dbo","table":"test_PrimaryKeyUpdate_63510878","lsn":"AAAAAAAAAAAAAA==","seqval":"AAAAAAAAAAAAAA==","updateMask":"Aw=="}},"data":"five","id":5}
{"_meta":{"op":"d","source":{"schema":"dbo","table":"test_PrimaryKeyUpdate_63510878","lsn":"AAAAAAAAAAAAAA==","seqval":"AAAAAAAAAAAAAA==","updateMask":"Aw=="}},"id":1}
{"_meta":{"op":"c","source":{"schema":"dbo","table":"test_PrimaryKeyUpdate_63510878","lsn":"AAAAAAAAAAAAAA==","seqval":"AAAAAAAAAAAAAA==","updateMask":"Aw=="}},"data":"one","id":6}
{"_meta":{"op":"d","source":{"schema":"dbo","table":"test_PrimaryKeyUpdate_63510878","lsn":"AAAAAAAAAAAAAA==","seqval":"AAAAAAAAAAAAAA==","updateMask":"Aw=="}},"id":4}
{"_meta":{"op":"c","source":{"schema":"dbo","table":"test_PrimaryKeyUpdate_63510878","lsn":"AAAAAAAAAAAAAA==","seqval":"AAAAAAAAAAAAAA==","updateMask":"Aw=="}},"data":"four","id":7}
# ================================
# Final State Checkpoint
# ================================
{"bindingStateV1":{"dbo%2Ftest_PrimaryKeyUpdate_63510878":{"backfilled":3,"key_columns":["id"],"mode":"Active"}},"cursor":"AAAAAAAAAAAAAA=="}

28 changes: 28 additions & 0 deletions source-sqlserver/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,5 +564,33 @@ func TestFilegroupAndRole(t *testing.T) {
cs.Capture(ctx, t, nil)
tb.Insert(ctx, t, tableName, [][]any{{3, "three"}, {4, "four"}, {5, "five"}})
cs.Capture(ctx, t, nil)

cupaloy.SnapshotT(t, cs.Summary())
}

func TestPrimaryKeyUpdate(t *testing.T) {
var tb, ctx = sqlserverTestBackend(t), context.Background()
var uniqueID = "63510878"
var tableDef = "(id INTEGER PRIMARY KEY, data TEXT)"
var tableName = tb.CreateTable(ctx, t, uniqueID, tableDef)

var cs = tb.CaptureSpec(ctx, t, regexp.MustCompile(uniqueID))
cs.Validator = &st.OrderedCaptureValidator{}
sqlcapture.TestShutdownAfterCaughtUp = true
t.Cleanup(func() { sqlcapture.TestShutdownAfterCaughtUp = false })

// Initial backfill
tb.Insert(ctx, t, tableName, [][]any{{0, "zero"}, {1, "one"}, {2, "two"}})
cs.Capture(ctx, t, nil)

// Some replication
tb.Insert(ctx, t, tableName, [][]any{{3, "three"}, {4, "four"}, {5, "five"}})
cs.Capture(ctx, t, nil)

// Primary key updates
tb.Update(ctx, t, tableName, "id", 1, "id", 6)
tb.Update(ctx, t, tableName, "id", 4, "id", 7)
cs.Capture(ctx, t, nil)

cupaloy.SnapshotT(t, cs.Summary())
}
Loading