Skip to content

Commit

Permalink
removed cruft
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanluu12345 committed Oct 28, 2024
1 parent da63de1 commit c18b76a
Showing 1 changed file with 0 additions and 80 deletions.
80 changes: 0 additions & 80 deletions internal/source/cdc/server/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,86 +545,6 @@ func (cfs *ChangefeedStatement) String() string {
return createStmt
}

// CreateChangefeed creates a changefeed on the source CRDB side that is
// compatible with the replicator webhook. It is an intentional decision to make
// the input parameters as specific as possible so that the caller doesn't need
// to construct whole structs with unrelated information to create the
// changefeed. This makes it so this is more portable for future work that needs
// this: for example, integrating the e2e workload checker with this logic.
func CreateChangefeedStatement(
cfg *ChangefeedConfig,
host string,
target ident.Table,
token string,
tables []ident.Table,
sourceVersion string,
queryProjectionColumns string,
) string {
params := make(url.Values)
var feedURL url.URL
var pathIdent ident.Identifier
createStmt := "CREATE CHANGEFEED"
if cfg.queries {
pathIdent = target
} else {
// Creating the comma separated table string
// that the changefeed requires.
tablesStr := ""
for i, table := range tables {
if i > 0 {
tablesStr += ", "
}
tablesStr += table.String()
}
pathIdent = target.Schema()
createStmt += fmt.Sprintf(" FOR TABLE %s", tablesStr)
}
if cfg.webhook {
params.Set("insecure_tls_skip_verify", "true")
feedURL = url.URL{
Scheme: "webhook-https",
Host: host,
Path: ident.Join(pathIdent, ident.Raw, '/'),
RawQuery: params.Encode(),
}
createStmt += " INTO '" + feedURL.String() + "' " +
" WITH updated," +
" resolved='1s'," +
" webhook_auth_header='Bearer " + token + "'"
} else {
// No webhook_auth_header, so bake it into the query string.
// See comments in cdc.Handler.ServeHTTP checkAccess.
params.Set("access_token", token)
feedURL = url.URL{
Scheme: "experimental-http",
Host: host,
Path: ident.Join(pathIdent, ident.Raw, '/'),
RawQuery: params.Encode(),
}
createStmt += " INTO '" + feedURL.String() + "' " +
"WITH updated, resolved='1s'"
}
if cfg.diff {
createStmt += ", diff"
}
// Don't wait the entire 30s. This option was introduced in the
// same versions as webhooks.
if ok, err := supportsMinCheckpoint(sourceVersion); err == nil && ok {
createStmt += ", min_checkpoint_frequency='1s'"
}

if cfg.queries {
// CDC queries only support a single table at a time.
// Safe to just use the first table passed in.
createStmt += ",envelope='wrapped',format='json'"
createStmt += " AS SELECT " + queryProjectionColumns
createStmt += fmt.Sprintf(" FROM %s", tables[0].String())
}
log.Debugf("changefeed URL is %s", feedURL.String())

return createStmt
}

// Note that the `sourceVersion` must contain a space at the end
// to satisfy the semver regex.
func TestCreateChangefeedStatement(t *testing.T) {
Expand Down

0 comments on commit c18b76a

Please sign in to comment.