// ParseFeatureFlags parses a comma-separated list of flag names and overlays the result
// on a copy of defaults, a map describing each flag's setting in the absence of any flags.
//
// Each entry in the list sets the named flag to true. An entry prefixed with 'no_' instead
// sets the flag named by the remainder to false, which is useful when the default is (or
// might soon become) true. Whitespace surrounding an entry is ignored, so "a, no_b" and
// "a,no_b" are equivalent. Empty entries (including a bare 'no_') are skipped, and when a
// flag is listed more than once the last occurrence wins. Flag names which do not appear
// in defaults are still included in the result.
//
// The defaults map is never modified, and the returned map is always non-nil.
func ParseFeatureFlags(flags string, defaults map[string]bool) map[string]bool {
	var settings = make(map[string]bool, len(defaults))
	for name, value := range defaults {
		settings[name] = value
	}
	for _, entry := range strings.Split(flags, ",") {
		// Tolerate hand-edited lists such as "a, no_b". Without trimming, the second
		// entry would be stored under the key " no_b" and never match a real flag.
		var flagName = strings.TrimSpace(entry)
		var flagValue = true
		if strings.HasPrefix(flagName, "no_") {
			flagName = strings.TrimPrefix(flagName, "no_")
			flagValue = false
		}
		if flagName != "" {
			settings[flagName] = flagValue
		}
	}
	return settings
}
+ "string", "null" ] }, diff --git a/source-redshift-batch/.snapshots/TestFeatureFlagUseSchemaInference-Discovery b/source-redshift-batch/.snapshots/TestFeatureFlagUseSchemaInference-Discovery new file mode 100644 index 0000000000..6a50a235e0 --- /dev/null +++ b/source-redshift-batch/.snapshots/TestFeatureFlagUseSchemaInference-Discovery @@ -0,0 +1,60 @@ +Binding 0: +{ + "resource_config_json": { + "name": "test_feature_flag_use_schema_inference_77244729", + "template": "{{if .CursorFields -}}\n {{- if .IsFirstQuery -}}\n SELECT * FROM \"test\".\"feature_flag_use_schema_inference_77244729\"\n {{- else -}}\n SELECT * FROM \"test\".\"feature_flag_use_schema_inference_77244729\"\n\t{{- range $i, $k := $.CursorFields -}}\n\t {{- if eq $i 0}} WHERE ({{else}}) OR ({{end -}}\n {{- range $j, $n := $.CursorFields -}}\n\t\t{{- if lt $j $i -}}\n\t\t {{$n}} = ${{add $j 1}} AND {{end -}}\n\t {{- end -}}\n\t {{$k}} \u003e ${{add $i 1}}\n\t{{- end -}}\n\t) \n {{- end}} ORDER BY {{range $i, $k := $.CursorFields}}{{if gt $i 0}}, {{end}}{{$k}}{{end -}};\n{{- else -}}\n SELECT * FROM \"test\".\"feature_flag_use_schema_inference_77244729\";\n{{- end}}\n" + }, + "resource_path": [ + "test_feature_flag_use_schema_inference_77244729" + ], + "collection": { + "name": "acmeCo/test/test_feature_flag_use_schema_inference_77244729", + "read_schema_json": { + "type": "object", + "required": [ + "_meta", + "id" + ], + "properties": { + "_meta": { + "$schema": "http://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/estuary/connectors/source-redshift-batch/document-metadata", + "properties": { + "polled": { + "type": "string", + "format": "date-time", + "title": "Polled Timestamp", + "description": "The time at which the update query which produced this document as executed." + }, + "index": { + "type": "integer", + "title": "Result Index", + "description": "The index of this document within the query execution which produced it." 
+ } + }, + "type": "object", + "required": [ + "polled", + "index" + ] + }, + "data": { + "type": [ + "string", + "null" + ] + }, + "id": { + "type": "integer" + } + }, + "x-infer-schema": true + }, + "key": [ + "/id" + ], + "projections": null + }, + "state_key": "test_feature_flag_use_schema_inference_77244729" + } + diff --git a/source-redshift-batch/.snapshots/TestFloatNaNs-Capture b/source-redshift-batch/.snapshots/TestFloatNaNs-Capture index 778a498147..153ccff89d 100644 --- a/source-redshift-batch/.snapshots/TestFloatNaNs-Capture +++ b/source-redshift-batch/.snapshots/TestFloatNaNs-Capture @@ -1,11 +1,11 @@ # ================================ # Collection "acmeCo/test/test_float_nans_10511": 3 Documents # ================================ -{"_meta":{"polled":"","index":999},"a_double":"NaN","a_real":2,"id":0,"txid":999999} -{"_meta":{"polled":"","index":999},"a_double":3,"a_real":"NaN","id":1,"txid":999999} -{"_meta":{"polled":"","index":999},"a_double":"-Infinity","a_real":"Infinity","id":2,"txid":999999} +{"_meta":{"polled":"","index":999},"a_double":"NaN","a_real":2,"id":0} +{"_meta":{"polled":"","index":999},"a_double":3,"a_real":"NaN","id":1} +{"_meta":{"polled":"","index":999},"a_double":"-Infinity","a_real":"Infinity","id":2} # ================================ # Final State Checkpoint # ================================ -{"bindingStateV1":{"test_float_nans_10511":{"CursorNames":["txid"],"CursorValues":[999999],"LastPolled":""}}} +{"bindingStateV1":{"test_float_nans_10511":{"CursorNames":null,"CursorValues":null,"LastPolled":""}}} diff --git a/source-redshift-batch/.snapshots/TestFloatNaNs-Discovery b/source-redshift-batch/.snapshots/TestFloatNaNs-Discovery new file mode 100644 index 0000000000..c9ae112feb --- /dev/null +++ b/source-redshift-batch/.snapshots/TestFloatNaNs-Discovery @@ -0,0 +1,69 @@ +Binding 0: +{ + "resource_config_json": { + "name": "test_float_nans_10511", + "template": "{{if .CursorFields -}}\n {{- if .IsFirstQuery -}}\n SELECT * FROM 
\"test\".\"float_nans_10511\"\n {{- else -}}\n SELECT * FROM \"test\".\"float_nans_10511\"\n\t{{- range $i, $k := $.CursorFields -}}\n\t {{- if eq $i 0}} WHERE ({{else}}) OR ({{end -}}\n {{- range $j, $n := $.CursorFields -}}\n\t\t{{- if lt $j $i -}}\n\t\t {{$n}} = ${{add $j 1}} AND {{end -}}\n\t {{- end -}}\n\t {{$k}} \u003e ${{add $i 1}}\n\t{{- end -}}\n\t) \n {{- end}} ORDER BY {{range $i, $k := $.CursorFields}}{{if gt $i 0}}, {{end}}{{$k}}{{end -}};\n{{- else -}}\n SELECT * FROM \"test\".\"float_nans_10511\";\n{{- end}}\n" + }, + "resource_path": [ + "test_float_nans_10511" + ], + "collection": { + "name": "acmeCo/test/test_float_nans_10511", + "read_schema_json": { + "type": "object", + "required": [ + "_meta", + "id" + ], + "properties": { + "_meta": { + "$schema": "http://json-schema.org/draft/2020-12/schema", + "$id": "https://github.com/estuary/connectors/source-redshift-batch/document-metadata", + "properties": { + "polled": { + "type": "string", + "format": "date-time", + "title": "Polled Timestamp", + "description": "The time at which the update query which produced this document as executed." + }, + "index": { + "type": "integer", + "title": "Result Index", + "description": "The index of this document within the query execution which produced it." 
// featureFlagDefaults gives the value each known feature flag takes when the
// 'feature_flags' advanced setting does not mention it. Discover and Pull both
// overlay the configured flag list on top of these defaults by calling
// boilerplate.ParseFeatureFlags.
var featureFlagDefaults = map[string]bool{
	// When set, discovered collection schemas will request that schema inference be
	// used _in addition to_ the full column/types discovery we already do. The request
	// is made by adding "x-infer-schema": true to the generated collection schema.
	"use_schema_inference": false,
}
type BatchSQLDriver struct { @@ -86,7 +92,7 @@ type documentMetadata struct { Index int `json:"index" jsonschema:"title=Result Index,description=The index of this document within the query execution which produced it."` } -func generateCollectionSchema(keyColumns []string, columnTypes map[string]columnType) (json.RawMessage, error) { +func generateCollectionSchema(keyColumns []string, columnTypes map[string]columnType, useSchemaInference bool) (json.RawMessage, error) { // Generate schema for the metadata via reflection var reflector = jsonschema.Reflector{ ExpandedStruct: true, @@ -104,13 +110,17 @@ func generateCollectionSchema(keyColumns []string, columnTypes map[string]column properties[colName] = colType.JSONSchema() } + var extras = map[string]any{ + "properties": properties, + } + if useSchemaInference { + extras["x-infer-schema"] = true + } var schema = &jsonschema.Schema{ Type: "object", Required: required, AdditionalProperties: nil, - Extras: map[string]interface{}{ - "properties": properties, - }, + Extras: extras, } // Marshal schema to JSON @@ -150,6 +160,9 @@ func (drv *BatchSQLDriver) Discover(ctx context.Context, req *pc.Request_Discove } cfg.SetDefaults() + var featureFlags = boilerplate.ParseFeatureFlags(cfg.Advanced.FeatureFlags, featureFlagDefaults) + log.WithField("flags", featureFlags).Info("parsed feature flags") + var db, err = drv.Connect(ctx, &cfg) if err != nil { return nil, err @@ -245,7 +258,7 @@ func (drv *BatchSQLDriver) Discover(ctx context.Context, req *pc.Request_Discove columnTypes[column.Name] = column.DataType } - generatedSchema, err := generateCollectionSchema(keyColumns, columnTypes) + generatedSchema, err := generateCollectionSchema(keyColumns, columnTypes, featureFlags["use_schema_inference"]) if err != nil { log.WithFields(log.Fields{"table": tableID, "err": err}).Warn("unable to generate collection schema") continue @@ -604,6 +617,9 @@ func (drv *BatchSQLDriver) Pull(open *pc.Request_Open, stream *boilerplate.PullO } 
cfg.SetDefaults() + var featureFlags = boilerplate.ParseFeatureFlags(cfg.Advanced.FeatureFlags, featureFlagDefaults) + log.WithField("flags", featureFlags).Info("parsed feature flags") + var db, err = drv.Connect(stream.Context(), &cfg) if err != nil { return err @@ -646,6 +662,7 @@ func (drv *BatchSQLDriver) Pull(open *pc.Request_Open, stream *boilerplate.PullO Bindings: bindings, Output: stream, TranslateValue: drv.TranslateValue, + featureFlags: featureFlags, } return capture.Run(stream.Context()) } @@ -681,6 +698,7 @@ type capture struct { Bindings []bindingInfo Output *boilerplate.PullOutput TranslateValue func(val any, databaseTypeName string) (any, error) + featureFlags map[string]bool } type bindingInfo struct { diff --git a/source-redshift-batch/main.go b/source-redshift-batch/main.go index 2e3d0c2bad..9f7fb9b751 100644 --- a/source-redshift-batch/main.go +++ b/source-redshift-batch/main.go @@ -31,6 +31,7 @@ type advancedConfig struct { PollSchedule string `json:"poll,omitempty" jsonschema:"title=Default Polling Schedule,description=When and how often to execute fetch queries. Accepts a Go duration string like '5m' or '6h' for frequency-based polling or a string like 'daily at 12:34Z' to poll at a specific time (specified in UTC) every day. Defaults to '24h' if unset." jsonschema_extras:"pattern=^([-+]?([0-9]+([.][0-9]+)?(h|m|s|ms))+|daily at [0-9][0-9]?:[0-9]{2}Z)$"` DiscoverSchemas []string `json:"discover_schemas,omitempty" jsonschema:"title=Discovery Schema Selection,description=If this is specified only tables in the selected schema(s) will be automatically discovered. 
Omit all entries to discover tables from all schemas."` SSLMode string `json:"sslmode,omitempty" jsonschema:"title=SSL Mode,description=Overrides SSL connection behavior by setting the 'sslmode' parameter.,enum=disable,enum=allow,enum=prefer,enum=require,enum=verify-ca,enum=verify-full"` + FeatureFlags string `json:"feature_flags,omitempty" jsonschema:"title=Feature Flags,description=This property is intended for Estuary internal use. You should only modify this field as directed by Estuary support."` } // Validate checks that the configuration possesses all required properties. diff --git a/source-redshift-batch/main_test.go b/source-redshift-batch/main_test.go index 393f60fd11..1f9b060691 100644 --- a/source-redshift-batch/main_test.go +++ b/source-redshift-batch/main_test.go @@ -328,3 +328,21 @@ func snapshotBindings(t testing.TB, bindings []*pf.CaptureSpec_Binding) { } cupaloy.SnapshotT(t, summary.String()) } + +func TestFeatureFlagUseSchemaInference(t *testing.T) { + var ctx, cs = context.Background(), testCaptureSpec(t) + var control = testControlClient(ctx, t) + var uniqueID = "77244729" + var tableName = fmt.Sprintf("test.feature_flag_use_schema_inference_%s", uniqueID) + + // Includes some extra junk to make sure the parsing helper logic is doing its job as intended + cs.EndpointSpec.(*Config).Advanced.FeatureFlags = "this_flag_does_not_exist,use_schema_inference,,,,no_this_flag_also_does_not_exist" + + executeControlQuery(ctx, t, control, fmt.Sprintf("DROP TABLE IF EXISTS %s", tableName)) + t.Cleanup(func() { executeControlQuery(ctx, t, control, fmt.Sprintf("DROP TABLE IF EXISTS %s", tableName)) }) + executeControlQuery(ctx, t, control, fmt.Sprintf("CREATE TABLE %s(id INTEGER PRIMARY KEY, data TEXT)", tableName)) + + // Discover the table and verify discovery snapshot + cs.Bindings = discoverStreams(ctx, t, cs, regexp.MustCompile(uniqueID)) + t.Run("Discovery", func(t *testing.T) { snapshotBindings(t, cs.Bindings) }) +}