diff --git a/internal/source/cdc/server/integration_test.go b/internal/source/cdc/server/integration_test.go index adf1a1a5..1df4e199 100644 --- a/internal/source/cdc/server/integration_test.go +++ b/internal/source/cdc/server/integration_test.go @@ -545,86 +545,6 @@ func (cfs *ChangefeedStatement) String() string { return createStmt } -// CreateChangefeed creates a changefeed on the source CRDB side that is -// compatible with the replicator webhook. It is an intentional decision to make -// the input parameters as specific as possible so that the caller doesn't need -// to construct whole structs with unrelated information to create the -// changefeed. This makes it so this is more portable for future work that needs -// this: for example, integrating the e2e workload checker with this logic. -func CreateChangefeedStatement( - cfg *ChangefeedConfig, - host string, - target ident.Table, - token string, - tables []ident.Table, - sourceVersion string, - queryProjectionColumns string, -) string { - params := make(url.Values) - var feedURL url.URL - var pathIdent ident.Identifier - createStmt := "CREATE CHANGEFEED" - if cfg.queries { - pathIdent = target - } else { - // Creating the comma separated table string - // that the changefeed requires. - tablesStr := "" - for i, table := range tables { - if i > 0 { - tablesStr += ", " - } - tablesStr += table.String() - } - pathIdent = target.Schema() - createStmt += fmt.Sprintf(" FOR TABLE %s", tablesStr) - } - if cfg.webhook { - params.Set("insecure_tls_skip_verify", "true") - feedURL = url.URL{ - Scheme: "webhook-https", - Host: host, - Path: ident.Join(pathIdent, ident.Raw, '/'), - RawQuery: params.Encode(), - } - createStmt += " INTO '" + feedURL.String() + "' " + - " WITH updated," + - " resolved='1s'," + - " webhook_auth_header='Bearer " + token + "'" - } else { - // No webhook_auth_header, so bake it into the query string. - // See comments in cdc.Handler.ServeHTTP checkAccess. - params.Set("access_token", token) - feedURL = url.URL{ - Scheme: "experimental-http", - Host: host, - Path: ident.Join(pathIdent, ident.Raw, '/'), - RawQuery: params.Encode(), - } - createStmt += " INTO '" + feedURL.String() + "' " + - "WITH updated, resolved='1s'" - } - if cfg.diff { - createStmt += ", diff" - } - // Don't wait the entire 30s. This option was introduced in the - // same versions as webhooks. - if ok, err := supportsMinCheckpoint(sourceVersion); err == nil && ok { - createStmt += ", min_checkpoint_frequency='1s'" - } - - if cfg.queries { - // CDC queries only support a single table at a time. - // Safe to just use the first table passed in. - createStmt += ",envelope='wrapped',format='json'" - createStmt += " AS SELECT " + queryProjectionColumns - createStmt += fmt.Sprintf(" FROM %s", tables[0].String()) - } - log.Debugf("changefeed URL is %s", feedURL.String()) - - return createStmt -} - // Note that the `sourceVersion` must contain a space at the end // to satisfy the semver regex. func TestCreateChangefeedStatement(t *testing.T) {