From d2e6452bbc51b290dea68dcfaa8f1400146d3e96 Mon Sep 17 00:00:00 2001 From: Will Donnelly Date: Wed, 4 Dec 2024 12:57:13 -0600 Subject: [PATCH] source-sqlserver: filegroup and role_name settings This commit adds new advanced options for the CDC instance filegroup and role_name arguments. If set, these values will be plugged into the appropriate query parameters whenever the connector tries to autocreate CDC instances. If unset, the `role_name` argument will continue to be set to the capture username as it always has been, while the filegroup will be omitted as it always has been. --- .../.snapshots/TestFilegroupAndRole | 14 ++++++++ .../.snapshots/TestGeneric-SpecResponse | 10 ++++++ source-sqlserver/main.go | 2 ++ source-sqlserver/main_test.go | 28 ++++++++++++++++ source-sqlserver/prerequisites.go | 2 +- source-sqlserver/replication.go | 33 ++++++++++++++----- 6 files changed, 79 insertions(+), 10 deletions(-) create mode 100644 source-sqlserver/.snapshots/TestFilegroupAndRole diff --git a/source-sqlserver/.snapshots/TestFilegroupAndRole b/source-sqlserver/.snapshots/TestFilegroupAndRole new file mode 100644 index 0000000000..59c2e2a153 --- /dev/null +++ b/source-sqlserver/.snapshots/TestFilegroupAndRole @@ -0,0 +1,14 @@ +# ================================ +# Collection "acmeCo/test/test_filegroupandrole_93932362": 6 Documents +# ================================ +{"_meta":{"op":"c","source":{"schema":"dbo","snapshot":true,"table":"test_FilegroupAndRole_93932362","lsn":"","seqval":"AAAAAAAAAAAAAA=="}},"data":"zero","id":0} +{"_meta":{"op":"c","source":{"schema":"dbo","snapshot":true,"table":"test_FilegroupAndRole_93932362","lsn":"","seqval":"AAAAAAAAAAAAAA=="}},"data":"one","id":1} +{"_meta":{"op":"c","source":{"schema":"dbo","snapshot":true,"table":"test_FilegroupAndRole_93932362","lsn":"","seqval":"AAAAAAAAAAAAAA=="}},"data":"two","id":2} +{"_meta":{"op":"c","source":{"schema":"dbo","table":"test_FilegroupAndRole_93932362","lsn":"AAAAAAAAAAAAAA==","seqval":"AAAAAAAAAAAAAA==","updateMask":"Aw=="}},"data":"three","id":3} +{"_meta":{"op":"c","source":{"schema":"dbo","table":"test_FilegroupAndRole_93932362","lsn":"AAAAAAAAAAAAAA==","seqval":"AAAAAAAAAAAAAA==","updateMask":"Aw=="}},"data":"four","id":4} +{"_meta":{"op":"c","source":{"schema":"dbo","table":"test_FilegroupAndRole_93932362","lsn":"AAAAAAAAAAAAAA==","seqval":"AAAAAAAAAAAAAA==","updateMask":"Aw=="}},"data":"five","id":5} +# ================================ +# Final State Checkpoint +# ================================ +{"bindingStateV1":{"dbo%2Ftest_FilegroupAndRole_93932362":{"backfilled":3,"key_columns":["id"],"mode":"Active"}},"cursor":"AAAAAAAAAAAAAA=="} + diff --git a/source-sqlserver/.snapshots/TestGeneric-SpecResponse b/source-sqlserver/.snapshots/TestGeneric-SpecResponse index c35161687f..10b3072847 100644 --- a/source-sqlserver/.snapshots/TestGeneric-SpecResponse +++ b/source-sqlserver/.snapshots/TestGeneric-SpecResponse @@ -69,6 +69,16 @@ "title": "Automatic Capture Instance Management", "description": "When set the connector will respond to alterations of captured tables by automatically creating updated capture instances and deleting the old ones. Requires DBO permissions to use.", "default": false + }, + "filegroup": { + "type": "string", + "title": "CDC Instance Filegroup", + "description": "When set the connector will create new CDC instances with the specified 'filegroup_name' argument. Has no effect if CDC instances are managed manually." + }, + "role_name": { + "type": "string", + "title": "CDC Instance Access Role", + "description": "When set the connector will create new CDC instances with the specified 'role_name' argument as the gating role. When unset the capture user name is used as the 'role_name' instead. Has no effect if CDC instances are managed manually." } }, "additionalProperties": false, diff --git a/source-sqlserver/main.go b/source-sqlserver/main.go index a8ddffe3f7..5ab4b02ee5 100644 --- a/source-sqlserver/main.go +++ b/source-sqlserver/main.go @@ -55,6 +55,8 @@ type advancedConfig struct { BackfillChunkSize int `json:"backfill_chunk_size,omitempty" jsonschema:"title=Backfill Chunk Size,default=50000,description=The number of rows which should be fetched from the database in a single backfill query."` AutomaticChangeTableCleanup bool `json:"change_table_cleanup,omitempty" jsonschema:"title=Automatic Change Table Cleanup,default=false,description=When set the connector will delete CDC change table entries as soon as they are persisted into Flow. Requires DBO permissions to use."` AutomaticCaptureInstances bool `json:"capture_instance_management,omitempty" jsonschema:"title=Automatic Capture Instance Management,default=false,description=When set the connector will respond to alterations of captured tables by automatically creating updated capture instances and deleting the old ones. Requires DBO permissions to use."` + Filegroup string `json:"filegroup,omitempty" jsonschema:"title=CDC Instance Filegroup,description=When set the connector will create new CDC instances with the specified 'filegroup_name' argument. Has no effect if CDC instances are managed manually."` + RoleName string `json:"role_name,omitempty" jsonschema:"title=CDC Instance Access Role,description=When set the connector will create new CDC instances with the specified 'role_name' argument as the gating role. When unset the capture user name is used as the 'role_name' instead. Has no effect if CDC instances are managed manually."` } type tunnelConfig struct { diff --git a/source-sqlserver/main_test.go b/source-sqlserver/main_test.go index 1be2887ee7..88239a7152 100644 --- a/source-sqlserver/main_test.go +++ b/source-sqlserver/main_test.go @@ -538,3 +538,31 @@ func TestDroppedAndRecreatedTable(t *testing.T) { cupaloy.SnapshotT(t, cs.Summary()) } + +func TestFilegroupAndRole(t *testing.T) { + // Turn off the test logic that creates CDC instances when creating tables, for this one test. + var oldEnableCDCWhenCreatingTables = *enableCDCWhenCreatingTables + *enableCDCWhenCreatingTables = false + t.Cleanup(func() { *enableCDCWhenCreatingTables = oldEnableCDCWhenCreatingTables }) + + var tb, ctx = sqlserverTestBackend(t), context.Background() + var uniqueID = "93932362" + var tableName = tb.CreateTable(ctx, t, uniqueID, "(id INTEGER PRIMARY KEY, data TEXT)") + + // Grant the 'db_owner' role, it is required to create a CDC instance automatically. + tb.Query(ctx, t, fmt.Sprintf("ALTER ROLE db_owner ADD MEMBER %s", *dbCaptureUser)) + t.Cleanup(func() { tb.Query(ctx, t, fmt.Sprintf("ALTER ROLE db_owner DROP MEMBER %s", *dbCaptureUser)) }) + + var cs = tb.CaptureSpec(ctx, t, regexp.MustCompile(uniqueID)) + cs.Validator = &st.OrderedCaptureValidator{} + cs.EndpointSpec.(*Config).Advanced.Filegroup = "PRIMARY" + cs.EndpointSpec.(*Config).Advanced.RoleName = "flow_capture" + sqlcapture.TestShutdownAfterCaughtUp = true + t.Cleanup(func() { sqlcapture.TestShutdownAfterCaughtUp = false }) + + tb.Insert(ctx, t, tableName, [][]any{{0, "zero"}, {1, "one"}, {2, "two"}}) + cs.Capture(ctx, t, nil) + tb.Insert(ctx, t, tableName, [][]any{{3, "three"}, {4, "four"}, {5, "five"}}) + cs.Capture(ctx, t, nil) + cupaloy.SnapshotT(t, cs.Summary()) +} diff --git a/source-sqlserver/prerequisites.go b/source-sqlserver/prerequisites.go index f46a2f109e..2bee8241b8 100644 --- a/source-sqlserver/prerequisites.go +++ b/source-sqlserver/prerequisites.go @@ -253,7 +253,7 @@ func (db *sqlserverDatabase) prerequisiteTableCaptureInstance(ctx context.Contex } // Otherwise we attempt to create one - if instanceName, err := cdcCreateCaptureInstance(ctx, db.conn, schema, table, db.config.User); err == nil { + if instanceName, err := cdcCreateCaptureInstance(ctx, db.conn, schema, table, db.config.User, db.config.Advanced.Filegroup, db.config.Advanced.RoleName); err == nil { logEntry.WithField("instance", instanceName).Info("enabled cdc for table") return nil } diff --git a/source-sqlserver/replication.go b/source-sqlserver/replication.go index ff1eb376fb..4d8ab62ba9 100644 --- a/source-sqlserver/replication.go +++ b/source-sqlserver/replication.go @@ -514,7 +514,7 @@ func (rs *sqlserverReplicationStream) manageCaptureInstances(ctx context.Context // Perform all instance creations for _, create := range createInstances { - var instanceName, err = cdcCreateCaptureInstance(ctx, rs.conn, create.TableSchema, create.TableName, rs.cfg.User) + var instanceName, err = cdcCreateCaptureInstance(ctx, rs.conn, create.TableSchema, create.TableName, rs.cfg.User, rs.cfg.Advanced.Filegroup, rs.cfg.Advanced.RoleName) if err != nil { return err } @@ -910,7 +910,7 @@ func cdcCleanupChangeTable(ctx context.Context, conn *sql.DB, instanceName strin return nil } -func cdcCreateCaptureInstance(ctx context.Context, conn *sql.DB, schema, table, username string) (string, error) { +func cdcCreateCaptureInstance(ctx context.Context, conn *sql.DB, schema, table, username, filegroup, roleName string) (string, error) { // SQL Server table names may be up to 128 characters, but capture instance names must // be at most 100 characters and we have other information to cram in there. The names // must be unique, but other than that they might as well be random strings, the logic @@ -931,11 +931,6 @@ func cdcCreateCaptureInstance(ctx context.Context, conn *sql.DB, schema, table, // The full instance name is the 64 bytes prefix, 8 bytes of hash in hex, 10 bytes of Unix timestamp // as a decimal number, and two underscores for a total of 84 bytes, well under the limit of 100. var instanceName = fmt.Sprintf("%s_%08X_%d", prefix, hash, time.Now().Unix()) - log.WithFields(log.Fields{ - "schema": schema, - "table": table, - "instance": instanceName, - }).Debug("creating new capture instance") // According to https://learn.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sys-sp-cdc-enable-table-transact-sql // the `sys.sp_cdc_enable_table` procedure "Requires membership in the db_owner fixed database role." @@ -944,8 +939,28 @@ func cdcCreateCaptureInstance(ctx context.Context, conn *sql.DB, schema, table, // this role we should expect this to fail. We always try to create capture instances during validation // because it hurts nothing if we're rejected, and if the relevant table-alteration handling option is // enabled we verify as a prerequisite that we have 'db_owner' so this shouldn't fail there. - const query = `EXEC sys.sp_cdc_enable_table @source_schema = @p1, @source_name = @p2, @role_name = @p3, @capture_instance = @p4;` - if _, err := conn.ExecContext(ctx, query, schema, table, username, instanceName); err != nil { + var args = []any{schema, table, instanceName} + var query = "EXEC sys.sp_cdc_enable_table @source_schema = @p1, @source_name = @p2, @capture_instance = @p3" + if roleName != "" { + args = append(args, roleName) + } else { + args = append(args, username) + } + query += fmt.Sprintf(", @role_name = @p%d", len(args)) + if filegroup != "" { + args = append(args, filegroup) + query += fmt.Sprintf(", @filegroup_name = @p%d", len(args)) + } + query += ";" + log.WithFields(log.Fields{ + "schema": schema, + "table": table, + "instance": instanceName, + "query": query, + "query_args": args, + }).Debug("creating new capture instance") + if _, err := conn.ExecContext(ctx, query, args...); err != nil { + log.WithField("err", err).Debug("failed to create capture instance") return "", fmt.Errorf("error creating capture instance %q: %w", instanceName, err) } return instanceName, nil