Skip to content

Commit

Permalink
source-sqlserver: filegroup and role_name settings
Browse files Browse the repository at this point in the history
This commit adds new advanced options for the CDC instance filegroup
and role_name arguments. If set, these values will be plugged into
the appropriate query parameters whenever the connector tries to
autocreate CDC instances.

If unset, behavior is unchanged: the `role_name` argument continues to
be set to the capture username, and the filegroup argument continues
to be omitted.
  • Loading branch information
willdonnelly committed Dec 6, 2024
1 parent 82315d3 commit d2e6452
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 10 deletions.
14 changes: 14 additions & 0 deletions source-sqlserver/.snapshots/TestFilegroupAndRole
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# ================================
# Collection "acmeCo/test/test_filegroupandrole_93932362": 6 Documents
# ================================
{"_meta":{"op":"c","source":{"schema":"dbo","snapshot":true,"table":"test_FilegroupAndRole_93932362","lsn":"","seqval":"AAAAAAAAAAAAAA=="}},"data":"zero","id":0}
{"_meta":{"op":"c","source":{"schema":"dbo","snapshot":true,"table":"test_FilegroupAndRole_93932362","lsn":"","seqval":"AAAAAAAAAAAAAA=="}},"data":"one","id":1}
{"_meta":{"op":"c","source":{"schema":"dbo","snapshot":true,"table":"test_FilegroupAndRole_93932362","lsn":"","seqval":"AAAAAAAAAAAAAA=="}},"data":"two","id":2}
{"_meta":{"op":"c","source":{"schema":"dbo","table":"test_FilegroupAndRole_93932362","lsn":"AAAAAAAAAAAAAA==","seqval":"AAAAAAAAAAAAAA==","updateMask":"Aw=="}},"data":"three","id":3}
{"_meta":{"op":"c","source":{"schema":"dbo","table":"test_FilegroupAndRole_93932362","lsn":"AAAAAAAAAAAAAA==","seqval":"AAAAAAAAAAAAAA==","updateMask":"Aw=="}},"data":"four","id":4}
{"_meta":{"op":"c","source":{"schema":"dbo","table":"test_FilegroupAndRole_93932362","lsn":"AAAAAAAAAAAAAA==","seqval":"AAAAAAAAAAAAAA==","updateMask":"Aw=="}},"data":"five","id":5}
# ================================
# Final State Checkpoint
# ================================
{"bindingStateV1":{"dbo%2Ftest_FilegroupAndRole_93932362":{"backfilled":3,"key_columns":["id"],"mode":"Active"}},"cursor":"AAAAAAAAAAAAAA=="}

10 changes: 10 additions & 0 deletions source-sqlserver/.snapshots/TestGeneric-SpecResponse
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@
"title": "Automatic Capture Instance Management",
"description": "When set the connector will respond to alterations of captured tables by automatically creating updated capture instances and deleting the old ones. Requires DBO permissions to use.",
"default": false
},
"filegroup": {
"type": "string",
"title": "CDC Instance Filegroup",
"description": "When set the connector will create new CDC instances with the specified 'filegroup_name' argument. Has no effect if CDC instances are managed manually."
},
"role_name": {
"type": "string",
"title": "CDC Instance Access Role",
"description": "When set the connector will create new CDC instances with the specified 'role_name' argument as the gating role. When unset the capture user name is used as the 'role_name' instead. Has no effect if CDC instances are managed manually."
}
},
"additionalProperties": false,
Expand Down
2 changes: 2 additions & 0 deletions source-sqlserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type advancedConfig struct {
BackfillChunkSize int `json:"backfill_chunk_size,omitempty" jsonschema:"title=Backfill Chunk Size,default=50000,description=The number of rows which should be fetched from the database in a single backfill query."`
AutomaticChangeTableCleanup bool `json:"change_table_cleanup,omitempty" jsonschema:"title=Automatic Change Table Cleanup,default=false,description=When set the connector will delete CDC change table entries as soon as they are persisted into Flow. Requires DBO permissions to use."`
AutomaticCaptureInstances bool `json:"capture_instance_management,omitempty" jsonschema:"title=Automatic Capture Instance Management,default=false,description=When set the connector will respond to alterations of captured tables by automatically creating updated capture instances and deleting the old ones. Requires DBO permissions to use."`
Filegroup string `json:"filegroup,omitempty" jsonschema:"title=CDC Instance Filegroup,description=When set the connector will create new CDC instances with the specified 'filegroup_name' argument. Has no effect if CDC instances are managed manually."`
RoleName string `json:"role_name,omitempty" jsonschema:"title=CDC Instance Access Role,description=When set the connector will create new CDC instances with the specified 'role_name' argument as the gating role. When unset the capture user name is used as the 'role_name' instead. Has no effect if CDC instances are managed manually."`
}

type tunnelConfig struct {
Expand Down
28 changes: 28 additions & 0 deletions source-sqlserver/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,3 +538,31 @@ func TestDroppedAndRecreatedTable(t *testing.T) {

cupaloy.SnapshotT(t, cs.Summary())
}

// TestFilegroupAndRole runs a capture against a table that has no CDC
// instance created by the test harness, with the advanced 'Filegroup' and
// 'RoleName' options set, and snapshots the resulting capture summary.
//
// The flow is: create the table, grant the capture user 'db_owner', insert
// three rows and capture, insert three more rows and capture again.
func TestFilegroupAndRole(t *testing.T) {
	// For this test only, stop the table-creation helper from enabling CDC
	// itself. The previous flag value is put back when the test finishes.
	var savedEnableCDC = *enableCDCWhenCreatingTables
	*enableCDCWhenCreatingTables = false
	t.Cleanup(func() {
		*enableCDCWhenCreatingTables = savedEnableCDC
	})

	var tb = sqlserverTestBackend(t)
	var ctx = context.Background()
	var uniqueID = "93932362"
	var tableName = tb.CreateTable(ctx, t, uniqueID, "(id INTEGER PRIMARY KEY, data TEXT)")

	// Automatic creation of a CDC instance requires 'db_owner', so grant it
	// to the capture user for the duration of the test and revoke it after.
	tb.Query(ctx, t, fmt.Sprintf("ALTER ROLE db_owner ADD MEMBER %s", *dbCaptureUser))
	t.Cleanup(func() {
		tb.Query(ctx, t, fmt.Sprintf("ALTER ROLE db_owner DROP MEMBER %s", *dbCaptureUser))
	})

	var cs = tb.CaptureSpec(ctx, t, regexp.MustCompile(uniqueID))
	cs.Validator = &st.OrderedCaptureValidator{}

	// Set the two advanced options under test on the endpoint config.
	var endpointCfg = cs.EndpointSpec.(*Config)
	endpointCfg.Advanced.Filegroup = "PRIMARY"
	endpointCfg.Advanced.RoleName = "flow_capture"

	sqlcapture.TestShutdownAfterCaughtUp = true
	t.Cleanup(func() {
		sqlcapture.TestShutdownAfterCaughtUp = false
	})

	// First batch is inserted before the first capture run, second batch
	// between the two runs.
	var firstBatch = [][]any{{0, "zero"}, {1, "one"}, {2, "two"}}
	var secondBatch = [][]any{{3, "three"}, {4, "four"}, {5, "five"}}

	tb.Insert(ctx, t, tableName, firstBatch)
	cs.Capture(ctx, t, nil)
	tb.Insert(ctx, t, tableName, secondBatch)
	cs.Capture(ctx, t, nil)

	cupaloy.SnapshotT(t, cs.Summary())
}
2 changes: 1 addition & 1 deletion source-sqlserver/prerequisites.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (db *sqlserverDatabase) prerequisiteTableCaptureInstance(ctx context.Contex
}

// Otherwise we attempt to create one
if instanceName, err := cdcCreateCaptureInstance(ctx, db.conn, schema, table, db.config.User); err == nil {
if instanceName, err := cdcCreateCaptureInstance(ctx, db.conn, schema, table, db.config.User, db.config.Advanced.Filegroup, db.config.Advanced.RoleName); err == nil {
logEntry.WithField("instance", instanceName).Info("enabled cdc for table")
return nil
}
Expand Down
33 changes: 24 additions & 9 deletions source-sqlserver/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ func (rs *sqlserverReplicationStream) manageCaptureInstances(ctx context.Context

// Perform all instance creations
for _, create := range createInstances {
var instanceName, err = cdcCreateCaptureInstance(ctx, rs.conn, create.TableSchema, create.TableName, rs.cfg.User)
var instanceName, err = cdcCreateCaptureInstance(ctx, rs.conn, create.TableSchema, create.TableName, rs.cfg.User, rs.cfg.Advanced.Filegroup, rs.cfg.Advanced.RoleName)
if err != nil {
return err
}
Expand Down Expand Up @@ -910,7 +910,7 @@ func cdcCleanupChangeTable(ctx context.Context, conn *sql.DB, instanceName strin
return nil
}

func cdcCreateCaptureInstance(ctx context.Context, conn *sql.DB, schema, table, username string) (string, error) {
func cdcCreateCaptureInstance(ctx context.Context, conn *sql.DB, schema, table, username, filegroup, roleName string) (string, error) {
// SQL Server table names may be up to 128 characters, but capture instance names must
// be at most 100 characters and we have other information to cram in there. The names
// must be unique, but other than that they might as well be random strings, the logic
Expand All @@ -931,11 +931,6 @@ func cdcCreateCaptureInstance(ctx context.Context, conn *sql.DB, schema, table,
// The full instance name is the 64 bytes prefix, 8 bytes of hash in hex, 10 bytes of Unix timestamp
// as a decimal number, and two underscores for a total of 84 bytes, well under the limit of 100.
var instanceName = fmt.Sprintf("%s_%08X_%d", prefix, hash, time.Now().Unix())
log.WithFields(log.Fields{
"schema": schema,
"table": table,
"instance": instanceName,
}).Debug("creating new capture instance")

// According to https://learn.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sys-sp-cdc-enable-table-transact-sql
// the `sys.sp_cdc_enable_table` procedure "Requires membership in the db_owner fixed database role."
Expand All @@ -944,8 +939,28 @@ func cdcCreateCaptureInstance(ctx context.Context, conn *sql.DB, schema, table,
// this role we should expect this to fail. We always try to create capture instances during validation
// because it hurts nothing if we're rejected, and if the relevant table-alteration handling option is
// enabled we verify as a prerequisite that we have 'db_owner' so this shouldn't fail there.
const query = `EXEC sys.sp_cdc_enable_table @source_schema = @p1, @source_name = @p2, @role_name = @p3, @capture_instance = @p4;`
if _, err := conn.ExecContext(ctx, query, schema, table, username, instanceName); err != nil {
var args = []any{schema, table, instanceName}
var query = "EXEC sys.sp_cdc_enable_table @source_schema = @p1, @source_name = @p2, @capture_instance = @p3"
if roleName != "" {
args = append(args, roleName)
} else {
args = append(args, username)
}
query += fmt.Sprintf(", @role_name = @p%d", len(args))
if filegroup != "" {
args = append(args, filegroup)
query += fmt.Sprintf(", @filegroup_name = @p%d", len(args))
}
query += ";"
log.WithFields(log.Fields{
"schema": schema,
"table": table,
"instance": instanceName,
"query": query,
"query_args": args,
}).Debug("creating new capture instance")
if _, err := conn.ExecContext(ctx, query, args...); err != nil {
log.WithField("err", err).Debug("failed to create capture instance")
return "", fmt.Errorf("error creating capture instance %q: %w", instanceName, err)
}
return instanceName, nil
Expand Down

0 comments on commit d2e6452

Please sign in to comment.