diff --git a/internal/script/script_test.go b/internal/script/script_test.go index c94174ebf..8fbc89ed4 100644 --- a/internal/script/script_test.go +++ b/internal/script/script_test.go @@ -112,7 +112,7 @@ CREATE TABLE %s.skewed_merge_times( r.NoError(s.watcher.Refresh(ctx, fixture.TargetPool)) a.Equal(4, s.Sources.Len()) - a.Equal(10, s.Targets.Len()) + a.Equal(11, s.Targets.Len()) a.Equal(map[string]string{"hello": "world"}, opts.data) tbl1 := ident.NewTable(schema, ident.New("table1")) @@ -494,6 +494,68 @@ CREATE TABLE %s.skewed_merge_times( )) } }) + + t.Run("uniquer_dlq", func(t *testing.T) { + if fixture.TargetPool.Product != types.ProductCockroachDB { + t.Skip("user script has opinionated SQL schema") + } + r := require.New(t) + + _, err = fixture.TargetPool.ExecContext(ctx, + fmt.Sprintf("CREATE TABLE %s.uniquer(pk INT PRIMARY KEY, val INT UNIQUE)", schema)) + r.NoError(err) + _, err = fixture.TargetPool.ExecContext(ctx, + fmt.Sprintf("CREATE TABLE %s.uniquer_dlq("+ + "id UUID PRIMARY KEY DEFAULT gen_random_uuid(), "+ + "created_at TIMESTAMPTZ NOT NULL DEFAULT now(), "+ + "data JSONB NOT NULL)", schema)) + r.NoError(err) + + r.NoError(fixture.Watcher.Refresh(ctx, fixture.TargetPool)) + + table = ident.NewTable(schema, ident.New("uniquer")) + cfg := s.Targets.GetZero(table) + r.NotNil(cfg) + acceptor := cfg.UserAcceptor + r.NotNil(acceptor) + + tx, err := fixture.TargetPool.BeginTx(ctx, nil) + r.NoError(err) + defer func() { _ = tx.Rollback() }() + + r.NoError(acceptor.AcceptTableBatch(ctx, sinktest.TableBatchOf( + table, hlc.New(100, 1), []types.Mutation{ + { + Data: json.RawMessage(`{"pk":1,"val":1}`), + Key: json.RawMessage(`[1]`), + }, + }, + ), &types.AcceptOptions{TargetQuerier: tx})) + + r.NoError(acceptor.AcceptTableBatch(ctx, sinktest.TableBatchOf( + table, hlc.New(100, 1), []types.Mutation{ + // This mutation has val: 1 again. + { + Data: json.RawMessage(`{"pk":2,"val":1}`), + Key: json.RawMessage(`[2]`), + }, + // This mutation should proceed. + { + Data: json.RawMessage(`{"pk":3,"val":2}`), + Key: json.RawMessage(`[3]`), + }, + }, + ), &types.AcceptOptions{TargetQuerier: tx})) + + r.NoError(tx.Commit()) + + var readBack string + r.NoError(fixture.TargetPool.QueryRowContext(ctx, + fmt.Sprintf("SELECT data FROM %s", + ident.NewTable(schema, ident.New("uniquer_dlq")))).Scan(&readBack)) + r.Equal(`{"action": "upsert", "before": null, `+ + `"data": {"pk": "2", "val": "1"}, "meta": null, "pk": ["2"]}`, readBack) + }) } // This test ensures that the old name continues to function. diff --git a/internal/script/testdata/main.ts b/internal/script/testdata/main.ts index a0b941d4b..2526a112b 100644 --- a/internal/script/testdata/main.ts +++ b/internal/script/testdata/main.ts @@ -239,6 +239,50 @@ api.configureTable("soft_deletes", { } }) +// Demonstrate how a custom apply function can be used to reroute +// mutations that could be applied in replication scenarios that +// aren't a direct one-to-one copy of the data. +api.configureTable("uniquer", { + apply: async (ops: ApplyOp[]): Promise => { + let tx = api.getTX(); + + // Create a savepoint, so we can bring the transaction out of a + // potential error condition. + await tx.exec("SAVEPOINT yolo"); + try { + // In the general case, the operations can be applied + // in bulk. If this succeeds, we can just return here. + await tx.apply(ops); + return; + } catch (e) { + console.log(e.toString()) + // Unique value violation. + if (!e.toString().includes("SQLSTATE 23505")) { + throw e; + } + } + + // Bring the transaction out of the error state. + await tx.exec("ROLLBACK TO SAVEPOINT yolo"); + + // The whole batch failed, we'll try individual ones. + for (let op of ops) { + await tx.exec("SAVEPOINT yolo"); + try { + await tx.apply([op]) + continue; + } catch (e) { + if (!e.toString().includes("SQLSTATE 23505")) { + throw e; + } + } + // Found a bad message, write it to the dlq table. + await tx.exec("ROLLBACK TO SAVEPOINT yolo"); + await tx.exec(`INSERT INTO ${tx.schema()}.uniquer_dlq(data) VALUES ($1)`, JSON.stringify(op)); + } + } +}); + // Demonstrate how upsert and delete SQL operations can be entirely // overridden by the userscript. In this test, we perform some basic // arithmetic on the keys and values to validate that this script is