Skip to content

Commit

Permalink
internal/target/schemawatch: added in configuration wiring for the Re…
Browse files Browse the repository at this point in the history
…freshDelay option

Previously, the RefreshDelay was set to a constant value of 1 minute, which
means that customers did not have a way to opt out or update this behavior so
it happens less or more frequently. This change introduces the wiring from each
frontend to the schemawatch package to control this schema refresh timing.

Resolves: #1047
Release Note: Added in the ability to configure the schema watcher refresh delay via a CLI flag.

(cherry picked from commit 83094af)
  • Loading branch information
ryanluu12345 authored and bobvawter committed Oct 28, 2024
1 parent 5c71db3 commit 994a9bc
Show file tree
Hide file tree
Showing 22 changed files with 370 additions and 40 deletions.
5 changes: 3 additions & 2 deletions internal/sinktest/all/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/replicator/internal/staging/version"
"github.com/cockroachdb/replicator/internal/target/apply"
"github.com/cockroachdb/replicator/internal/target/dlq"
"github.com/cockroachdb/replicator/internal/target/schemawatch"
"github.com/cockroachdb/replicator/internal/types"
"github.com/cockroachdb/replicator/internal/util/applycfg"
"github.com/cockroachdb/replicator/internal/util/diag"
Expand All @@ -52,12 +53,12 @@ type Fixture struct {
DLQs types.DLQs
Leases types.Leases
Memo types.Memo
SchemaWatch *schemawatch.Config
StageConfig *stage.Config
Stagers types.Stagers
VersionChecker *version.Checker
Watchers types.Watchers

Watcher types.Watcher // A watcher for TestDB.
Watcher types.Watcher // A watcher for TestDB.
}

// Applier returns a bound function that will apply mutations to the
Expand Down
11 changes: 11 additions & 0 deletions internal/sinktest/all/injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,23 @@ package all

import (
"testing"
"time"

"github.com/cockroachdb/replicator/internal/sinktest/base"
"github.com/google/wire"
)

// NewFixture constructs a self-contained test fixture for all services
// in the target sub-packages.
func NewFixture(t testing.TB) (*Fixture, error) {
panic(wire.Build(TestSet, wire.Value(RefreshDelay(time.Minute))))
}

// NewFixtureFromBase constructs a new Fixture over a [base.Fixture].
func NewFixtureFromBase(fixture *base.Fixture) (*Fixture, error) {
panic(wire.Build(TestSetBase, wire.Value(RefreshDelay(time.Minute))))
}

func NewFixtureWithRefresh(t testing.TB, d RefreshDelay) (*Fixture, error) {
panic(wire.Build(TestSet))
}
38 changes: 38 additions & 0 deletions internal/sinktest/all/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,58 @@
package all

import (
"context"
"time"

"github.com/cockroachdb/field-eng-powertools/stopper"
"github.com/cockroachdb/replicator/internal/sinktest"
"github.com/cockroachdb/replicator/internal/sinktest/base"
"github.com/cockroachdb/replicator/internal/staging"
"github.com/cockroachdb/replicator/internal/staging/stage"
"github.com/cockroachdb/replicator/internal/target"
"github.com/cockroachdb/replicator/internal/target/dlq"
"github.com/cockroachdb/replicator/internal/target/schemawatch"
"github.com/cockroachdb/replicator/internal/types"
"github.com/cockroachdb/replicator/internal/util/diag"
"github.com/google/wire"
)

// RefreshDelay is a named type for the schema watch refresh delay configuration.
type RefreshDelay time.Duration

// TestSet contains providers to create a self-contained Fixture.
var TestSet = wire.NewSet(
base.TestSet,
staging.Set,
target.Set,

ProvideDLQConfig,
ProvideSchemaWatchConfig,
ProvideStageConfig,
ProvideWatcher,

wire.Struct(new(Fixture), "*"),
)

// TestSetBase creates a Fixture from a [base.Fixture].
var TestSetBase = wire.NewSet(
wire.FieldsOf(new(*base.Fixture),
"Context", "SourcePool", "SourceSchema",
"StagingPool", "StagingDB",
"TargetCache", "TargetPool", "TargetSchema"),
diag.New,
staging.Set,
target.Set,

ProvideDLQConfig,
ProvideSchemaWatchConfig,
ProvideStageConfig,
ProvideWatcher,

wire.Bind(new(context.Context), new(*stopper.Context)),
wire.Struct(new(Fixture), "*"),
)

// ProvideDLQConfig emits a default configuration.
func ProvideDLQConfig() (*dlq.Config, error) {
cfg := &dlq.Config{}
Expand All @@ -58,3 +87,12 @@ func ProvideStageConfig() (*stage.Config, error) {
func ProvideWatcher(target sinktest.TargetSchema, watchers types.Watchers) (types.Watcher, error) {
return watchers.Get(target.Schema())
}

// ProvideSchemaWatchConfig is called by Wire to construct the SchemaWatch
// configuration.
func ProvideSchemaWatchConfig(refreshDelay RefreshDelay) (*schemawatch.Config, error) {
cfg := &schemawatch.Config{
RefreshDelay: time.Duration(refreshDelay),
}
return cfg, cfg.Preflight()
}
191 changes: 190 additions & 1 deletion internal/sinktest/all/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion internal/source/cdc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/replicator/internal/script"
"github.com/cockroachdb/replicator/internal/sequencer"
"github.com/cockroachdb/replicator/internal/target/dlq"
"github.com/cockroachdb/replicator/internal/target/schemawatch"
log "github.com/sirupsen/logrus"
"github.com/spf13/pflag"
)
Expand All @@ -39,6 +40,7 @@ type Config struct {
ConveyorConfig conveyor.Config
DLQConfig dlq.Config
SequencerConfig sequencer.Config
SchemaWatch schemawatch.Config
ScriptConfig script.Config
// Discard all incoming HTTP payloads. This is useful for tuning
// changefeed throughput without considering Replicator performance.
Expand All @@ -61,8 +63,9 @@ type Config struct {
func (c *Config) Bind(f *pflag.FlagSet) {
c.ConveyorConfig.Bind(f)
c.DLQConfig.Bind(f)
c.SequencerConfig.Bind(f)
c.SchemaWatch.Bind(f)
c.ScriptConfig.Bind(f)
c.SequencerConfig.Bind(f)

f.BoolVar(&c.Discard, "discard", false,
"(dangerous) discard all incoming HTTP requests; useful for changefeed throughput testing")
Expand All @@ -89,6 +92,9 @@ func (c *Config) Preflight() error {
if err := c.ScriptConfig.Preflight(); err != nil {
return err
}
if err := c.SchemaWatch.Preflight(); err != nil {
return err
}

// Backfill mode may be zero to disable BestEffort.

Expand Down
7 changes: 7 additions & 0 deletions internal/source/cdc/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/replicator/internal/script"
"github.com/cockroachdb/replicator/internal/sequencer"
"github.com/cockroachdb/replicator/internal/target/dlq"
"github.com/cockroachdb/replicator/internal/target/schemawatch"
"github.com/cockroachdb/replicator/internal/types"
"github.com/google/wire"
)
Expand All @@ -30,6 +31,7 @@ var Set = wire.NewSet(
ProvideHandler,
ProvideConveyorConfig,
ProvideDLQConfig,
ProvideSchemaWatchConfig,
ProvideScriptConfig,
ProvideSequencerConfig,
conveyor.Set,
Expand All @@ -45,6 +47,11 @@ func ProvideDLQConfig(cfg *Config) *dlq.Config {
return &cfg.DLQConfig
}

// ProvideSchemaWatchConfig is called by Wire.
func ProvideSchemaWatchConfig(cfg *Config) *schemawatch.Config {
return &cfg.SchemaWatch
}

// ProvideScriptConfig is called by Wire.
func ProvideScriptConfig(cfg *Config) *script.Config {
return &cfg.ScriptConfig
Expand Down
Loading

0 comments on commit 994a9bc

Please sign in to comment.