Skip to content

Commit

Permalink
refactored changefeed helpers and added the key_in_value option
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanluu12345 committed Oct 28, 2024
1 parent c18b76a commit 4097471
Show file tree
Hide file tree
Showing 3 changed files with 252 additions and 225 deletions.
104 changes: 104 additions & 0 deletions internal/sinktest/changefeed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package sinktest

import (
"fmt"
"net/url"

"github.com/cockroachdb/replicator/internal/util/ident"
"github.com/cockroachdb/replicator/internal/util/stdpool"
)

// ChangefeedConfig is meant to influence the behavior
// of the created changefeed statement depending
// on the options defined. This can be extended later on
// to handle various configurations and webhook parameters.
type ChangefeedConfig struct {
Webhook bool
Diff bool
Queries bool
KeyInValue bool
}

// ChangefeedStatement is a struct that represents a changefeed statement and it
// allows the caller to specify various configuration options and parameters
// useful for creating changefeeds on CRDB sources.
type ChangefeedStatement struct {
Cfg *ChangefeedConfig
Host string
Target ident.Table
Token string
Tables []ident.Table
SourceVersion string
QueryProjectionColumns string
}

// String returns a string representation of the changefeed statement.
func (cfs *ChangefeedStatement) String() string {
params := make(url.Values)
var feedURL url.URL
var pathIdent ident.Identifier
createStmt := "CREATE CHANGEFEED"

if cfs.Cfg.Queries {
pathIdent = cfs.Target
} else {
// Creating the comma-separated table string required by the changefeed.
TablesStr := ""
for i, table := range cfs.Tables {
if i > 0 {
TablesStr += ", "
}
TablesStr += table.String()
}
pathIdent = cfs.Target.Schema()
createStmt += fmt.Sprintf(" FOR TABLE %s", TablesStr)
}

if cfs.Cfg.Webhook {
params.Set("insecure_tls_skip_verify", "true")
feedURL = url.URL{
Scheme: "webhook-https",
Host: cfs.Host,
Path: ident.Join(pathIdent, ident.Raw, '/'),
RawQuery: params.Encode(),
}
createStmt += " INTO '" + feedURL.String() + "' " +
" WITH updated," +
" resolved='1s'," +
" webhook_auth_header='Bearer " + cfs.Token + "'"
} else {
params.Set("access_token", cfs.Token)
feedURL = url.URL{
Scheme: "experimental-http",
Host: cfs.Host,
Path: ident.Join(pathIdent, ident.Raw, '/'),
RawQuery: params.Encode(),
}
createStmt += " INTO '" + feedURL.String() + "' " +
"WITH updated, resolved='1s'"
}

if cfs.Cfg.Diff {
createStmt += ", diff"
}

if cfs.Cfg.KeyInValue {
createStmt += ", key_in_value"
}

if ok, err := supportsMinCheckpoint(cfs.SourceVersion); err == nil && ok {
createStmt += ", min_checkpoint_frequency='1s'"
}

if cfs.Cfg.Queries {
createStmt += ",envelope='wrapped',format='json'"
createStmt += " AS SELECT " + cfs.QueryProjectionColumns
createStmt += fmt.Sprintf(" FROM %s", cfs.Tables[0].String())
}

return createStmt
}

func supportsMinCheckpoint(version string) (bool, error) {
return stdpool.CockroachMinVersion(version, "v22.1")
}
113 changes: 113 additions & 0 deletions internal/sinktest/changefeed_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package sinktest

import (
"testing"

"github.com/cockroachdb/replicator/internal/util/ident"
"github.com/stretchr/testify/require"
)

// Note that the `sourceVersion` must contain a space at the end
// to satisfy the semver regex.
func TestCreateChangefeedStatement(t *testing.T) {
tests := []struct {
name string
stmt ChangefeedStatement
want string
}{
{
name: "basic no changefeed configs",
stmt: ChangefeedStatement{
Cfg: &ChangefeedConfig{},
Host: "localHost:8080",
Target: ident.NewTable(ident.MustSchema(ident.New("Target"),
ident.New("public")), ident.New("tbl1")),
Token: "my_token",
Tables: []ident.Table{ident.NewTable(ident.MustSchema(ident.New("source"),
ident.New("public")), ident.New("tbl1"))},
SourceVersion: "CockroachDB CCL v24.2.1 ",
},
want: `CREATE CHANGEFEED FOR TABLE "source"."public"."tbl1" INTO ` +
`'experimental-http://localHost:8080/Target/public?access_token=my_token' ` +
`WITH updated, resolved='1s', min_checkpoint_frequency='1s'`,
},
{
name: "basic webhook",
stmt: ChangefeedStatement{
Cfg: &ChangefeedConfig{
Webhook: true,
},
Host: "localHost:8080",
Target: ident.NewTable(ident.MustSchema(ident.New("Target"), ident.New("public")),
ident.New("tbl1")),
Token: "my_token",
Tables: []ident.Table{
ident.NewTable(ident.MustSchema(ident.New("source"), ident.New("public")),
ident.New("tbl1")),
ident.NewTable(ident.MustSchema(ident.New("source"), ident.New("public")),
ident.New("tbl2")),
},
SourceVersion: "CockroachDB CCL v24.2.1 ",
},
want: `CREATE CHANGEFEED FOR TABLE "source"."public"."tbl1", "source"."public"."tbl2" ` +
`INTO 'webhook-https://localHost:8080/Target/public?insecure_tls_skip_verify=true' ` +
`WITH updated, resolved='1s', webhook_auth_header='Bearer my_token', ` +
`min_checkpoint_frequency='1s'`,
},
{
name: "webhook and diff and key_in_value enabled",
stmt: ChangefeedStatement{
Cfg: &ChangefeedConfig{
Webhook: true,
Diff: true,
KeyInValue: true,
},
Host: "localHost:8080",
Target: ident.NewTable(ident.MustSchema(ident.New("Target"), ident.New("public")),
ident.New("tbl1")),
Token: "my_token",
Tables: []ident.Table{
ident.NewTable(ident.MustSchema(ident.New("source"), ident.New("public")),
ident.New("tbl1")),
ident.NewTable(ident.MustSchema(ident.New("source"), ident.New("public")),
ident.New("tbl2")),
},
SourceVersion: "CockroachDB CCL v24.2.1 ",
},
want: `CREATE CHANGEFEED FOR TABLE "source"."public"."tbl1", "source"."public"."tbl2" ` +
`INTO 'webhook-https://localHost:8080/Target/public?insecure_tls_skip_verify=true' ` +
`WITH updated, resolved='1s', webhook_auth_header='Bearer my_token', diff, key_in_value, ` +
`min_checkpoint_frequency='1s'`,
},
{
name: "basic webhook CDC queries",
stmt: ChangefeedStatement{
Cfg: &ChangefeedConfig{
Webhook: true,
Queries: true,
},
Host: "localHost:8080",
Target: ident.NewTable(ident.MustSchema(ident.New("Target"), ident.New("public")),
ident.New("tbl1")),
Token: "my_token",
Tables: []ident.Table{
ident.NewTable(ident.MustSchema(ident.New("source"), ident.New("public")),
ident.New("tbl1")),
},
SourceVersion: "CockroachDB CCL v24.2.1 ",
QueryProjectionColumns: "pk, val",
},
want: `CREATE CHANGEFEED INTO ` +
`'webhook-https://localHost:8080/Target/public/tbl1?insecure_tls_skip_verify=true' ` +
`WITH updated, resolved='1s', webhook_auth_header='Bearer my_token', ` +
`min_checkpoint_frequency='1s',envelope='wrapped',format='json' AS SELECT pk, val ` +
`FROM "source"."public"."tbl1"`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.stmt.String()
require.Equal(t, tt.want, got)
})
}
}
Loading

0 comments on commit 4097471

Please sign in to comment.