Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

source-postgres: Support _citext columns #2166

Merged
merged 1 commit into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions source-postgres/.snapshots/TestCIText-Capture
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# ================================
# Collection "acmeCo/test/test_citext_58810479": 6 Documents
# ================================
{"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"citext_58810479","loc":[11111111,11111111,11111111]}},"arr":{"dimensions":[2],"elements":["a","b"]},"data":"zero","id":0}
{"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"citext_58810479","loc":[11111111,11111111,11111111]}},"arr":{"dimensions":[2],"elements":["c","d"]},"data":"one","id":1}
{"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"citext_58810479","loc":[11111111,11111111,11111111]}},"arr":{"dimensions":[2],"elements":["e","f"]},"data":"two","id":2}
{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"citext_58810479","loc":[11111111,11111111,11111111],"txid":111111}},"arr":{"dimensions":[2],"elements":["g","h"]},"data":"three","id":3}
{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"citext_58810479","loc":[11111111,11111111,11111111],"txid":111111}},"arr":{"dimensions":[2],"elements":["i","j"]},"data":"four","id":4}
{"_meta":{"op":"c","source":{"ts_ms":1111111111111,"schema":"test","table":"citext_58810479","loc":[11111111,11111111,11111111],"txid":111111}},"arr":{"dimensions":[2],"elements":["k","l"]},"data":"five","id":5}
# ================================
# Final State Checkpoint
# ================================
{"bindingStateV1":{"test%2Fcitext_58810479":{"backfilled":3,"key_columns":["id"],"mode":"Active"}},"cursor":"0/1111111"}

166 changes: 166 additions & 0 deletions source-postgres/.snapshots/TestCIText-Discovery
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
Binding 0:
{
"recommended_name": "test_citext_58810479",
"resource_config_json": {
"namespace": "test",
"stream": "citext_58810479"
},
"document_schema_json": {
"$defs": {
"TestCitext_58810479": {
"type": "object",
"required": [
"id"
],
"$anchor": "TestCitext_58810479",
"properties": {
"arr": {
"type": "object",
"required": [
"dimensions",
"elements"
],
"description": "(source type: _citext)",
"properties": {
"dimensions": {
"items": {
"type": "integer"
},
"type": "array"
},
"elements": {
"items": {
"type": [
"string",
"null"
]
},
"type": "array"
}
},
"type": [
"object",
"null"
]
},
"data": {
"description": "(source type: citext)",
"type": [
"string",
"null"
]
},
"id": {
"type": "integer",
"description": "(source type: non-nullable int4)"
}
}
}
},
"allOf": [
{
"if": {
"properties": {
"_meta": {
"properties": {
"op": {
"const": "d"
}
}
}
}
},
"then": {
"reduce": {
"delete": true,
"strategy": "merge"
}
},
"else": {
"reduce": {
"strategy": "merge"
}
},
"required": [
"_meta"
],
"properties": {
"_meta": {
"type": "object",
"required": [
"op",
"source"
],
"properties": {
"before": {
"$ref": "#TestCitext_58810479",
"description": "Record state immediately before this change was applied.",
"reduce": {
"strategy": "firstWriteWins"
}
},
"op": {
"enum": [
"c",
"d",
"u"
],
"description": "Change operation type: 'c' Create/Insert, 'u' Update, 'd' Delete."
},
"source": {
"$id": "https://github.com/estuary/connectors/source-postgres/postgres-source",
"properties": {
"ts_ms": {
"type": "integer",
"description": "Unix timestamp (in millis) at which this event was recorded by the database."
},
"schema": {
"type": "string",
"description": "Database schema (namespace) of the event."
},
"snapshot": {
"type": "boolean",
"description": "Snapshot is true if the record was produced from an initial table backfill and unset if produced from the replication log."
},
"table": {
"type": "string",
"description": "Database table of the event."
},
"loc": {
"items": {
"type": "integer"
},
"type": "array",
"maxItems": 3,
"minItems": 3,
"description": "Location of this WAL event as [last Commit.EndLSN; event LSN; current Begin.FinalLSN]. See https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html"
},
"txid": {
"type": "integer",
"description": "The 32-bit transaction ID assigned by Postgres to the commit which produced this change."
}
},
"type": "object",
"required": [
"schema",
"table",
"loc"
]
}
},
"reduce": {
"strategy": "merge"
}
}
}
},
{
"$ref": "#TestCitext_58810479"
}
]
},
"key": [
"/id"
]
}

30 changes: 30 additions & 0 deletions source-postgres/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,3 +584,33 @@ func TestDroppedAndRecreatedTable(t *testing.T) {

cupaloy.SnapshotT(t, cs.Summary())
}

// TestCIText exercises a table containing a CITEXT column and a CITEXT[]
// column. The "Discovery" subtest verifies the discovered bindings matching
// the table's unique ID, and the "Capture" subtest snapshots the capture
// summary after two rounds of inserts, each followed by a capture run.
func TestCIText(t *testing.T) {
	var ctx = context.Background()
	var tb = postgresTestBackend(t)

	var uniqueID = "58810479"
	var tableName = tb.CreateTable(ctx, t, uniqueID, "(id INTEGER PRIMARY KEY, data CITEXT, arr CITEXT[])")

	t.Run("Discovery", func(t *testing.T) {
		tb.CaptureSpec(ctx, t).VerifyDiscover(ctx, t, regexp.MustCompile(uniqueID))
	})

	t.Run("Capture", func(t *testing.T) {
		var cs = tb.CaptureSpec(ctx, t, regexp.MustCompile(uniqueID))
		cs.Validator = &st.OrderedCaptureValidator{}

		// Make each capture run shut down once it has caught up, and restore
		// the package-level flag when this subtest finishes.
		sqlcapture.TestShutdownAfterCaughtUp = true
		t.Cleanup(func() { sqlcapture.TestShutdownAfterCaughtUp = false })

		// Rows inserted before the first capture run (initial backfill).
		var backfillRows = [][]any{
			{0, "zero", "{a,b}"},
			{1, "one", "{c,d}"},
			{2, "two", "{e,f}"},
		}
		tb.Insert(ctx, t, tableName, backfillRows)
		cs.Capture(ctx, t, nil)

		// Rows inserted after the first capture run (some replication).
		var replicationRows = [][]any{
			{3, "three", "{g,h}"},
			{4, "four", "{i,j}"},
			{5, "five", "{k,l}"},
		}
		tb.Insert(ctx, t, tableName, replicationRows)
		cs.Capture(ctx, t, nil)

		// Compare the accumulated capture summary against the stored snapshot.
		cupaloy.SnapshotT(t, cs.Summary())
	})
}
4 changes: 4 additions & 0 deletions source-postgres/datatype_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ func TestDatatypes(t *testing.T) {
{ColumnType: `tsrange`, ExpectType: `{"type":["string","null"]}`, InputValue: `[2010-01-01 11:30 UTC,2010-01-01 15:00 UTC)`, ExpectValue: `"[2010-01-01T11:30:00Z,2010-01-01T15:00:00Z)"`},
{ColumnType: `tstzrange`, ExpectType: `{"type":["string","null"]}`, InputValue: `[2010-01-01 11:30 UTC,2010-01-01 15:00 UTC)`, ExpectValue: `"[2010-01-01T03:30:00-08:00,2010-01-01T07:00:00-08:00)"`},
{ColumnType: `daterange`, ExpectType: `{"type":["string","null"]}`, InputValue: `(2010-01-01,2010-01-02]`, ExpectValue: `"[2010-01-02T00:00:00Z,2010-01-03T00:00:00Z)"`},

// Extension types
{ColumnType: `citext`, ExpectType: `{"type":["string","null"]}`, InputValue: `Hello, world!`, ExpectValue: `"Hello, world!"`},
{ColumnType: `citext[]`, ExpectType: fmt.Sprintf(arraySchemaPattern, `{"type":["string","null"]}`), InputValue: `{"Hello, world!",asdf}`, ExpectValue: `{"dimensions":[2],"elements":["Hello, world!","asdf"]}`},
})
}

Expand Down
43 changes: 42 additions & 1 deletion source-postgres/datatypes.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"database/sql/driver"
"encoding/json"
"errors"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/estuary/connectors/sqlcapture"
"github.com/estuary/flow/go/protocols/fdb/tuple"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
)

Expand All @@ -23,7 +25,7 @@ const (
truncateColumnThreshold = 8 * 1024 * 1024 // Arbitrarily selected value
)

func registerDatatypeTweaks(m *pgtype.Map) error {
func registerDatatypeTweaks(ctx context.Context, conn *pgx.Conn, m *pgtype.Map) error {
// Prefer text format for 'timestamptz' column results. This is important because the
// text format is reported in the configured time zone of the server (since we haven't
// set it on our session) while the binary format is always a Unix microsecond timestamp
Expand Down Expand Up @@ -78,6 +80,45 @@ func registerDatatypeTweaks(m *pgtype.Map) error {
OID: pgtype.JSONBOID,
Codec: &customDecodingCodec{decodeRawJSONB, &pgtype.JSONBCodec{Marshal: json.Marshal, Unmarshal: json.Unmarshal}},
})

// Query pg_catalog.pg_type for specific extension types we support, and install the
// appropriate scalar and array codecs for them.
var queryExtensionTypes = `
SELECT t.oid, n.nspname, t.typname, t.typarray
FROM pg_catalog.pg_type t
JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
WHERE n.nspname = 'public' AND t.typname IN ('citext');
`
var extensionTypes, err = conn.Query(ctx, queryExtensionTypes)
if err != nil {
return fmt.Errorf("error querying extension types: %w", err)
}
defer extensionTypes.Close()
for extensionTypes.Next() {
var typeOID, arrayOID uint32
var schemaName, typeName string
if err := extensionTypes.Scan(&typeOID, &schemaName, &typeName, &arrayOID); err != nil {
return fmt.Errorf("error querying extension types: %w", err)
}

// Map the type name to the appropriate codec. This right here is the bit we'll need to tweak
// if/when we support an extension type that isn't just a glorified string.
//
// But for now 'citext' is the only one, and it's a string, so there's no need making this
// more complicated than it needs to be.
var typeCodec = pgtype.TextCodec{}

// Register mappings for the type and also arrays of that type
var baseType = &pgtype.Type{OID: typeOID, Name: typeName, Codec: typeCodec}
m.RegisterType(baseType)
if arrayOID != 0 {
var arrayCodec = &customDecodingCodec{decodeDimensionedArray, &pgtype.ArrayCodec{ElementType: baseType}}
m.RegisterType(&pgtype.Type{OID: arrayOID, Name: "_" + typeName, Codec: arrayCodec})
}
}
// Report any error encountered while iterating over the result rows. The
// error must be taken from extensionTypes.Err() itself: the outer `err`
// from the earlier Query call is nil by the time we get here, so wrapping
// it would discard the real failure.
if err := extensionTypes.Err(); err != nil {
	return fmt.Errorf("error querying extension types: %w", err)
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion source-postgres/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ var postgresTypeToJSON = map[string]columnSchema{
"varchar": {jsonTypes: []string{"string"}},
"bpchar": {jsonTypes: []string{"string"}},
"text": {jsonTypes: []string{"string"}},
"citext": {jsonTypes: []string{"string"}}, // From the 'citext' extension so we don't test it by default, but it hurts nothing to support it here
"citext": {jsonTypes: []string{"string"}}, // From the 'citext' extension, but common enough we ought to support it properly
"bytea": {jsonTypes: []string{"string"}, contentEncoding: "base64"},
"xml": {jsonTypes: []string{"string"}},
"bit": {jsonTypes: []string{"string"}},
Expand Down
2 changes: 2 additions & 0 deletions source-postgres/docker-entrypoint-initdb.d/init-user-db.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-E

CREATE PUBLICATION flow_publication FOR ALL TABLES;
ALTER PUBLICATION flow_publication SET (publish_via_partition_root = true);

CREATE EXTENSION IF NOT EXISTS citext;
EOSQL
2 changes: 1 addition & 1 deletion source-postgres/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (db *postgresDatabase) connect(ctx context.Context) error {

return fmt.Errorf("unable to connect to database: %w", err)
}
if err := registerDatatypeTweaks(conn.TypeMap()); err != nil {
if err := registerDatatypeTweaks(ctx, conn, conn.TypeMap()); err != nil {
return err
}
db.conn = conn
Expand Down
2 changes: 1 addition & 1 deletion source-postgres/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (db *postgresDatabase) ReplicationStream(ctx context.Context, startCursor s
}

var typeMap = pgtype.NewMap()
if err := registerDatatypeTweaks(typeMap); err != nil {
if err := registerDatatypeTweaks(ctx, db.conn, typeMap); err != nil {
return nil, err
}

Expand Down
Loading