Skip to content

Commit

Permalink
script: Demonstrate custom DLQ behavior
Browse files Browse the repository at this point in the history
This change adds a userscript demo which shows how individual mutations can be
rerouted by a user-defined apply function to a DLQ table.
  • Loading branch information
bobvawter committed Sep 20, 2024
1 parent b7f9e98 commit a608103
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 1 deletion.
64 changes: 63 additions & 1 deletion internal/script/script_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ CREATE TABLE %s.skewed_merge_times(

r.NoError(s.watcher.Refresh(ctx, fixture.TargetPool))
a.Equal(4, s.Sources.Len())
a.Equal(10, s.Targets.Len())
a.Equal(11, s.Targets.Len())
a.Equal(map[string]string{"hello": "world"}, opts.data)

tbl1 := ident.NewTable(schema, ident.New("table1"))
Expand Down Expand Up @@ -494,6 +494,68 @@ CREATE TABLE %s.skewed_merge_times(
))
}
})

t.Run("uniquer_dlq", func(t *testing.T) {
if fixture.TargetPool.Product != types.ProductCockroachDB {
t.Skip("user script has opinionated SQL schema")
}
r := require.New(t)

_, err = fixture.TargetPool.ExecContext(ctx,
fmt.Sprintf("CREATE TABLE %s.uniquer(pk INT PRIMARY KEY, val INT UNIQUE)", schema))
r.NoError(err)
_, err = fixture.TargetPool.ExecContext(ctx,
fmt.Sprintf("CREATE TABLE %s.uniquer_dlq("+
"id UUID PRIMARY KEY DEFAULT gen_random_uuid(), "+
"created_at TIMESTAMPTZ NOT NULL DEFAULT now(), "+
"data JSONB NOT NULL)", schema))
r.NoError(err)

r.NoError(fixture.Watcher.Refresh(ctx, fixture.TargetPool))

table = ident.NewTable(schema, ident.New("uniquer"))
cfg := s.Targets.GetZero(table)
r.NotNil(cfg)
acceptor := cfg.UserAcceptor
r.NotNil(acceptor)

tx, err := fixture.TargetPool.BeginTx(ctx, nil)
r.NoError(err)
defer func() { _ = tx.Rollback() }()

r.NoError(acceptor.AcceptTableBatch(ctx, sinktest.TableBatchOf(
table, hlc.New(100, 1), []types.Mutation{
{
Data: json.RawMessage(`{"pk":1,"val":1}`),
Key: json.RawMessage(`[1]`),
},
},
), &types.AcceptOptions{TargetQuerier: tx}))

r.NoError(acceptor.AcceptTableBatch(ctx, sinktest.TableBatchOf(
table, hlc.New(100, 1), []types.Mutation{
// This mutation has val: 1 again.
{
Data: json.RawMessage(`{"pk":2,"val":1}`),
Key: json.RawMessage(`[2]`),
},
// This mutation should proceed.
{
Data: json.RawMessage(`{"pk":3,"val":2}`),
Key: json.RawMessage(`[3]`),
},
},
), &types.AcceptOptions{TargetQuerier: tx}))

r.NoError(tx.Commit())

var readBack string
r.NoError(fixture.TargetPool.QueryRowContext(ctx,
fmt.Sprintf("SELECT data FROM %s",
ident.NewTable(schema, ident.New("uniquer_dlq")))).Scan(&readBack))
r.Equal(`{"action": "upsert", "before": null, `+
`"data": {"pk": "2", "val": "1"}, "meta": null, "pk": ["2"]}`, readBack)
})
}

// This test ensures that the old name continues to function.
Expand Down
44 changes: 44 additions & 0 deletions internal/script/testdata/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,50 @@ api.configureTable("soft_deletes", {
}
})

// Demonstrate how a custom apply function can be used to reroute
// mutations that could be applied in replication scenarios that
// aren't a direct one-to-one copy of the data.
api.configureTable("uniquer", {
apply: async (ops: ApplyOp[]): Promise<any> => {
let tx = api.getTX();

// Create a savepoint, so we can bring the transaction out of a
// potential error condition.
await tx.exec("SAVEPOINT yolo");
try {
// In the general case, the operations can be applied
// in bulk. If this succeeds, we can just return here.
await tx.apply(ops);
return;
} catch (e) {
console.log(e.toString())
// Unique value violation.
if (!e.toString().includes("SQLSTATE 23505")) {
throw e;
}
}

// Bring the transaction out of the error state.
await tx.exec("ROLLBACK TO SAVEPOINT yolo");

// The whole batch failed, we'll try individual ones.
for (let op of ops) {
await tx.exec("SAVEPOINT yolo");
try {
await tx.apply([op])
continue;
} catch (e) {
if (!e.toString().includes("SQLSTATE 23505")) {
throw e;
}
}
// Found a bad message, write it to the dlq table.
await tx.exec("ROLLBACK TO SAVEPOINT yolo");
await tx.exec(`INSERT INTO ${tx.schema()}.uniquer_dlq(data) VALUES ($1)`, JSON.stringify(op));
}
}
});

// Demonstrate how upsert and delete SQL operations can be entirely
// overridden by the userscript. In this test, we perform some basic
// arithmetic on the keys and values to validate that this script is
Expand Down

0 comments on commit a608103

Please sign in to comment.