diff --git a/internal/sinktest/all/fixture.go b/internal/sinktest/all/fixture.go index 9a1bd21d..56be1f0d 100644 --- a/internal/sinktest/all/fixture.go +++ b/internal/sinktest/all/fixture.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/replicator/internal/staging/version" "github.com/cockroachdb/replicator/internal/target/apply" "github.com/cockroachdb/replicator/internal/target/dlq" + "github.com/cockroachdb/replicator/internal/target/schemawatch" "github.com/cockroachdb/replicator/internal/types" "github.com/cockroachdb/replicator/internal/util/applycfg" "github.com/cockroachdb/replicator/internal/util/diag" @@ -52,12 +53,12 @@ type Fixture struct { DLQs types.DLQs Leases types.Leases Memo types.Memo + SchemaWatch *schemawatch.Config StageConfig *stage.Config Stagers types.Stagers VersionChecker *version.Checker Watchers types.Watchers - - Watcher types.Watcher // A watcher for TestDB. + Watcher types.Watcher // A watcher for TestDB. } // Applier returns a bound function that will apply mutations to the diff --git a/internal/sinktest/all/injector.go b/internal/sinktest/all/injector.go index f3d2b167..1b0425a4 100644 --- a/internal/sinktest/all/injector.go +++ b/internal/sinktest/all/injector.go @@ -21,12 +21,23 @@ package all import ( "testing" + "time" + "github.com/cockroachdb/replicator/internal/sinktest/base" "github.com/google/wire" ) // NewFixture constructs a self-contained test fixture for all services // in the target sub-packages. func NewFixture(t testing.TB) (*Fixture, error) { + panic(wire.Build(TestSet, wire.Value(RefreshDelay(time.Minute)))) +} + +// NewFixtureFromBase constructs a new Fixture over a [base.Fixture]. +func NewFixtureFromBase(fixture *base.Fixture) (*Fixture, error) { + panic(wire.Build(TestSetBase, wire.Value(RefreshDelay(time.Minute)))) +} + +func NewFixtureWithRefresh(t testing.TB, d RefreshDelay) (*Fixture, error) { panic(wire.Build(TestSet)) } diff --git a/internal/sinktest/all/provider.go b/internal/sinktest/all/provider.go index 6b496e6e..4ab7d9a2 100644 --- a/internal/sinktest/all/provider.go +++ b/internal/sinktest/all/provider.go @@ -18,16 +18,25 @@ package all import ( + "context" + "time" + + "github.com/cockroachdb/field-eng-powertools/stopper" "github.com/cockroachdb/replicator/internal/sinktest" "github.com/cockroachdb/replicator/internal/sinktest/base" "github.com/cockroachdb/replicator/internal/staging" "github.com/cockroachdb/replicator/internal/staging/stage" "github.com/cockroachdb/replicator/internal/target" "github.com/cockroachdb/replicator/internal/target/dlq" + "github.com/cockroachdb/replicator/internal/target/schemawatch" "github.com/cockroachdb/replicator/internal/types" + "github.com/cockroachdb/replicator/internal/util/diag" "github.com/google/wire" ) +// RefreshDelay is a named type for the schema watch refresh delay configuration. +type RefreshDelay time.Duration + // TestSet contains providers to create a self-contained Fixture. var TestSet = wire.NewSet( base.TestSet, @@ -35,12 +44,32 @@ var TestSet = wire.NewSet( target.Set, ProvideDLQConfig, + ProvideSchemaWatchConfig, ProvideStageConfig, ProvideWatcher, wire.Struct(new(Fixture), "*"), ) +// TestSetBase creates a Fixture from a [base.Fixture]. +var TestSetBase = wire.NewSet( + wire.FieldsOf(new(*base.Fixture), + "Context", "SourcePool", "SourceSchema", + "StagingPool", "StagingDB", + "TargetCache", "TargetPool", "TargetSchema"), + diag.New, + staging.Set, + target.Set, + + ProvideDLQConfig, + ProvideSchemaWatchConfig, + ProvideStageConfig, + ProvideWatcher, + + wire.Bind(new(context.Context), new(*stopper.Context)), + wire.Struct(new(Fixture), "*"), +) + // ProvideDLQConfig emits a default configuration. func ProvideDLQConfig() (*dlq.Config, error) { cfg := &dlq.Config{} @@ -58,3 +87,12 @@ func ProvideStageConfig() (*stage.Config, error) { func ProvideWatcher(target sinktest.TargetSchema, watchers types.Watchers) (types.Watcher, error) { return watchers.Get(target.Schema()) } + +// ProvideSchemaWatchConfig is called by Wire to construct the SchemaWatch +// configuration. +func ProvideSchemaWatchConfig(refreshDelay RefreshDelay) (*schemawatch.Config, error) { + cfg := &schemawatch.Config{ + RefreshDelay: time.Duration(refreshDelay), + } + return cfg, cfg.Preflight() +} diff --git a/internal/sinktest/all/wire_gen.go b/internal/sinktest/all/wire_gen.go index 8ed574b2..80736190 100644 --- a/internal/sinktest/all/wire_gen.go +++ b/internal/sinktest/all/wire_gen.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/replicator/internal/util/applycfg" "github.com/cockroachdb/replicator/internal/util/diag" "testing" + "time" ) // Injectors from injector.go: @@ -71,7 +72,194 @@ func NewFixture(t testing.TB) (*Fixture, error) { if err != nil { return nil, err } - watchers, err := schemawatch.ProvideFactory(context, targetPool, diagnostics) + refreshDelay := _wireRefreshDelayValue + schemawatchConfig, err := ProvideSchemaWatchConfig(refreshDelay) + if err != nil { + return nil, err + } + watchers, err := schemawatch.ProvideFactory(context, schemawatchConfig, targetPool, diagnostics) + if err != nil { + return nil, err + } + dlQs := dlq.ProvideDLQs(config, targetPool, watchers) + acceptor, err := apply.ProvideAcceptor(context, targetStatements, configs, diagnostics, dlQs, targetPool, watchers) + if err != nil { + return nil, err + } + checkpoints, err := checkpoint.ProvideCheckpoints(context, stagingPool, stagingSchema) + if err != nil { + return nil, err + } + typesLeases, err := leases.ProvideLeases(context, stagingPool, stagingSchema) + if err != nil { + return nil, err + } + memoMemo, err := memo.ProvideMemo(context, stagingPool, stagingSchema) + if err != nil { + return nil, err + } + stageConfig, err := ProvideStageConfig() + if err != nil { + return nil, err + } + stagers := stage.ProvideFactory(stageConfig, stagingPool, stagingSchema, context) + checker := version.ProvideChecker(stagingPool, memoMemo) + watcher, err := ProvideWatcher(targetSchema, watchers) + if err != nil { + return nil, err + } + allFixture := &Fixture{ + Fixture: fixture, + ApplyAcceptor: acceptor, + Checkpoints: checkpoints, + Configs: configs, + Diagnostics: diagnostics, + DLQConfig: config, + DLQs: dlQs, + Leases: typesLeases, + Memo: memoMemo, + SchemaWatch: schemawatchConfig, + StageConfig: stageConfig, + Stagers: stagers, + VersionChecker: checker, + Watchers: watchers, + Watcher: watcher, + } + return allFixture, nil +} + +var ( + _wireRefreshDelayValue = RefreshDelay(time.Minute) +) + +// NewFixtureFromBase constructs a new Fixture over a [base.Fixture]. +func NewFixtureFromBase(fixture *base.Fixture) (*Fixture, error) { + context := fixture.Context + targetStatements := fixture.TargetCache + diagnostics := diag.New(context) + configs, err := applycfg.ProvideConfigs(diagnostics) + if err != nil { + return nil, err + } + config, err := ProvideDLQConfig() + if err != nil { + return nil, err + } + targetPool := fixture.TargetPool + refreshDelay := _wireAllRefreshDelayValue + schemawatchConfig, err := ProvideSchemaWatchConfig(refreshDelay) + if err != nil { + return nil, err + } + watchers, err := schemawatch.ProvideFactory(context, schemawatchConfig, targetPool, diagnostics) + if err != nil { + return nil, err + } + dlQs := dlq.ProvideDLQs(config, targetPool, watchers) + acceptor, err := apply.ProvideAcceptor(context, targetStatements, configs, diagnostics, dlQs, targetPool, watchers) + if err != nil { + return nil, err + } + stagingPool := fixture.StagingPool + stagingSchema := fixture.StagingDB + checkpoints, err := checkpoint.ProvideCheckpoints(context, stagingPool, stagingSchema) + if err != nil { + return nil, err + } + typesLeases, err := leases.ProvideLeases(context, stagingPool, stagingSchema) + if err != nil { + return nil, err + } + memoMemo, err := memo.ProvideMemo(context, stagingPool, stagingSchema) + if err != nil { + return nil, err + } + stageConfig, err := ProvideStageConfig() + if err != nil { + return nil, err + } + stagers := stage.ProvideFactory(stageConfig, stagingPool, stagingSchema, context) + checker := version.ProvideChecker(stagingPool, memoMemo) + targetSchema := fixture.TargetSchema + watcher, err := ProvideWatcher(targetSchema, watchers) + if err != nil { + return nil, err + } + allFixture := &Fixture{ + Fixture: fixture, + ApplyAcceptor: acceptor, + Checkpoints: checkpoints, + Configs: configs, + Diagnostics: diagnostics, + DLQConfig: config, + DLQs: dlQs, + Leases: typesLeases, + Memo: memoMemo, + SchemaWatch: schemawatchConfig, + StageConfig: stageConfig, + Stagers: stagers, + VersionChecker: checker, + Watchers: watchers, + Watcher: watcher, + } + return allFixture, nil +} + +var ( + _wireAllRefreshDelayValue = RefreshDelay(time.Minute) +) + +func NewFixtureWithRefresh(t testing.TB, d RefreshDelay) (*Fixture, error) { + context := base.ProvideContext(t) + diagnostics := diag.New(context) + sourcePool, err := base.ProvideSourcePool(context, diagnostics) + if err != nil { + return nil, err + } + sourceSchema, err := base.ProvideSourceSchema(context, sourcePool) + if err != nil { + return nil, err + } + stagingPool, err := base.ProvideStagingPool(context) + if err != nil { + return nil, err + } + stagingSchema, err := base.ProvideStagingSchema(context, stagingPool) + if err != nil { + return nil, err + } + targetPool, err := base.ProvideTargetPool(context, sourcePool, diagnostics) + if err != nil { + return nil, err + } + targetStatements := base.ProvideTargetStatements(context, targetPool) + targetSchema, err := base.ProvideTargetSchema(context, diagnostics, targetPool, targetStatements) + if err != nil { + return nil, err + } + fixture := &base.Fixture{ + Context: context, + SourcePool: sourcePool, + SourceSchema: sourceSchema, + StagingPool: stagingPool, + StagingDB: stagingSchema, + TargetCache: targetStatements, + TargetPool: targetPool, + TargetSchema: targetSchema, + } + configs, err := applycfg.ProvideConfigs(diagnostics) + if err != nil { + return nil, err + } + config, err := ProvideDLQConfig() + if err != nil { + return nil, err + } + schemawatchConfig, err := ProvideSchemaWatchConfig(d) + if err != nil { + return nil, err + } + watchers, err := schemawatch.ProvideFactory(context, schemawatchConfig, targetPool, diagnostics) if err != nil { return nil, err } @@ -112,6 +300,7 @@ func NewFixture(t testing.TB) (*Fixture, error) { DLQs: dlQs, Leases: typesLeases, Memo: memoMemo, + SchemaWatch: schemawatchConfig, StageConfig: stageConfig, Stagers: stagers, VersionChecker: checker, diff --git a/internal/source/cdc/config.go b/internal/source/cdc/config.go index 1b984855..99d140a4 100644 --- a/internal/source/cdc/config.go +++ b/internal/source/cdc/config.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/replicator/internal/script" "github.com/cockroachdb/replicator/internal/sequencer" "github.com/cockroachdb/replicator/internal/target/dlq" + "github.com/cockroachdb/replicator/internal/target/schemawatch" log "github.com/sirupsen/logrus" "github.com/spf13/pflag" ) @@ -39,6 +40,7 @@ type Config struct { ConveyorConfig conveyor.Config DLQConfig dlq.Config SequencerConfig sequencer.Config + SchemaWatch schemawatch.Config ScriptConfig script.Config // Discard all incoming HTTP payloads. This is useful for tuning // changefeed throughput without considering Replicator performance. @@ -61,8 +63,9 @@ type Config struct { func (c *Config) Bind(f *pflag.FlagSet) { c.ConveyorConfig.Bind(f) c.DLQConfig.Bind(f) - c.SequencerConfig.Bind(f) + c.SchemaWatch.Bind(f) c.ScriptConfig.Bind(f) + c.SequencerConfig.Bind(f) f.BoolVar(&c.Discard, "discard", false, "(dangerous) discard all incoming HTTP requests; useful for changefeed throughput testing") @@ -89,6 +92,9 @@ func (c *Config) Preflight() error { if err := c.ScriptConfig.Preflight(); err != nil { return err } + if err := c.SchemaWatch.Preflight(); err != nil { + return err + } // Backfill mode may be zero to disable BestEffort. diff --git a/internal/source/cdc/provider.go b/internal/source/cdc/provider.go index 7b1094de..95709bdd 100644 --- a/internal/source/cdc/provider.go +++ b/internal/source/cdc/provider.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/replicator/internal/script" "github.com/cockroachdb/replicator/internal/sequencer" "github.com/cockroachdb/replicator/internal/target/dlq" + "github.com/cockroachdb/replicator/internal/target/schemawatch" "github.com/cockroachdb/replicator/internal/types" "github.com/google/wire" ) @@ -30,6 +31,7 @@ var Set = wire.NewSet( ProvideHandler, ProvideConveyorConfig, ProvideDLQConfig, + ProvideSchemaWatchConfig, ProvideScriptConfig, ProvideSequencerConfig, conveyor.Set, @@ -45,6 +47,11 @@ func ProvideDLQConfig(cfg *Config) *dlq.Config { return &cfg.DLQConfig } +// ProvideSchemaWatchConfig is called by Wire. +func ProvideSchemaWatchConfig(cfg *Config) *schemawatch.Config { + return &cfg.SchemaWatch +} + // ProvideScriptConfig is called by Wire. func ProvideScriptConfig(cfg *Config) *script.Config { return &cfg.ScriptConfig diff --git a/internal/source/cdc/server/wire_gen.go b/internal/source/cdc/server/wire_gen.go index 317050eb..d190c15d 100644 --- a/internal/source/cdc/server/wire_gen.go +++ b/internal/source/cdc/server/wire_gen.go @@ -81,7 +81,8 @@ func NewServer(ctx *stopper.Context, config *Config) (*Server, error) { return nil, err } dlqConfig := cdc.ProvideDLQConfig(cdcConfig) - watchers, err := schemawatch.ProvideFactory(ctx, targetPool, diagnostics) + schemawatchConfig := cdc.ProvideSchemaWatchConfig(cdcConfig) + watchers, err := schemawatch.ProvideFactory(ctx, schemawatchConfig, targetPool, diagnostics) if err != nil { return nil, err } @@ -174,7 +175,8 @@ func newTestFixture(context *stopper.Context, config *Config) (*testFixture, fun return nil, nil, err } dlqConfig := cdc.ProvideDLQConfig(cdcConfig) - watchers, err := schemawatch.ProvideFactory(context, targetPool, diagnostics) + schemawatchConfig := cdc.ProvideSchemaWatchConfig(cdcConfig) + watchers, err := schemawatch.ProvideFactory(context, schemawatchConfig, targetPool, diagnostics) if err != nil { return nil, nil, err } diff --git a/internal/source/cdc/wire_gen.go b/internal/source/cdc/wire_gen.go index d052cc53..43b114b4 100644 --- a/internal/source/cdc/wire_gen.go +++ b/internal/source/cdc/wire_gen.go @@ -44,8 +44,9 @@ func newTestFixture(fixture *all.Fixture, config *Config) (*testFixture, error) } stagers := fixture.Stagers targetPool := baseFixture.TargetPool + schemawatchConfig := ProvideSchemaWatchConfig(config) diagnostics := diag.New(context) - watchers, err := schemawatch.ProvideFactory(context, targetPool, diagnostics) + watchers, err := schemawatch.ProvideFactory(context, schemawatchConfig, targetPool, diagnostics) if err != nil { return nil, err } diff --git a/internal/source/kafka/config.go b/internal/source/kafka/config.go index 61f47587..9001783b 100644 --- a/internal/source/kafka/config.go +++ b/internal/source/kafka/config.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/replicator/internal/sinkprod" "github.com/cockroachdb/replicator/internal/staging/stage" "github.com/cockroachdb/replicator/internal/target/dlq" + "github.com/cockroachdb/replicator/internal/target/schemawatch" "github.com/cockroachdb/replicator/internal/util/hlc" "github.com/cockroachdb/replicator/internal/util/ident" "github.com/cockroachdb/replicator/internal/util/secure" @@ -48,6 +49,7 @@ type EagerConfig Config type Config struct { ConveyorConfig conveyor.Config DLQ dlq.Config + SchemaWatch schemawatch.Config Script script.Config Sequencer sequencer.Config Stage stage.Config // Staging table configuration. @@ -89,6 +91,7 @@ type SASLConfig struct { // Bind adds flags to the set. It delegates to the embedded Config.Bind. func (c *Config) Bind(f *pflag.FlagSet) { c.DLQ.Bind(f) + c.SchemaWatch.Bind(f) c.Script.Bind(f) c.Sequencer.Bind(f) c.Stage.Bind(f) @@ -137,6 +140,9 @@ func (c *Config) Preflight(ctx context.Context) error { if err := c.DLQ.Preflight(); err != nil { return err } + if err := c.SchemaWatch.Preflight(); err != nil { + return err + } if err := c.Script.Preflight(); err != nil { return err } diff --git a/internal/source/kafka/provider.go b/internal/source/kafka/provider.go index 4fb24d26..8b577e1e 100644 --- a/internal/source/kafka/provider.go +++ b/internal/source/kafka/provider.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/field-eng-powertools/stopper" "github.com/cockroachdb/replicator/internal/conveyor" "github.com/cockroachdb/replicator/internal/script" + "github.com/cockroachdb/replicator/internal/target/schemawatch" "github.com/google/wire" ) @@ -28,6 +29,7 @@ var Set = wire.NewSet( ProvideConn, ProvideConveyorConfig, ProvideEagerConfig, + ProvideSchemaWatchConfig, ) // ProvideEagerConfig is a hack to move up the evaluation of the user @@ -42,6 +44,11 @@ func ProvideConveyorConfig(cfg *Config) *conveyor.Config { return &cfg.ConveyorConfig } +// ProvideSchemaWatchConfig is called by Wire. +func ProvideSchemaWatchConfig(cfg *Config) *schemawatch.Config { + return &cfg.SchemaWatch +} + // ProvideConn is called by Wire to construct this package's // logical.Dialect implementation. There's a fake dependency on // the script loader so that flags can be evaluated first. diff --git a/internal/source/kafka/wire_gen.go b/internal/source/kafka/wire_gen.go index b5857ca1..fc958312 100644 --- a/internal/source/kafka/wire_gen.go +++ b/internal/source/kafka/wire_gen.go @@ -54,7 +54,8 @@ func Start(ctx *stopper.Context, config *Config) (*Kafka, error) { return nil, err } dlqConfig := &eagerConfig.DLQ - watchers, err := schemawatch.ProvideFactory(ctx, targetPool, diagnostics) + schemawatchConfig := ProvideSchemaWatchConfig(config) + watchers, err := schemawatch.ProvideFactory(ctx, schemawatchConfig, targetPool, diagnostics) if err != nil { return nil, err } diff --git a/internal/source/mylogical/config.go b/internal/source/mylogical/config.go index c723b589..ce944b31 100644 --- a/internal/source/mylogical/config.go +++ b/internal/source/mylogical/config.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/replicator/internal/sinkprod" "github.com/cockroachdb/replicator/internal/staging/stage" "github.com/cockroachdb/replicator/internal/target/dlq" + "github.com/cockroachdb/replicator/internal/target/schemawatch" "github.com/cockroachdb/replicator/internal/util/ident" "github.com/pkg/errors" "github.com/spf13/pflag" @@ -41,12 +42,13 @@ type EagerConfig Config // Config contains the configuration necessary for creating a // replication connection. ServerID and SourceConn are mandatory. type Config struct { - DLQ dlq.Config - Script script.Config - Sequencer sequencer.Config - Stage stage.Config // Staging table configuration. - Staging sinkprod.StagingConfig // Staging database configuration. - Target sinkprod.TargetConfig + DLQ dlq.Config + SchemaWatch schemawatch.Config + Script script.Config + Sequencer sequencer.Config + Stage stage.Config // Staging table configuration. + Staging sinkprod.StagingConfig // Staging database configuration. + Target sinkprod.TargetConfig InitialGTID string FetchMetadata bool @@ -68,6 +70,7 @@ type Config struct { // Bind adds flags to the set. It delegates to the embedded Config.Bind. func (c *Config) Bind(f *pflag.FlagSet) { c.DLQ.Bind(f) + c.SchemaWatch.Bind(f) c.Script.Bind(f) c.Sequencer.Bind(f) c.Stage.Bind(f) @@ -136,6 +139,9 @@ func (c *Config) Preflight() error { if err := c.DLQ.Preflight(); err != nil { return err } + if err := c.SchemaWatch.Preflight(); err != nil { + return err + } if err := c.Script.Preflight(); err != nil { return err } diff --git a/internal/source/mylogical/provider.go b/internal/source/mylogical/provider.go index 394d84ef..8c25a7bc 100644 --- a/internal/source/mylogical/provider.go +++ b/internal/source/mylogical/provider.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/replicator/internal/sequencer/immediate" scriptSeq "github.com/cockroachdb/replicator/internal/sequencer/script" "github.com/cockroachdb/replicator/internal/target/apply" + "github.com/cockroachdb/replicator/internal/target/schemawatch" "github.com/cockroachdb/replicator/internal/types" "github.com/cockroachdb/replicator/internal/util/hlc" "github.com/cockroachdb/replicator/internal/util/ident" @@ -36,6 +37,7 @@ import ( var Set = wire.NewSet( ProvideConn, ProvideEagerConfig, + ProvideSchemaWatchConfig, ) // ProvideConn is called by Wire to construct this package's @@ -115,3 +117,8 @@ func ProvideConn( func ProvideEagerConfig(cfg *Config, _ *script.Loader) (*EagerConfig, error) { return (*EagerConfig)(cfg), cfg.Preflight() } + +// ProvideSchemaWatchConfig is called by Wire. +func ProvideSchemaWatchConfig(cfg *Config) *schemawatch.Config { + return &cfg.SchemaWatch +} diff --git a/internal/source/mylogical/wire_gen.go b/internal/source/mylogical/wire_gen.go index cc05a409..59be59bc 100644 --- a/internal/source/mylogical/wire_gen.go +++ b/internal/source/mylogical/wire_gen.go @@ -50,7 +50,8 @@ func Start(ctx *stopper.Context, config *Config) (*MYLogical, error) { return nil, err } dlqConfig := &eagerConfig.DLQ - watchers, err := schemawatch.ProvideFactory(ctx, targetPool, diagnostics) + schemawatchConfig := ProvideSchemaWatchConfig(config) + watchers, err := schemawatch.ProvideFactory(ctx, schemawatchConfig, targetPool, diagnostics) if err != nil { return nil, err } diff --git a/internal/source/pglogical/config.go b/internal/source/pglogical/config.go index 2c54b514..c4e8a20f 100644 --- a/internal/source/pglogical/config.go +++ b/internal/source/pglogical/config.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/replicator/internal/sinkprod" "github.com/cockroachdb/replicator/internal/staging/stage" "github.com/cockroachdb/replicator/internal/target/dlq" + "github.com/cockroachdb/replicator/internal/target/schemawatch" "github.com/cockroachdb/replicator/internal/util/ident" "github.com/pkg/errors" "github.com/spf13/pflag" @@ -42,12 +43,13 @@ type EagerConfig Config // replication connection. All field, other than TestControls, are // mandatory unless explicitly indicated. type Config struct { - DLQ dlq.Config - Script script.Config - Sequencer sequencer.Config - Stage stage.Config // Staging table configuration. - Staging sinkprod.StagingConfig // Staging database configuration. - Target sinkprod.TargetConfig + DLQ dlq.Config + SchemaWatch schemawatch.Config + Script script.Config + Sequencer sequencer.Config + Stage stage.Config // Staging table configuration. + Staging sinkprod.StagingConfig // Staging database configuration. + Target sinkprod.TargetConfig // The name of the publication to attach to. Publication string @@ -67,6 +69,7 @@ type Config struct { // Bind adds flags to the set. func (c *Config) Bind(f *pflag.FlagSet) { c.DLQ.Bind(f) + c.SchemaWatch.Bind(f) c.Script.Bind(f) c.Sequencer.Bind(f) c.Stage.Bind(f) @@ -92,6 +95,9 @@ func (c *Config) Preflight() error { if err := c.DLQ.Preflight(); err != nil { return err } + if err := c.SchemaWatch.Preflight(); err != nil { + return err + } if err := c.Script.Preflight(); err != nil { return err } diff --git a/internal/source/pglogical/provider.go b/internal/source/pglogical/provider.go index 494307fc..4442e924 100644 --- a/internal/source/pglogical/provider.go +++ b/internal/source/pglogical/provider.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/replicator/internal/sequencer/immediate" "github.com/cockroachdb/replicator/internal/sequencer/script" "github.com/cockroachdb/replicator/internal/target/apply" + "github.com/cockroachdb/replicator/internal/target/schemawatch" "github.com/cockroachdb/replicator/internal/types" "github.com/cockroachdb/replicator/internal/util/hlc" "github.com/cockroachdb/replicator/internal/util/ident" @@ -38,6 +39,7 @@ import ( var Set = wire.NewSet( ProvideConn, ProvideEagerConfig, + ProvideSchemaWatchConfig, ) // ProvideConn is called by Wire to construct a connection to the source @@ -146,3 +148,8 @@ func ProvideConn( func ProvideEagerConfig(cfg *Config, _ *scriptRT.Loader) (*EagerConfig, error) { return (*EagerConfig)(cfg), cfg.Preflight() } + +// ProvideSchemaWatchConfig is called by Wire. +func ProvideSchemaWatchConfig(cfg *Config) *schemawatch.Config { + return &cfg.SchemaWatch +} diff --git a/internal/source/pglogical/wire_gen.go b/internal/source/pglogical/wire_gen.go index 50edb9a0..eab26cd2 100644 --- a/internal/source/pglogical/wire_gen.go +++ b/internal/source/pglogical/wire_gen.go @@ -50,7 +50,8 @@ func Start(context *stopper.Context, config *Config) (*PGLogical, error) { return nil, err } dlqConfig := &eagerConfig.DLQ - watchers, err := schemawatch.ProvideFactory(context, targetPool, diagnostics) + schemawatchConfig := ProvideSchemaWatchConfig(config) + watchers, err := schemawatch.ProvideFactory(context, schemawatchConfig, targetPool, diagnostics) if err != nil { return nil, err } diff --git a/internal/target/schemawatch/config.go b/internal/target/schemawatch/config.go new file mode 100644 index 00000000..11fbdb1e --- /dev/null +++ b/internal/target/schemawatch/config.go @@ -0,0 +1,42 @@ +// Copyright 2023 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +package schemawatch + +import ( + "time" + + "github.com/spf13/pflag" +) + +const defaultRefreshDelay = time.Minute + +// Config controls the schemawatch behavior. +type Config struct { + RefreshDelay time.Duration +} + +// Bind adds configuration flags to the set. +func (c *Config) Bind(f *pflag.FlagSet) { + f.DurationVar(&c.RefreshDelay, "schemaRefresh", defaultRefreshDelay, + `controls how often a watcher will refresh its schema. `+ + `If this value is zero or negative, refresh behavior will be disabled.`) +} + +// Preflight validates the configuration. +func (c *Config) Preflight() error { + return nil +} diff --git a/internal/target/schemawatch/factory.go b/internal/target/schemawatch/factory.go index bdff3119..2f69a8f0 100644 --- a/internal/target/schemawatch/factory.go +++ b/internal/target/schemawatch/factory.go @@ -28,6 +28,7 @@ import ( // factory is a memoizing factory for watcher instances. type factory struct { + cfg *Config pool *types.TargetPool stop *stopper.Context mu struct { @@ -72,7 +73,7 @@ func (f *factory) createUnlocked(db ident.Schema) (*watcher, error) { return ret, nil } - ret, err := newWatcher(f.stop, f.pool, db) + ret, err := newWatcher(f.stop, f.cfg, f.pool, db) if err != nil { return nil, err } diff --git a/internal/target/schemawatch/provider.go b/internal/target/schemawatch/provider.go index 9824b97b..38e4ce98 100644 --- a/internal/target/schemawatch/provider.go +++ b/internal/target/schemawatch/provider.go @@ -31,9 +31,9 @@ var Set = wire.NewSet( // ProvideFactory is called by Wire to construct the Watchers factory. func ProvideFactory( - ctx *stopper.Context, pool *types.TargetPool, d *diag.Diagnostics, + ctx *stopper.Context, cfg *Config, pool *types.TargetPool, d *diag.Diagnostics, ) (types.Watchers, error) { - w := &factory{pool: pool, stop: ctx} + w := &factory{cfg: cfg, pool: pool, stop: ctx} w.mu.data = &ident.SchemaMap[*watcher]{} if err := d.Register("schema", w); err != nil { return nil, err diff --git a/internal/target/schemawatch/watcher.go b/internal/target/schemawatch/watcher.go index dcd0526e..39a590e9 100644 --- a/internal/target/schemawatch/watcher.go +++ b/internal/target/schemawatch/watcher.go @@ -21,7 +21,6 @@ package schemawatch import ( "context" "database/sql" - "flag" "fmt" "sync" "time" @@ -34,11 +33,6 @@ import ( log "github.com/sirupsen/logrus" ) -// RefreshDelay controls how ofter a watcher will refresh its schema. If -// this value is zero or negative, refresh behavior will be disabled. -var RefreshDelay = flag.Duration("schemaRefresh", time.Minute, - "how often to scan for schema changes; set to zero to disable") - // A watcher maintains an internal cache of a database's schema, // allowing callers to receive notifications of schema changes. type watcher struct { @@ -59,10 +53,12 @@ var _ types.Watcher = (*watcher)(nil) // newWatcher constructs a new watcher to monitor the table schema in the // named database. The returned watcher will internally refresh // until the cancel callback is executed. -func newWatcher(ctx *stopper.Context, tx *types.TargetPool, schema ident.Schema) (*watcher, error) { +func newWatcher( + ctx *stopper.Context, cfg *Config, tx *types.TargetPool, schema ident.Schema, +) (*watcher, error) { w := &watcher{ background: ctx, - delay: *RefreshDelay, + delay: cfg.RefreshDelay, schema: schema, } w.mu.updated = make(chan struct{}) diff --git a/internal/target/schemawatch/watcher_test.go b/internal/target/schemawatch/watcher_test.go index 7f98c0fd..ffc88087 100644 --- a/internal/target/schemawatch/watcher_test.go +++ b/internal/target/schemawatch/watcher_test.go @@ -23,7 +23,6 @@ import ( "github.com/cockroachdb/replicator/internal/sinktest" "github.com/cockroachdb/replicator/internal/sinktest/all" - "github.com/cockroachdb/replicator/internal/target/schemawatch" "github.com/cockroachdb/replicator/internal/types" "github.com/cockroachdb/replicator/internal/util/ident" "github.com/cockroachdb/replicator/internal/util/retry" @@ -33,12 +32,7 @@ import ( func TestWatch(t *testing.T) { r := require.New(t) - // Override the delay to exercise the background goroutine. - const delay = time.Second - *schemawatch.RefreshDelay = delay - defer func() { *schemawatch.RefreshDelay = time.Minute }() - - fixture, err := all.NewFixture(t) + fixture, err := all.NewFixtureWithRefresh(t, all.RefreshDelay(time.Second)) r.NoError(err) ctx := fixture.Context