From c611944ae8a13e957eb5b93d3fdf3793d057c131 Mon Sep 17 00:00:00 2001 From: Will Donnelly Date: Fri, 6 Dec 2024 13:08:42 -0600 Subject: [PATCH] source-postgres: Drop/recreate slot on backfills This commit adds logic to automatically drop and then recreate the replication slot when there is no prior replication cursor to resume from. This has two effects: - It avoids situations where the replication slot could be created by an initial validation operation but then the capture itself doesn't start for hours and has to read through the entire WAL between those points before actually reaching events it cares about. - It allows users to recover from replication slot invalidation by simply hitting the "Backfill Everything" button, whereas before they had to go and drop the slot themselves as well. This fixes https://github.com/estuary/connectors/issues/2001 --- source-postgres/database.go | 24 ++++++++++++++++++++++++ source-postgres/prerequisites.go | 12 ++++-------- source-postgres/replication.go | 26 ++++++++++++++++++++++---- 3 files changed, 50 insertions(+), 12 deletions(-) diff --git a/source-postgres/database.go b/source-postgres/database.go index fa905553a1..fe447c8693 100644 --- a/source-postgres/database.go +++ b/source-postgres/database.go @@ -8,6 +8,7 @@ import ( "github.com/estuary/connectors/sqlcapture" "github.com/jackc/pglogrepl" "github.com/jackc/pgx/v5" + "github.com/sirupsen/logrus" ) type replicationSlotInfo struct { @@ -59,3 +60,26 @@ func listPublishedTables(ctx context.Context, conn *pgx.Conn, publicationName st } return publicationStatus, nil } + +// createReplicationSlot attempts to create a new logical replication slot with the specified name. 
+func createReplicationSlot(ctx context.Context, conn *pgx.Conn, slotName string) error { + var logEntry = logrus.WithField("slot", slotName) + logEntry.Info("attempting to create replication slot") + if _, err := conn.Exec(ctx, fmt.Sprintf(`SELECT pg_create_logical_replication_slot('%s', 'pgoutput');`, slotName)); err != nil { + return fmt.Errorf("replication slot %q couldn't be created: %w", slotName, err) + } + logEntry.Info("created replication slot") + return nil +} + +// recreateReplicationSlot attempts to drop and then recreate a replication slot with the specified name. A failed drop is logged at debug level and ignored; only the error from creating the slot is returned. +func recreateReplicationSlot(ctx context.Context, conn *pgx.Conn, slotName string) error { + var logEntry = logrus.WithField("slot", slotName) + logEntry.Info("attempting to drop replication slot") + if _, err := conn.Exec(ctx, fmt.Sprintf(`SELECT pg_drop_replication_slot('%s');`, slotName)); err != nil { + // Not a fatal error because we don't want a failure to drop a nonexistent slot + // to prevent the subsequent attempt to create it. + logEntry.WithField("err", err).Debug("failed to drop replication slot") + } + return createReplicationSlot(ctx, conn, slotName) +} diff --git a/source-postgres/prerequisites.go b/source-postgres/prerequisites.go index 5c263642a1..2fc70a6055 100644 --- a/source-postgres/prerequisites.go +++ b/source-postgres/prerequisites.go @@ -142,14 +142,10 @@ func (db *postgresDatabase) prerequisiteReplicationSlot(ctx context.Context) err ) } - // Slot does not exist in any database. Try to create it. - logEntry.Info("attempting to create replication slot") - if _, err := db.conn.Exec(ctx, fmt.Sprintf(`SELECT pg_create_logical_replication_slot('%s', 'pgoutput');`, slotName)); err != nil { - return fmt.Errorf("replication slot %q doesn't exist and couldn't be created", slotName) - } - - logEntry.Info("created replication slot") - return nil + // Slot does not exist in any database. 
Try to create it (note that it will probably be + // dropped and recreated before replication actually begins, but creating it here is the + // most reliable way to verify that we can do that). + return createReplicationSlot(ctx, db.conn, slotName) } func (db *postgresDatabase) prerequisitePublication(ctx context.Context) error { diff --git a/source-postgres/replication.go b/source-postgres/replication.go index 5be5fd7530..b7a509e76b 100644 --- a/source-postgres/replication.go +++ b/source-postgres/replication.go @@ -37,6 +37,8 @@ func (db *postgresDatabase) ReplicationStream(ctx context.Context, startCursor s return nil, fmt.Errorf("unable to connect to database for replication: %w", err) } + var slot, publication = db.config.Advanced.SlotName, db.config.Advanced.PublicationName + var startLSN pglogrepl.LSN if startCursor != "" { startLSN, err = pglogrepl.ParseLSN(startCursor) @@ -44,8 +46,26 @@ func (db *postgresDatabase) ReplicationStream(ctx context.Context, startCursor s return nil, fmt.Errorf("error parsing start cursor: %w", err) } } else { - // If no start cursor is specified, initialize to the current WAL flush position - // obtained via the `IDENTIFY_SYSTEM` command. + // If no start cursor is specified, we are free to begin replication from the latest tail-end of the WAL. + + // We begin by dropping and recreating the slot. This avoids situations where the + // 'restart_lsn' is significantly behind the point we actually want to start from, + // and also has the happy side-effect of making it so that merely hitting the + // "Backfill Everything" button in the UI is all that a user has to do to recover + // after replication slot invalidation. + // + // This is always safe to do, because if our start cursor is reset then we don't + // care about any prior state in the replication slot, and if we're able to drop + // it then we also have the necessary permissions to recreate it. Any errors here + // aren't fatal, just to be on the safe side. 
+ if err := recreateReplicationSlot(ctx, db.conn, slot); err != nil { + logrus.WithField("err", err).Debug("error recreating replication slot") + } + + // Obtain the current WAL flush location via an IDENTIFY_SYSTEM command. + // Note: We use IDENTIFY_SYSTEM here for legacy reasons, it might be cleaner + // to get this information from the newly-recreated slot's confirmed_flush_lsn + // now that we do it that way. var sysident, err = pglogrepl.IdentifySystem(ctx, conn) if err != nil { return nil, fmt.Errorf("unable to read WAL flush LSN from database: %w", err) @@ -53,8 +73,6 @@ func (db *postgresDatabase) ReplicationStream(ctx context.Context, startCursor s startLSN = sysident.XLogPos } - var slot, publication = db.config.Advanced.SlotName, db.config.Advanced.PublicationName - // Check that the slot's `confirmed_flush_lsn` is less than or equal to our resume cursor value. // This is necessary because Postgres deliberately allows clients to specify an older start LSN, // and then ignores that and uses the confirmed LSN instead. Supposedly this simplifies writing