Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

source-redshift-batch: Add the 'use_schema_inference' feature flag #2187

Merged
merged 4 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions source-boilerplate/feature_flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package boilerplate

import "strings"

// ParseFeatureFlags parses a comma-separated list of flag names and combines that with a
// map describing default flag settings in the absence of any flags. A flag name can be
// prefixed with 'no_' to explicitly set it to a false value, in case the default is (or
// might soon become) true.
//
// Whitespace surrounding each entry is ignored, so "a, no_b" is equivalent to "a,no_b".
// Empty entries (including a bare 'no_') are skipped. Flag names which do not appear in
// defaults are still included in the result. The defaults map is never modified; a new
// map is always returned.
func ParseFeatureFlags(flags string, defaults map[string]bool) map[string]bool {
	var settings = make(map[string]bool, len(defaults))
	for k, v := range defaults {
		settings[k] = v
	}
	for _, flagName := range strings.Split(flags, ",") {
		// Flag lists are typed by hand, so tolerate spaces around the commas. Without
		// this, " no_foo" would fail the prefix check and be stored as a true-valued
		// flag literally named " no_foo".
		flagName = strings.TrimSpace(flagName)
		var flagValue = true
		if strings.HasPrefix(flagName, "no_") {
			flagName = strings.TrimPrefix(flagName, "no_")
			flagValue = false
		}
		if flagName != "" {
			settings[flagName] = flagValue
		}
	}
	return settings
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@
# ================================
# Final State Checkpoint
# ================================
{"bindingStateV1":{"test_basic_datatypes_13111208":{"LastPolled":"<TIMESTAMP>"}}}
{"bindingStateV1":{"test_basic_datatypes_13111208":{"CursorNames":null,"CursorValues":null,"LastPolled":"<TIMESTAMP>"}}}

2 changes: 2 additions & 0 deletions source-redshift-batch/.snapshots/TestBasicDatatypes-Discovery
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ Binding 0:
]
},
"a_real": {
"format": "number",
"type": [
"number",
"string",
"null"
]
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
Binding 0:
{
"resource_config_json": {
"name": "test_feature_flag_use_schema_inference_77244729",
"template": "{{if .CursorFields -}}\n {{- if .IsFirstQuery -}}\n SELECT * FROM \"test\".\"feature_flag_use_schema_inference_77244729\"\n {{- else -}}\n SELECT * FROM \"test\".\"feature_flag_use_schema_inference_77244729\"\n\t{{- range $i, $k := $.CursorFields -}}\n\t {{- if eq $i 0}} WHERE ({{else}}) OR ({{end -}}\n {{- range $j, $n := $.CursorFields -}}\n\t\t{{- if lt $j $i -}}\n\t\t {{$n}} = ${{add $j 1}} AND {{end -}}\n\t {{- end -}}\n\t {{$k}} \u003e ${{add $i 1}}\n\t{{- end -}}\n\t) \n {{- end}} ORDER BY {{range $i, $k := $.CursorFields}}{{if gt $i 0}}, {{end}}{{$k}}{{end -}};\n{{- else -}}\n SELECT * FROM \"test\".\"feature_flag_use_schema_inference_77244729\";\n{{- end}}\n"
},
"resource_path": [
"test_feature_flag_use_schema_inference_77244729"
],
"collection": {
"name": "acmeCo/test/test_feature_flag_use_schema_inference_77244729",
"read_schema_json": {
"type": "object",
"required": [
"_meta",
"id"
],
"properties": {
"_meta": {
"$schema": "http://json-schema.org/draft/2020-12/schema",
"$id": "https://github.com/estuary/connectors/source-redshift-batch/document-metadata",
"properties": {
"polled": {
"type": "string",
"format": "date-time",
"title": "Polled Timestamp",
"description": "The time at which the update query which produced this document as executed."
},
"index": {
"type": "integer",
"title": "Result Index",
"description": "The index of this document within the query execution which produced it."
}
},
"type": "object",
"required": [
"polled",
"index"
]
},
"data": {
"type": [
"string",
"null"
]
},
"id": {
"type": "integer"
}
},
"x-infer-schema": true
},
"key": [
"/id"
],
"projections": null
},
"state_key": "test_feature_flag_use_schema_inference_77244729"
}

8 changes: 4 additions & 4 deletions source-redshift-batch/.snapshots/TestFloatNaNs-Capture
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# ================================
# Collection "acmeCo/test/test_float_nans_10511": 3 Documents
# ================================
{"_meta":{"polled":"<TIMESTAMP>","index":999},"a_double":"NaN","a_real":2,"id":0,"txid":999999}
{"_meta":{"polled":"<TIMESTAMP>","index":999},"a_double":3,"a_real":"NaN","id":1,"txid":999999}
{"_meta":{"polled":"<TIMESTAMP>","index":999},"a_double":"-Infinity","a_real":"Infinity","id":2,"txid":999999}
{"_meta":{"polled":"<TIMESTAMP>","index":999},"a_double":"NaN","a_real":2,"id":0}
{"_meta":{"polled":"<TIMESTAMP>","index":999},"a_double":3,"a_real":"NaN","id":1}
{"_meta":{"polled":"<TIMESTAMP>","index":999},"a_double":"-Infinity","a_real":"Infinity","id":2}
# ================================
# Final State Checkpoint
# ================================
{"bindingStateV1":{"test_float_nans_10511":{"CursorNames":["txid"],"CursorValues":[999999],"LastPolled":"<TIMESTAMP>"}}}
{"bindingStateV1":{"test_float_nans_10511":{"CursorNames":null,"CursorValues":null,"LastPolled":"<TIMESTAMP>"}}}

69 changes: 69 additions & 0 deletions source-redshift-batch/.snapshots/TestFloatNaNs-Discovery
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
Binding 0:
{
"resource_config_json": {
"name": "test_float_nans_10511",
"template": "{{if .CursorFields -}}\n {{- if .IsFirstQuery -}}\n SELECT * FROM \"test\".\"float_nans_10511\"\n {{- else -}}\n SELECT * FROM \"test\".\"float_nans_10511\"\n\t{{- range $i, $k := $.CursorFields -}}\n\t {{- if eq $i 0}} WHERE ({{else}}) OR ({{end -}}\n {{- range $j, $n := $.CursorFields -}}\n\t\t{{- if lt $j $i -}}\n\t\t {{$n}} = ${{add $j 1}} AND {{end -}}\n\t {{- end -}}\n\t {{$k}} \u003e ${{add $i 1}}\n\t{{- end -}}\n\t) \n {{- end}} ORDER BY {{range $i, $k := $.CursorFields}}{{if gt $i 0}}, {{end}}{{$k}}{{end -}};\n{{- else -}}\n SELECT * FROM \"test\".\"float_nans_10511\";\n{{- end}}\n"
},
"resource_path": [
"test_float_nans_10511"
],
"collection": {
"name": "acmeCo/test/test_float_nans_10511",
"read_schema_json": {
"type": "object",
"required": [
"_meta",
"id"
],
"properties": {
"_meta": {
"$schema": "http://json-schema.org/draft/2020-12/schema",
"$id": "https://github.com/estuary/connectors/source-redshift-batch/document-metadata",
"properties": {
"polled": {
"type": "string",
"format": "date-time",
"title": "Polled Timestamp",
"description": "The time at which the update query which produced this document as executed."
},
"index": {
"type": "integer",
"title": "Result Index",
"description": "The index of this document within the query execution which produced it."
}
},
"type": "object",
"required": [
"polled",
"index"
]
},
"a_double": {
"format": "number",
"type": [
"number",
"string",
"null"
]
},
"a_real": {
"format": "number",
"type": [
"number",
"string",
"null"
]
},
"id": {
"type": "integer"
}
}
},
"key": [
"/id"
],
"projections": null
},
"state_key": "test_float_nans_10511"
}

5 changes: 5 additions & 0 deletions source-redshift-batch/.snapshots/TestSpec
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@
],
"title": "SSL Mode",
"description": "Overrides SSL connection behavior by setting the 'sslmode' parameter."
},
"feature_flags": {
"type": "string",
"title": "Feature Flags",
"description": "This property is intended for Estuary internal use. You should only modify this field as directed by Estuary support."
}
},
"additionalProperties": false,
Expand Down
28 changes: 23 additions & 5 deletions source-redshift-batch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ const (
documentsPerCheckpoint = 1000
)

// featureFlagDefaults holds the default value of every feature flag this connector
// knows about. It is supplied to boilerplate.ParseFeatureFlags as the baseline which
// the user-configured flag list is applied on top of, so a flag that is not mentioned
// in the configuration keeps the value given here.
var featureFlagDefaults = map[string]bool{
// When set, discovered collection schemas will request that schema inference be
// used _in addition to_ the full column/types discovery we already do.
// Disabled by default.
"use_schema_inference": false,
}

// BatchSQLDriver represents a generic "batch SQL" capture behavior, parameterized
// by a config schema, connect function, and value translation logic.
type BatchSQLDriver struct {
Expand Down Expand Up @@ -86,7 +92,7 @@ type documentMetadata struct {
Index int `json:"index" jsonschema:"title=Result Index,description=The index of this document within the query execution which produced it."`
}

func generateCollectionSchema(keyColumns []string, columnTypes map[string]columnType) (json.RawMessage, error) {
func generateCollectionSchema(keyColumns []string, columnTypes map[string]columnType, useSchemaInference bool) (json.RawMessage, error) {
// Generate schema for the metadata via reflection
var reflector = jsonschema.Reflector{
ExpandedStruct: true,
Expand All @@ -104,13 +110,17 @@ func generateCollectionSchema(keyColumns []string, columnTypes map[string]column
properties[colName] = colType.JSONSchema()
}

var extras = map[string]any{
"properties": properties,
}
if useSchemaInference {
extras["x-infer-schema"] = true
}
var schema = &jsonschema.Schema{
Type: "object",
Required: required,
AdditionalProperties: nil,
Extras: map[string]interface{}{
"properties": properties,
},
Extras: extras,
}

// Marshal schema to JSON
Expand Down Expand Up @@ -150,6 +160,9 @@ func (drv *BatchSQLDriver) Discover(ctx context.Context, req *pc.Request_Discove
}
cfg.SetDefaults()

var featureFlags = boilerplate.ParseFeatureFlags(cfg.Advanced.FeatureFlags, featureFlagDefaults)
log.WithField("flags", featureFlags).Info("parsed feature flags")

var db, err = drv.Connect(ctx, &cfg)
if err != nil {
return nil, err
Expand Down Expand Up @@ -245,7 +258,7 @@ func (drv *BatchSQLDriver) Discover(ctx context.Context, req *pc.Request_Discove
columnTypes[column.Name] = column.DataType
}

generatedSchema, err := generateCollectionSchema(keyColumns, columnTypes)
generatedSchema, err := generateCollectionSchema(keyColumns, columnTypes, featureFlags["use_schema_inference"])
if err != nil {
log.WithFields(log.Fields{"table": tableID, "err": err}).Warn("unable to generate collection schema")
continue
Expand Down Expand Up @@ -604,6 +617,9 @@ func (drv *BatchSQLDriver) Pull(open *pc.Request_Open, stream *boilerplate.PullO
}
cfg.SetDefaults()

var featureFlags = boilerplate.ParseFeatureFlags(cfg.Advanced.FeatureFlags, featureFlagDefaults)
log.WithField("flags", featureFlags).Info("parsed feature flags")

var db, err = drv.Connect(stream.Context(), &cfg)
if err != nil {
return err
Expand Down Expand Up @@ -646,6 +662,7 @@ func (drv *BatchSQLDriver) Pull(open *pc.Request_Open, stream *boilerplate.PullO
Bindings: bindings,
Output: stream,
TranslateValue: drv.TranslateValue,
featureFlags: featureFlags,
}
return capture.Run(stream.Context())
}
Expand Down Expand Up @@ -681,6 +698,7 @@ type capture struct {
Bindings []bindingInfo
Output *boilerplate.PullOutput
TranslateValue func(val any, databaseTypeName string) (any, error)
featureFlags map[string]bool
}

type bindingInfo struct {
Expand Down
1 change: 1 addition & 0 deletions source-redshift-batch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type advancedConfig struct {
PollSchedule string `json:"poll,omitempty" jsonschema:"title=Default Polling Schedule,description=When and how often to execute fetch queries. Accepts a Go duration string like '5m' or '6h' for frequency-based polling or a string like 'daily at 12:34Z' to poll at a specific time (specified in UTC) every day. Defaults to '24h' if unset." jsonschema_extras:"pattern=^([-+]?([0-9]+([.][0-9]+)?(h|m|s|ms))+|daily at [0-9][0-9]?:[0-9]{2}Z)$"`
DiscoverSchemas []string `json:"discover_schemas,omitempty" jsonschema:"title=Discovery Schema Selection,description=If this is specified only tables in the selected schema(s) will be automatically discovered. Omit all entries to discover tables from all schemas."`
SSLMode string `json:"sslmode,omitempty" jsonschema:"title=SSL Mode,description=Overrides SSL connection behavior by setting the 'sslmode' parameter.,enum=disable,enum=allow,enum=prefer,enum=require,enum=verify-ca,enum=verify-full"`
FeatureFlags string `json:"feature_flags,omitempty" jsonschema:"title=Feature Flags,description=This property is intended for Estuary internal use. You should only modify this field as directed by Estuary support."`
}

// Validate checks that the configuration possesses all required properties.
Expand Down
18 changes: 18 additions & 0 deletions source-redshift-batch/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,3 +328,21 @@ func snapshotBindings(t testing.TB, bindings []*pf.CaptureSpec_Binding) {
}
cupaloy.SnapshotT(t, summary.String())
}

// TestFeatureFlagUseSchemaInference creates a small test table, runs discovery with the
// 'use_schema_inference' feature flag enabled, and snapshots the discovered bindings.
func TestFeatureFlagUseSchemaInference(t *testing.T) {
	ctx := context.Background()
	cs := testCaptureSpec(t)
	control := testControlClient(ctx, t)

	const uniqueID = "77244729"
	tableName := fmt.Sprintf("test.feature_flag_use_schema_inference_%s", uniqueID)
	dropTable := func() {
		executeControlQuery(ctx, t, control, fmt.Sprintf("DROP TABLE IF EXISTS %s", tableName))
	}

	// The flag list deliberately mixes in unknown names, empty entries, and a negated
	// unknown name so the parsing helper is exercised alongside the real flag.
	cs.EndpointSpec.(*Config).Advanced.FeatureFlags = "this_flag_does_not_exist,use_schema_inference,,,,no_this_flag_also_does_not_exist"

	// Start from a clean slate, and remove the table again once the test finishes.
	dropTable()
	t.Cleanup(dropTable)
	executeControlQuery(ctx, t, control, fmt.Sprintf("CREATE TABLE %s(id INTEGER PRIMARY KEY, data TEXT)", tableName))

	// Discover only the table belonging to this test and snapshot the result.
	tableFilter := regexp.MustCompile(uniqueID)
	cs.Bindings = discoverStreams(ctx, t, cs, tableFilter)
	t.Run("Discovery", func(t *testing.T) { snapshotBindings(t, cs.Bindings) })
}
Loading