diff --git a/internal/sinktest/changefeed.go b/internal/sinktest/changefeed.go new file mode 100644 index 00000000..b9361bd2 --- /dev/null +++ b/internal/sinktest/changefeed.go @@ -0,0 +1,118 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +package sinktest + +import ( + "fmt" + "net/url" + "strings" + + "github.com/cockroachdb/replicator/internal/util/ident" + "github.com/cockroachdb/replicator/internal/util/stdpool" +) + +// ChangefeedStatement is a struct that represents a changefeed statement and it +// allows the caller to specify various configuration options and parameters +// useful for creating changefeeds on CRDB sources. +type ChangefeedStatement struct { + Diff bool + Host string + KeyInValue bool + QueryProjectionColumns []ident.Ident + SourceVersion string + Tables []ident.Table + Target ident.Table + Token string + Webhook bool +} + +// String returns a string representation of the changefeed statement. +func (cfs *ChangefeedStatement) String() string { + params := make(url.Values) + var feedURL url.URL + var pathIdent ident.Identifier + createStmt := "CREATE CHANGEFEED" + + if len(cfs.QueryProjectionColumns) > 0 { + pathIdent = cfs.Target + } else { + // Creating the comma-separated table string required by the changefeed. + tablesStr := "" + for i, table := range cfs.Tables { + if i > 0 { + tablesStr += ", " + } + tablesStr += table.String() + } + pathIdent = cfs.Target.Schema() + createStmt += fmt.Sprintf(" FOR TABLE %s", tablesStr) + } + + if cfs.Webhook { + params.Set("insecure_tls_skip_verify", "true") + feedURL = url.URL{ + Scheme: "webhook-https", + Host: cfs.Host, + Path: ident.Join(pathIdent, ident.Raw, '/'), + RawQuery: params.Encode(), + } + createStmt += " INTO '" + feedURL.String() + "' " + + " WITH updated," + + " resolved='1s'," + + " webhook_auth_header='Bearer " + cfs.Token + "'" + } else { + params.Set("access_token", cfs.Token) + feedURL = url.URL{ + Scheme: "experimental-http", + Host: cfs.Host, + Path: ident.Join(pathIdent, ident.Raw, '/'), + RawQuery: params.Encode(), + } + createStmt += " INTO '" + feedURL.String() + "' " + + "WITH updated, resolved='1s'" + } + + if cfs.Diff { + createStmt += ", diff" + } + + if cfs.KeyInValue { + createStmt += ", key_in_value" + } + + if ok, err := SupportsMinCheckpointFrequency(cfs.SourceVersion); err == nil && ok { + createStmt += ", min_checkpoint_frequency='1s'" + } + + if len(cfs.QueryProjectionColumns) > 0 { + createStmt += ",envelope='wrapped',format='json'" + var columns []string + for _, col := range cfs.QueryProjectionColumns { + columns = append(columns, col.String()) + } + createStmt += " AS SELECT " + strings.Join(columns, ", ") + createStmt += fmt.Sprintf(" FROM %s", cfs.Tables[0].String()) + } + + return createStmt +} + +// SupportsMinCheckpointFrequency checks if a certain version of CRDB supports the +// min_checkpoint_frequency option. +func SupportsMinCheckpointFrequency(version string) (bool, error) { + return stdpool.CockroachMinVersion(version, "v22.1") +} diff --git a/internal/sinktest/changefeed_test.go b/internal/sinktest/changefeed_test.go new file mode 100644 index 00000000..5f8271f4 --- /dev/null +++ b/internal/sinktest/changefeed_test.go @@ -0,0 +1,121 @@ +// Copyright 2024 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +package sinktest + +import ( + "testing" + + "github.com/cockroachdb/replicator/internal/util/ident" + "github.com/stretchr/testify/require" +) + +// Note that the `sourceVersion` must contain the platform in the semver regex +// (i.e. "CockroachDB CCL v24.2.1 (platform)"). +func TestCreateChangefeedStatement(t *testing.T) { + tests := []struct { + name string + stmt ChangefeedStatement + want string + }{ + { + name: "basic no changefeed configs", + stmt: ChangefeedStatement{ + Host: "localHost:8080", + SourceVersion: "CockroachDB CCL v24.2.1 (platform)", + Target: ident.NewTable(ident.MustSchema(ident.New("target"), + ident.New("public")), ident.New("tbl1")), + Token: "my_token", + Tables: []ident.Table{ident.NewTable(ident.MustSchema(ident.New("source"), + ident.New("public")), ident.New("tbl1"))}, + }, + want: `CREATE CHANGEFEED FOR TABLE "source"."public"."tbl1" INTO ` + + `'experimental-http://localHost:8080/target/public?access_token=my_token' ` + + `WITH updated, resolved='1s', min_checkpoint_frequency='1s'`, + }, + { + name: "basic webhook", + stmt: ChangefeedStatement{ + Host: "localHost:8080", + SourceVersion: "CockroachDB CCL v24.2.1 (platform)", + Target: ident.NewTable(ident.MustSchema(ident.New("target"), ident.New("public")), + ident.New("tbl1")), + Token: "my_token", + Tables: []ident.Table{ + ident.NewTable(ident.MustSchema(ident.New("source"), ident.New("public")), + ident.New("tbl1")), + ident.NewTable(ident.MustSchema(ident.New("source"), ident.New("public")), + ident.New("tbl2")), + }, + Webhook: true, + }, + want: `CREATE CHANGEFEED FOR TABLE "source"."public"."tbl1", "source"."public"."tbl2" ` + + `INTO 'webhook-https://localHost:8080/target/public?insecure_tls_skip_verify=true' ` + + `WITH updated, resolved='1s', webhook_auth_header='Bearer my_token', ` + + `min_checkpoint_frequency='1s'`, + }, + { + name: "webhook and diff and key_in_value enabled", + stmt: ChangefeedStatement{ + Diff: true, + Host: "localHost:8080", + KeyInValue: true, + SourceVersion: "CockroachDB CCL v24.2.1 (platform)", + Target: ident.NewTable(ident.MustSchema(ident.New("target"), ident.New("public")), + ident.New("tbl1")), + Token: "my_token", + Tables: []ident.Table{ + ident.NewTable(ident.MustSchema(ident.New("source"), ident.New("public")), + ident.New("tbl1")), + ident.NewTable(ident.MustSchema(ident.New("source"), ident.New("public")), + ident.New("tbl2")), + }, + Webhook: true, + }, + want: `CREATE CHANGEFEED FOR TABLE "source"."public"."tbl1", "source"."public"."tbl2" ` + + `INTO 'webhook-https://localHost:8080/target/public?insecure_tls_skip_verify=true' ` + + `WITH updated, resolved='1s', webhook_auth_header='Bearer my_token', diff, key_in_value, ` + + `min_checkpoint_frequency='1s'`, + }, + { + name: "basic webhook CDC queries", + stmt: ChangefeedStatement{ + Host: "localHost:8080", + QueryProjectionColumns: []ident.Ident{ident.New("pk"), ident.New("val")}, + SourceVersion: "CockroachDB CCL v24.2.1 (platform)", + Target: ident.NewTable(ident.MustSchema(ident.New("target"), ident.New("public")), + ident.New("tbl1")), + Token: "my_token", + Tables: []ident.Table{ + ident.NewTable(ident.MustSchema(ident.New("source"), ident.New("public")), + ident.New("tbl1")), + }, + Webhook: true, + }, + want: `CREATE CHANGEFEED INTO ` + + `'webhook-https://localHost:8080/target/public/tbl1?insecure_tls_skip_verify=true' ` + + `WITH updated, resolved='1s', webhook_auth_header='Bearer my_token', ` + + `min_checkpoint_frequency='1s',envelope='wrapped',format='json' AS SELECT "pk", "val" ` + + `FROM "source"."public"."tbl1"`, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := tt.stmt.String() + require.Equal(t, tt.want, got) + }) + } +} diff --git a/internal/source/cdc/server/integration_test.go b/internal/source/cdc/server/integration_test.go index 7710e6ae..73b5d491 100644 --- a/internal/source/cdc/server/integration_test.go +++ b/internal/source/cdc/server/integration_test.go @@ -18,6 +18,7 @@ package server import ( "crypto/tls" + "database/sql" "fmt" "io" "net/http" @@ -31,14 +32,18 @@ import ( "github.com/cockroachdb/replicator/internal/sequencer" stagingProd "github.com/cockroachdb/replicator/internal/sinkprod" "github.com/cockroachdb/replicator/internal/sinktest" + "github.com/cockroachdb/replicator/internal/sinktest/all" "github.com/cockroachdb/replicator/internal/sinktest/base" "github.com/cockroachdb/replicator/internal/source/cdc" + "github.com/cockroachdb/replicator/internal/types" jwtAuth "github.com/cockroachdb/replicator/internal/util/auth/jwt" "github.com/cockroachdb/replicator/internal/util/diag" + "github.com/cockroachdb/replicator/internal/util/hlc" "github.com/cockroachdb/replicator/internal/util/ident" "github.com/cockroachdb/replicator/internal/util/stdlogical" "github.com/cockroachdb/replicator/internal/util/stdpool" "github.com/cockroachdb/replicator/internal/util/stdserver" + "github.com/cockroachdb/replicator/internal/util/workload" joonix "github.com/joonix/log" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" @@ -300,7 +305,7 @@ func testIntegration(t *testing.T, cfg testConfig) { // Don't wait the entire 30s. This options was introduced in the // same versions as webhooks. - if ok, err := supportsMinCheckpoint(sourceVersion); a.NoError(err) && ok { + if ok, err := sinktest.SupportsMinCheckpointFrequency(sourceVersion); a.NoError(err) && ok { createStmt += ",min_checkpoint_frequency='1s'" } if cfg.queries { @@ -394,19 +399,283 @@ func testIntegration(t *testing.T, cfg testConfig) { }) } -// Necessary for faster resolved timestamps. -func supportsMinCheckpoint(version string) (bool, error) { - return stdpool.CockroachMinVersion(version, "v22.1") -} - // While queries are supported in v22.2, they were in preview. func supportsQueries(version string) (bool, error) { return stdpool.CockroachMinVersion(version, "v23.1") } +// Union recursive CTEs are only supported in v22.1 and later. +func supportsUnionRecursiveCTEs(version string) (bool, error) { + return stdpool.CockroachMinVersion(version, "v22.1") +} + // In older versions of CRDB, the webhook endpoint is not available so // no self-signed certificate is needed. This acts as a signal whether // the webhook endpoint is available. func supportsWebhook(version string) (bool, error) { return stdpool.CockroachMinVersion(version, "v21.2") } + +func getConfig( + cfg *testConfig, fixture *all.Fixture, targetPool *types.TargetPool, +) (*Config, error) { + fixtureCfg := &Config{ + CDC: cdc.Config{ + ConveyorConfig: conveyor.Config{ + Immediate: cfg.immediate, + // In the case that the "queries" configuration is enabled, the + // testWorkload will create two changefeeds that target the same + // target schema. Because of limitations on the CDC side right + // now, we don't differentiate between resolved timestamps from + // different streams. Therefore, the backwards data check + // should be disabled in the multiple changefeed case since we + // know the backwards data check will fail without disabling it. + // Links for context: + // https://github.com/cockroachdb/cockroach/issues/112880 + // https://github.com/cockroachdb/replicator/issues/574 + SkipBackwardsDataCheck: cfg.queries, + }, + SequencerConfig: sequencer.Config{ + RetireOffset: time.Hour, // Allow post-hoc inspection of staged data. + Parallelism: 1, + }, + }, + HTTP: stdserver.Config{ + BindAddr: "127.0.0.1:0", + GenerateSelfSigned: cfg.webhook, // Webhook implies self-signed TLS is ok. + }, + Staging: stagingProd.StagingConfig{ + CommonConfig: stagingProd.CommonConfig{ + Conn: fixture.StagingPool.ConnectionString, + MaxPoolSize: 16, + }, + Schema: fixture.StagingDB.Schema(), + }, + Target: stagingProd.TargetConfig{ + CommonConfig: stagingProd.CommonConfig{ + Conn: targetPool.ConnectionString, + MaxPoolSize: 16, + }, + }, + } + + // Preflight sets default values that are not set in the testConfig. + return fixtureCfg, fixtureCfg.Preflight() +} + +func TestWorkload(t *testing.T) { + tests := []struct { + name string + cfg *testConfig + }{ + {"webhook true diff true queries false", + &testConfig{webhook: true, diff: true, queries: false}}, + {"webhook true diff false queries false", + &testConfig{webhook: true, diff: false, queries: false}}, + // In order to emit all deletes in queries mode, the + // diff/queries/keyInValue must all be set together. + {"webhook true diff true queries true key in value true", + &testConfig{webhook: true, diff: true, queries: true, keyInValue: true}}, + {"webhook false diff true queries false", + &testConfig{webhook: false, diff: true, queries: false}}, + {"webhook false diff false queries false", + &testConfig{webhook: false, diff: false, queries: false}}, + // Same note as above for the additional options that must be set + // alongside queries == true. + {"webhook false diff true queries true key in value true", + &testConfig{webhook: false, diff: true, queries: true, keyInValue: true}}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testWorkload(t, tt.cfg) + }) + } +} + +func testWorkload(t *testing.T, cfg *testConfig) { + t.Parallel() + const maxIterations = 25 + + log.SetLevel(log.DebugLevel) + r := require.New(t) + + // Create the target fixture, which will be used + // to determine if all the data was written to the target. + targetFixture, err := all.NewFixture(t) + r.NoError(err) + sourceVersion := targetFixture.SourcePool.Version + + // Union for recursive CTEs is required in order to use the schema + // inspection code required to set up the fixture. + supportsUnionRecursiveCTEs, err := supportsUnionRecursiveCTEs(sourceVersion) + r.NoError(err) + if !supportsUnionRecursiveCTEs { + t.Skipf("Union in recursive CTEs is not compatible with %s version of cockroach.", + sourceVersion) + } + + // The original source is from the target fixture. + supportsWebhook, err := supportsWebhook(sourceVersion) + r.NoError(err) + if cfg.webhook && !supportsWebhook { + t.Skipf("Webhook is not compatible with %s version of cockroach.", sourceVersion) + } + supportsQueries, err := supportsQueries(sourceVersion) + r.NoError(err) + if cfg.queries && !supportsQueries { + t.Skipf("CDC queries are not compatible with %s version of cockroach", sourceVersion) + } + + ctx := targetFixture.Context + targetChecker, _, err := targetFixture.NewWorkload(ctx, &all.WorkloadConfig{ + DisableFK: cfg.queries, + }) + r.NoError(err) + + sourceSchema := targetFixture.SourceSchema.Schema() + targetSchema := targetFixture.TargetSchema.Schema() + sourceGeneratorWorkload := workload.NewGeneratorBase( + ident.NewTable(sourceSchema, targetChecker.Parent.Name().Table()), + ident.NewTable(sourceSchema, targetChecker.Child.Name().Table()), + ) + + // Creates the tables on the source side, so that the same table + // names exist in both the source and the target, a requirement for + // replication here. + sourcePool := targetFixture.SourcePool + parent := sourceGeneratorWorkload.Parent + child := sourceGeneratorWorkload.Child + parentSQL, childSQL := all.WorkloadSchema( + &all.WorkloadConfig{}, sourcePool.Product, + parent, child) + _, err = sourcePool.ExecContext(ctx, parentSQL) + r.NoError(err) + _, err = sourcePool.ExecContext(ctx, childSQL) + r.NoError(err) + + // In order to ensure that the source fixture has knowledge of the new + // tables created on the source side, the source + // fixture must be created after those tables are created. + // Alternatively, if the source fixture must be created earlier, then + // after new tables are added, the source fixture must be refreshed. + // The source fixture can be refreshed by doing the following: + // sourceFixture.Watcher.Refresh(ctx, targetPool) + sourceFixture, err := all.NewFixtureFromBase(targetFixture.Swapped()) + r.NoError(err) + + // Get test configurations for the server. + serverCfg, err := getConfig(cfg, sourceFixture, targetFixture.TargetPool) + r.NoError(err) + + // Create the test server fixture. + connCtx := targetFixture.Context + testFixture, cancel, err := newTestFixture(connCtx, serverCfg) + defer cancel() + r.NoError(err) + stdlogical.AddHandlers(testFixture.Authenticator, testFixture.Server.GetServeMux(), testFixture.Diagnostics) + + // Insert a testing JWT key so we can properly talk to the webhook + // in an authenticated manner. + method, priv, err := jwtAuth.InsertTestingKey(ctx, targetFixture.StagingPool, + testFixture.Authenticator, targetFixture.StagingDB) + r.NoError(err) + + targetDB := targetSchema.Schema() + targetParent := ident.NewTable(targetDB, targetChecker.Parent.Name().Table()) + targetChild := ident.NewTable(targetDB, targetChecker.Child.Name().Table()) + _, token, err := jwtAuth.Sign(method, priv, []ident.Schema{targetParent.Schema(), diag.Schema}) + r.NoError(err) + + // Create the changefeed on the source CRDB. + tables := []ident.Table{sourceGeneratorWorkload.Parent, sourceGeneratorWorkload.Child} + host := testFixture.Listener.Addr().String() + + if cfg.queries { + createParentStmt := sinktest.ChangefeedStatement{ + Diff: cfg.diff, + Host: host, + QueryProjectionColumns: []ident.Ident{ident.New("parent"), ident.New("val")}, + SourceVersion: sourceVersion, + Tables: tables, + Target: targetParent, + Token: token, + Webhook: cfg.webhook, + } + createParentStmtStr := createParentStmt.String() + + createChildStmt := sinktest.ChangefeedStatement{ + Diff: cfg.diff, + Host: host, + QueryProjectionColumns: []ident.Ident{ident.New("parent"), ident.New("child"), ident.New("val")}, + SourceVersion: sourceVersion, + Tables: []ident.Table{tables[1]}, + Target: targetChild, + Token: token, + Webhook: cfg.webhook, + } + createChildStmtStr := createChildStmt.String() + + log.Debugf("create parent changefeed statement is %s", createParentStmtStr) + _, err = sourcePool.ExecContext(ctx, createParentStmtStr) + r.NoError(err) + log.Debugf("create child changefeed statement is %s", createChildStmtStr) + _, err = sourcePool.ExecContext(ctx, createChildStmtStr) + r.NoError(err) + } else { + createStmt := sinktest.ChangefeedStatement{ + Diff: cfg.diff, + Host: host, + SourceVersion: sourceVersion, + Tables: tables, + Target: targetParent, + Token: token, + Webhook: cfg.webhook, + } + createStmtStr := createStmt.String() + log.Debugf("create changefeed statement is %s", createStmtStr) + _, err = sourcePool.ExecContext(ctx, createStmtStr) + r.NoError(err) + } + + acc := types.OrderedAcceptorFrom(sourceFixture.ApplyAcceptor, sourceFixture.Watchers) + + for i := range maxIterations { + batch := &types.MultiBatch{} + sourceGeneratorWorkload.GenerateInto(batch, hlc.New(int64(i), i+1)) + + // Insert data on the source since it will flow from changefeeds + // to the staging DB and then to the target. + tx, err := sourceFixture.TargetPool.BeginTx(ctx, &sql.TxOptions{}) + r.NoError(err) + r.NoError(acc.AcceptMultiBatch(ctx, batch, &types.AcceptOptions{TargetQuerier: tx})) + r.NoError(tx.Commit()) + } + + // Merge the generator values into the target checker. + // This makes it so that the target checker has all the expected + // data from the source generator workload. + targetChecker.CopyFrom(sourceGeneratorWorkload) + + // Adapted this polling logic from the above test. + // This is a simpler way to determine if the rows + // were backfilled on the target. + for { + parentCt, err := base.GetRowCount(ctx, targetFixture.TargetPool, targetParent) + r.NoError(err) + childCt, err := base.GetRowCount(ctx, targetFixture.TargetPool, targetChild) + r.NoError(err) + if parentCt >= len(targetChecker.ParentRows()) && childCt >= len(targetChecker.ChildRows()) { + break + } + log.Debug("waiting for target rows to be written") + time.Sleep(time.Second) + } + + r.True(targetChecker.CheckConsistent(ctx, t)) + + // We need to wait for the connection to shut down + // so that there is no dangling state from the test. + connCtx.Stop(time.Minute) + <-connCtx.Done() +} diff --git a/internal/source/cdc/server/test_fixture.go b/internal/source/cdc/server/test_fixture.go index 0007df82..1e168eb4 100644 --- a/internal/source/cdc/server/test_fixture.go +++ b/internal/source/cdc/server/test_fixture.go @@ -24,6 +24,7 @@ import ( "net" "github.com/cockroachdb/field-eng-powertools/stopper" + "github.com/cockroachdb/replicator/internal/source/cdc" "github.com/cockroachdb/replicator/internal/types" "github.com/cockroachdb/replicator/internal/util/diag" "github.com/cockroachdb/replicator/internal/util/ident" @@ -35,6 +36,7 @@ type testFixture struct { Authenticator types.Authenticator Config *Config Diagnostics *diag.Diagnostics + Handler *cdc.Handler Listener net.Listener Memo types.Memo StagingPool *types.StagingPool diff --git a/internal/source/cdc/server/wire_gen.go b/internal/source/cdc/server/wire_gen.go index 9fbaddb6..6a0ae16b 100644 --- a/internal/source/cdc/server/wire_gen.go +++ b/internal/source/cdc/server/wire_gen.go @@ -176,15 +176,11 @@ func newTestFixture(context *stopper.Context, config *Config) (*testFixture, fun if err != nil { return nil, nil, err } - listener, err := ProvideListener(context, config, diagnostics) - if err != nil { - return nil, nil, err - } + cdcConfig := &config.CDC memoMemo, err := memo.ProvideMemo(context, stagingPool, stagingSchema) if err != nil { return nil, nil, err } - cdcConfig := &config.CDC checker := version.ProvideChecker(stagingPool, memoMemo) targetPool, err := sinkprod.ProvideTargetPool(context, checker, targetConfig, diagnostics) if err != nil { @@ -253,6 +249,10 @@ func newTestFixture(context *stopper.Context, config *Config) (*testFixture, fun if err != nil { return nil, nil, err } + listener, err := ProvideListener(context, config, diagnostics) + if err != nil { + return nil, nil, err + } serveMux := ProvideMux(config, handler, stagingPool, targetPool) tlsConfig, err := ProvideTLSConfig(config) if err != nil { @@ -263,6 +263,7 @@ func newTestFixture(context *stopper.Context, config *Config) (*testFixture, fun Authenticator: authenticator, Config: config, Diagnostics: diagnostics, + Handler: handler, Listener: listener, Memo: memoMemo, StagingPool: stagingPool, @@ -287,6 +288,7 @@ type testFixture struct { Authenticator types.Authenticator Config *Config Diagnostics *diag.Diagnostics + Handler *cdc.Handler Listener net.Listener Memo types.Memo StagingPool *types.StagingPool