From efff5f832c6c0bf0a98cc9d22ed1a17c328be6d2 Mon Sep 17 00:00:00 2001 From: Will Donnelly Date: Thu, 5 Dec 2024 14:09:27 -0600 Subject: [PATCH 1/4] source-postgres: Add TestPrimaryKeyUpdate New test demonstrating current PK update behavior. --- .../.snapshots/TestPrimaryKeyUpdate | 16 +++++++++++ source-postgres/capture_test.go | 27 +++++++++++++++++++ 2 files changed, 43 insertions(+) create mode 100644 source-postgres/.snapshots/TestPrimaryKeyUpdate diff --git a/source-postgres/.snapshots/TestPrimaryKeyUpdate b/source-postgres/.snapshots/TestPrimaryKeyUpdate new file mode 100644 index 0000000000..c2001b6f82 --- /dev/null +++ b/source-postgres/.snapshots/TestPrimaryKeyUpdate @@ -0,0 +1,16 @@ +# ================================ +# Collection "acmeCo/test/test_primarykeyupdate_63510878": 8 Documents +# ================================ +{"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"primarykeyupdate_63510878","loc":[11111111,11111111,11111111]}},"data":"zero","id":0} +{"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"primarykeyupdate_63510878","loc":[11111111,11111111,11111111]}},"data":"one","id":1} +{"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"primarykeyupdate_63510878","loc":[11111111,11111111,11111111]}},"data":"two","id":2} +{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"primarykeyupdate_63510878","loc":[11111111,11111111,11111111],"txid":111111}},"data":"three","id":3} +{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"primarykeyupdate_63510878","loc":[11111111,11111111,11111111],"txid":111111}},"data":"four","id":4} +{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"primarykeyupdate_63510878","loc":[11111111,11111111,11111111],"txid":111111}},"data":"five","id":5} +{"_meta":{"op":"u","source":{"ts_ms":1111111111111,"schema":"test","table":"primarykeyupdate_63510878","loc":[11111111,11111111,11111111],"txid":111111},"before":{"id":1}},"data":"one","id":6} +{"_meta":{"op":"u","source":{"ts_ms":1111111111111,"schema":"test","table":"primarykeyupdate_63510878","loc":[11111111,11111111,11111111],"txid":111111},"before":{"id":4}},"data":"four","id":7} +# ================================ +# Final State Checkpoint +# ================================ +{"bindingStateV1":{"test%2Fprimarykeyupdate_63510878":{"backfilled":3,"key_columns":["id"],"mode":"Active"}},"cursor":"0/1111111"} + diff --git a/source-postgres/capture_test.go b/source-postgres/capture_test.go index d800166dec..8959e66210 100644 --- a/source-postgres/capture_test.go +++ b/source-postgres/capture_test.go @@ -614,3 +614,30 @@ func TestCIText(t *testing.T) { cupaloy.SnapshotT(t, cs.Summary()) }) } + +func TestPrimaryKeyUpdate(t *testing.T) { + var tb, ctx = postgresTestBackend(t), context.Background() + var uniqueID = "63510878" + var tableDef = "(id INTEGER PRIMARY KEY, data TEXT)" + var tableName = tb.CreateTable(ctx, t, uniqueID, tableDef) + + var cs = tb.CaptureSpec(ctx, t, regexp.MustCompile(uniqueID)) + cs.Validator = &st.OrderedCaptureValidator{} + sqlcapture.TestShutdownAfterCaughtUp = true + t.Cleanup(func() { sqlcapture.TestShutdownAfterCaughtUp = false }) + + // Initial backfill + tb.Insert(ctx, t, tableName, [][]any{{0, "zero"}, {1, "one"}, {2, "two"}}) + cs.Capture(ctx, t, nil) + + // Some replication + tb.Insert(ctx, t, tableName, [][]any{{3, "three"}, {4, "four"}, {5, "five"}}) + cs.Capture(ctx, t, nil) + + // Primary key updates + tb.Update(ctx, t, tableName, "id", 1, "id", 6) + tb.Update(ctx, t, tableName, "id", 4, "id", 7) + cs.Capture(ctx, t, nil) + + cupaloy.SnapshotT(t, cs.Summary()) +} From 3d4257cea89fa80a134545d4104d8f4c36a5ea8d Mon Sep 17 00:00:00 2001 From: Will Donnelly Date: Thu, 5 Dec 2024 14:12:52 -0600 Subject: [PATCH 2/4] source-mysql: Add TestPrimaryKeyUpdate New test demonstrating current PK update behavior. --- source-mysql/.snapshots/TestPrimaryKeyUpdate | 16 ++++++++++++ source-mysql/capture_test.go | 27 ++++++++++++++++++++ 2 files changed, 43 insertions(+) create mode 100644 source-mysql/.snapshots/TestPrimaryKeyUpdate diff --git a/source-mysql/.snapshots/TestPrimaryKeyUpdate b/source-mysql/.snapshots/TestPrimaryKeyUpdate new file mode 100644 index 0000000000..07b557a8af --- /dev/null +++ b/source-mysql/.snapshots/TestPrimaryKeyUpdate @@ -0,0 +1,16 @@ +# ================================ +# Collection "acmeCo/test/test_primarykeyupdate_63510878": 8 Documents +# ================================ +{"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"PrimaryKeyUpdate_63510878","cursor":"backfill:0"}},"data":"zero","id":0} +{"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"PrimaryKeyUpdate_63510878","cursor":"backfill:1"}},"data":"one","id":1} +{"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"PrimaryKeyUpdate_63510878","cursor":"backfill:2"}},"data":"two","id":2} +{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"three","id":3} +{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"four","id":4} +{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"five","id":5} +{"_meta":{"op":"u","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"},"before":{"data":"one","id":1}},"data":"one","id":6} +{"_meta":{"op":"u","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"},"before":{"data":"four","id":4}},"data":"four","id":7} +# ================================ +# Final State Checkpoint +# ================================ +{"bindingStateV1":{"test%2FPrimaryKeyUpdate_63510878":{"backfilled":3,"key_columns":["id"],"metadata":{"charset":"utf8mb4","schema":{"columns":["id","data"],"types":{"data":{"charset":"utf8mb4","type":"text"},"id":{"type":"int"}}}},"mode":"Active"}},"cursor":"binlog.000123:56789"} + diff --git a/source-mysql/capture_test.go b/source-mysql/capture_test.go index beea1ce4ee..95a00d989d 100644 --- a/source-mysql/capture_test.go +++ b/source-mysql/capture_test.go @@ -509,3 +509,30 @@ func TestDroppedAndRecreatedTable(t *testing.T) { cupaloy.SnapshotT(t, cs.Summary()) } + +func TestPrimaryKeyUpdate(t *testing.T) { + var tb, ctx = mysqlTestBackend(t), context.Background() + var uniqueID = "63510878" + var tableDef = "(id INTEGER PRIMARY KEY, data TEXT)" + var tableName = tb.CreateTable(ctx, t, uniqueID, tableDef) + + var cs = tb.CaptureSpec(ctx, t, regexp.MustCompile(uniqueID)) + cs.Validator = &st.OrderedCaptureValidator{} + sqlcapture.TestShutdownAfterCaughtUp = true + t.Cleanup(func() { sqlcapture.TestShutdownAfterCaughtUp = false }) + + // Initial backfill + tb.Insert(ctx, t, tableName, [][]any{{0, "zero"}, {1, "one"}, {2, "two"}}) + cs.Capture(ctx, t, nil) + + // Some replication + tb.Insert(ctx, t, tableName, [][]any{{3, "three"}, {4, "four"}, {5, "five"}}) + cs.Capture(ctx, t, nil) + + // Primary key updates + tb.Update(ctx, t, tableName, "id", 1, "id", 6) + tb.Update(ctx, t, tableName, "id", 4, "id", 7) + cs.Capture(ctx, t, nil) + + cupaloy.SnapshotT(t, cs.Summary()) +} From 294eeebf23b1e58e312f99000e5a3a56cf675252 Mon Sep 17 00:00:00 2001 From: Will Donnelly Date: Thu, 5 Dec 2024 14:14:39 -0600 Subject: [PATCH 3/4] source-sqlserver: Add TestPrimaryKeyUpdate New test demonstrating current PK update behavior. Unlike Postgres and MySQL, the SQL Server behavior is already the more-correct one where it issues a separate delete and create event. --- .../.snapshots/TestPrimaryKeyUpdate | 18 ++++++++++++ source-sqlserver/main_test.go | 28 +++++++++++++++++++ 2 files changed, 46 insertions(+) create mode 100644 source-sqlserver/.snapshots/TestPrimaryKeyUpdate diff --git a/source-sqlserver/.snapshots/TestPrimaryKeyUpdate b/source-sqlserver/.snapshots/TestPrimaryKeyUpdate new file mode 100644 index 0000000000..9421b153ba --- /dev/null +++ b/source-sqlserver/.snapshots/TestPrimaryKeyUpdate @@ -0,0 +1,18 @@ +# ================================ +# Collection "acmeCo/test/test_primarykeyupdate_63510878": 10 Documents +# ================================ +{"_meta":{"op":"c","source":{"schema":"dbo","snapshot":true,"table":"test_PrimaryKeyUpdate_63510878","lsn":"","seqval":"AAAAAAAAAAAAAA=="}},"data":"zero","id":0} +{"_meta":{"op":"c","source":{"schema":"dbo","snapshot":true,"table":"test_PrimaryKeyUpdate_63510878","lsn":"","seqval":"AAAAAAAAAAAAAA=="}},"data":"one","id":1} +{"_meta":{"op":"c","source":{"schema":"dbo","snapshot":true,"table":"test_PrimaryKeyUpdate_63510878","lsn":"","seqval":"AAAAAAAAAAAAAA=="}},"data":"two","id":2} +{"_meta":{"op":"c","source":{"schema":"dbo","table":"test_PrimaryKeyUpdate_63510878","lsn":"AAAAAAAAAAAAAA==","seqval":"AAAAAAAAAAAAAA==","updateMask":"Aw=="}},"data":"three","id":3} +{"_meta":{"op":"c","source":{"schema":"dbo","table":"test_PrimaryKeyUpdate_63510878","lsn":"AAAAAAAAAAAAAA==","seqval":"AAAAAAAAAAAAAA==","updateMask":"Aw=="}},"data":"four","id":4} +{"_meta":{"op":"c","source":{"schema":"dbo","table":"test_PrimaryKeyUpdate_63510878","lsn":"AAAAAAAAAAAAAA==","seqval":"AAAAAAAAAAAAAA==","updateMask":"Aw=="}},"data":"five","id":5} +{"_meta":{"op":"d","source":{"schema":"dbo","table":"test_PrimaryKeyUpdate_63510878","lsn":"AAAAAAAAAAAAAA==","seqval":"AAAAAAAAAAAAAA==","updateMask":"Aw=="}},"id":1} +{"_meta":{"op":"c","source":{"schema":"dbo","table":"test_PrimaryKeyUpdate_63510878","lsn":"AAAAAAAAAAAAAA==","seqval":"AAAAAAAAAAAAAA==","updateMask":"Aw=="}},"data":"one","id":6} +{"_meta":{"op":"d","source":{"schema":"dbo","table":"test_PrimaryKeyUpdate_63510878","lsn":"AAAAAAAAAAAAAA==","seqval":"AAAAAAAAAAAAAA==","updateMask":"Aw=="}},"id":4} +{"_meta":{"op":"c","source":{"schema":"dbo","table":"test_PrimaryKeyUpdate_63510878","lsn":"AAAAAAAAAAAAAA==","seqval":"AAAAAAAAAAAAAA==","updateMask":"Aw=="}},"data":"four","id":7} +# ================================ +# Final State Checkpoint +# ================================ +{"bindingStateV1":{"dbo%2Ftest_PrimaryKeyUpdate_63510878":{"backfilled":3,"key_columns":["id"],"mode":"Active"}},"cursor":"AAAAAAAAAAAAAA=="} + diff --git a/source-sqlserver/main_test.go b/source-sqlserver/main_test.go index 88239a7152..9de0465fc9 100644 --- a/source-sqlserver/main_test.go +++ b/source-sqlserver/main_test.go @@ -564,5 +564,33 @@ func TestFilegroupAndRole(t *testing.T) { cs.Capture(ctx, t, nil) tb.Insert(ctx, t, tableName, [][]any{{3, "three"}, {4, "four"}, {5, "five"}}) cs.Capture(ctx, t, nil) + + cupaloy.SnapshotT(t, cs.Summary()) +} + +func TestPrimaryKeyUpdate(t *testing.T) { + var tb, ctx = sqlserverTestBackend(t), context.Background() + var uniqueID = "63510878" + var tableDef = "(id INTEGER PRIMARY KEY, data TEXT)" + var tableName = tb.CreateTable(ctx, t, uniqueID, tableDef) + + var cs = tb.CaptureSpec(ctx, t, regexp.MustCompile(uniqueID)) + cs.Validator = &st.OrderedCaptureValidator{} + sqlcapture.TestShutdownAfterCaughtUp = true + t.Cleanup(func() { sqlcapture.TestShutdownAfterCaughtUp = false }) + + // Initial backfill + tb.Insert(ctx, t, tableName, [][]any{{0, "zero"}, {1, "one"}, {2, "two"}}) + cs.Capture(ctx, t, nil) + + // Some replication + tb.Insert(ctx, t, tableName, [][]any{{3, "three"}, {4, "four"}, {5, "five"}}) + cs.Capture(ctx, t, nil) + + // Primary key updates + tb.Update(ctx, t, tableName, "id", 1, "id", 6) + tb.Update(ctx, t, tableName, "id", 4, "id", 7) + cs.Capture(ctx, t, nil) + cupaloy.SnapshotT(t, cs.Summary()) } From 291023bc2828cbf99ea277ef5e4d430712661785 Mon Sep 17 00:00:00 2001 From: Will Donnelly Date: Thu, 5 Dec 2024 14:37:58 -0600 Subject: [PATCH 4/4] source-mysql: Implicit delete on PK updates This is fairly straightforward: if the primary key is changed by a CDC update event, then instead of an update document we should capture a synthetic deletion of the old row-state and an insert of the new row-state. Interestingly, since we base the implicit-delete decision on the serialized row key, which is based on the configured key columns of a binding, which typically come from the collection spec, this should work correctly even if the output collection key differs from the source DB primary key. But I haven't tested that, I was just thinking of the obvious weird cases and I think that's accurate. --- source-mysql/.snapshots/TestPrimaryKeyUpdate | 8 ++- source-mysql/main_test.go | 4 +- source-mysql/replication.go | 71 +++++++++++++++----- 3 files changed, 63 insertions(+), 20 deletions(-) diff --git a/source-mysql/.snapshots/TestPrimaryKeyUpdate b/source-mysql/.snapshots/TestPrimaryKeyUpdate index 07b557a8af..353d857565 100644 --- a/source-mysql/.snapshots/TestPrimaryKeyUpdate +++ b/source-mysql/.snapshots/TestPrimaryKeyUpdate @@ -1,5 +1,5 @@ # ================================ -# Collection "acmeCo/test/test_primarykeyupdate_63510878": 8 Documents +# Collection "acmeCo/test/test_primarykeyupdate_63510878": 10 Documents # ================================ {"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"PrimaryKeyUpdate_63510878","cursor":"backfill:0"}},"data":"zero","id":0} {"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"PrimaryKeyUpdate_63510878","cursor":"backfill:1"}},"data":"one","id":1} @@ -7,8 +7,10 @@ {"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"three","id":3} {"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"four","id":4} {"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"five","id":5} -{"_meta":{"op":"u","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"},"before":{"data":"one","id":1}},"data":"one","id":6} -{"_meta":{"op":"u","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"},"before":{"data":"four","id":4}},"data":"four","id":7} +{"_meta":{"op":"d","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"one","id":1} +{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"one","id":6} +{"_meta":{"op":"d","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"four","id":4} +{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"PrimaryKeyUpdate_63510878","cursor":"binlog.000123:56789:123","txid":"11111111-1111-1111-1111-111111111111:111"}},"data":"four","id":7} # ================================ # Final State Checkpoint # ================================ diff --git a/source-mysql/main_test.go b/source-mysql/main_test.go index 8bcfb4157a..c0af27db82 100644 --- a/source-mysql/main_test.go +++ b/source-mysql/main_test.go @@ -115,8 +115,8 @@ func (tb *testBackend) lowerTuningParameters(t testing.TB) { func (tb *testBackend) CaptureSpec(ctx context.Context, t testing.TB, streamMatchers ...*regexp.Regexp) *st.CaptureSpec { var sanitizers = make(map[string]*regexp.Regexp) sanitizers[`""`] = regexp.MustCompile(`"[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.[0-9]+)?(Z|-[0-9]+:[0-9]+)"`) - sanitizers[`"cursor":"binlog.000123:56789:123"`] = regexp.MustCompile(`"cursor":".+\.[0-9]+:[0-9]+:[0-9]+"`) - sanitizers[`"cursor":"binlog.000123:56789"`] = regexp.MustCompile(`"cursor":".+\.[0-9]+:[0-9]+"`) + sanitizers[`"cursor":"binlog.000123:56789:123"`] = regexp.MustCompile(`"cursor":"[^"]+\.[0-9]+:[0-9]+:[0-9]+"`) + sanitizers[`"cursor":"binlog.000123:56789"`] = regexp.MustCompile(`"cursor":"[^"]+\.[0-9]+:[0-9]+"`) sanitizers[`"ts_ms":1111111111111`] = regexp.MustCompile(`"ts_ms":[0-9]+`) sanitizers[`"txid":"11111111-1111-1111-1111-111111111111:111"`] = regexp.MustCompile(`"txid":"[0-9a-f-]+:[0-9]+"`) diff --git a/source-mysql/replication.go b/source-mysql/replication.go index 824608d7cb..244cc0259f 100644 --- a/source-mysql/replication.go +++ b/source-mysql/replication.go @@ -375,9 +375,13 @@ func (rs *mysqlReplicationStream) run(ctx context.Context, startCursor mysql.Pos return fmt.Errorf("error decoding row values: %w", err) } after = mergePreimage(after, before) - rowKey, err := sqlcapture.EncodeRowKey(keyColumns, after, columnTypes, encodeKeyFDB) + rowKeyBefore, err := sqlcapture.EncodeRowKey(keyColumns, before, columnTypes, encodeKeyFDB) if err != nil { - return fmt.Errorf("error encoding row key for %q: %w", streamID, err) + return fmt.Errorf("error encoding 'before' row key for %q: %w", streamID, err) + } + rowKeyAfter, err := sqlcapture.EncodeRowKey(keyColumns, after, columnTypes, encodeKeyFDB) + if err != nil { + return fmt.Errorf("error encoding 'after' row key for %q: %w", streamID, err) } if err := rs.db.translateRecordFields(false, columnTypes, before); err != nil { return fmt.Errorf("error translating 'before' of %q UpdateOp: %w", streamID, err) @@ -385,21 +389,58 @@ func (rs *mysqlReplicationStream) run(ctx context.Context, startCursor mysql.Pos if err := rs.db.translateRecordFields(false, columnTypes, after); err != nil { return fmt.Errorf("error translating 'after' of %q UpdateOp: %w", streamID, err) } - var sourceInfo = &mysqlSourceInfo{ - SourceCommon: sourceCommon, - EventCursor: fmt.Sprintf("%s:%d:%d", cursor.Name, binlogEstimatedOffset, rowIdx/2), - } + + var events []sqlcapture.DatabaseEvent + var eventTxID string if rs.db.includeTxIDs[streamID] { - sourceInfo.TxID = rs.gtidString + eventTxID = rs.gtidString + } + if !bytes.Equal(rowKeyBefore, rowKeyAfter) { + // When the row key is changed by an update, translate it into a synthetic pair: a delete + // event of the old row-state, plus an insert event of the new row-state. + events = append(events, &sqlcapture.ChangeEvent{ + Operation: sqlcapture.DeleteOp, + RowKey: rowKeyBefore, + Before: before, + Source: &mysqlSourceInfo{ + SourceCommon: sourceCommon, + // Since updates consist of paired row-states (before, then after) which we iterate + // over two-at-a-time, it is consistent to have the row-index portion of the event + // cursor be the before-state index for this deletion and the after-state index for + // the insert. + EventCursor: fmt.Sprintf("%s:%d:%d", cursor.Name, binlogEstimatedOffset, rowIdx-1), + TxID: eventTxID, + }, + }, &sqlcapture.ChangeEvent{ + Operation: sqlcapture.InsertOp, + RowKey: rowKeyAfter, + After: after, + Source: &mysqlSourceInfo{ + SourceCommon: sourceCommon, + EventCursor: fmt.Sprintf("%s:%d:%d", cursor.Name, binlogEstimatedOffset, rowIdx), + TxID: eventTxID, + }, + }) + } else { + events = append(events, &sqlcapture.ChangeEvent{ + Operation: sqlcapture.UpdateOp, + RowKey: rowKeyAfter, + Before: before, + After: after, + Source: &mysqlSourceInfo{ + SourceCommon: sourceCommon, + // For updates the row-index part of the event cursor has to increment by two here + // so that there's room for synthetic delete/insert pairs. Since this value really + // just needs to be unique and properly ordered this is fine. + EventCursor: fmt.Sprintf("%s:%d:%d", cursor.Name, binlogEstimatedOffset, rowIdx), + TxID: eventTxID, + }, + }) } - if err := rs.emitEvent(ctx, &sqlcapture.ChangeEvent{ - Operation: sqlcapture.UpdateOp, - RowKey: rowKey, - Before: before, - After: after, - Source: sourceInfo, - }); err != nil { - return err + for _, event := range events { + if err := rs.emitEvent(ctx, event); err != nil { + return err + } } rs.uncommittedChanges++ // Keep a count of all uncommitted changes if nonTransactionalTable {