diff --git a/source-postgres/database.go b/source-postgres/database.go
index fa905553a..fe447c869 100644
--- a/source-postgres/database.go
+++ b/source-postgres/database.go
@@ -8,6 +8,7 @@ import (
 	"github.com/estuary/connectors/sqlcapture"
 	"github.com/jackc/pglogrepl"
 	"github.com/jackc/pgx/v5"
+	"github.com/sirupsen/logrus"
 )
 
 type replicationSlotInfo struct {
@@ -59,3 +60,28 @@ func listPublishedTables(ctx context.Context, conn *pgx.Conn, publicationName st
 	}
 	return publicationStatus, nil
 }
+
+// createReplicationSlot attempts to create a new logical replication slot with the specified name.
+// On failure the returned error wraps the underlying database error so the cause is not lost.
+func createReplicationSlot(ctx context.Context, conn *pgx.Conn, slotName string) error {
+	var logEntry = logrus.WithField("slot", slotName)
+	logEntry.Info("attempting to create replication slot")
+	if _, err := conn.Exec(ctx, fmt.Sprintf(`SELECT pg_create_logical_replication_slot('%s', 'pgoutput');`, slotName)); err != nil {
+		return fmt.Errorf("replication slot %q couldn't be created: %w", slotName, err)
+	}
+	logEntry.Info("created replication slot")
+	return nil
+}
+
+// recreateReplicationSlot attempts to drop and then recreate a replication slot with the specified name.
+// A failure to drop the slot is only logged; the returned error is whatever createReplicationSlot returns.
+func recreateReplicationSlot(ctx context.Context, conn *pgx.Conn, slotName string) error {
+	var logEntry = logrus.WithField("slot", slotName)
+	logEntry.Info("attempting to drop replication slot")
+	if _, err := conn.Exec(ctx, fmt.Sprintf(`SELECT pg_drop_replication_slot('%s');`, slotName)); err != nil {
+		// Not a fatal error because we don't want a failure to drop a nonexistent slot
+		// to prevent the subsequent attempt to create it.
+		logEntry.WithField("err", err).Debug("failed to drop replication slot")
+	}
+	return createReplicationSlot(ctx, conn, slotName)
+}
diff --git a/source-postgres/prerequisites.go b/source-postgres/prerequisites.go
index 5c263642a..2fc70a605 100644
--- a/source-postgres/prerequisites.go
+++ b/source-postgres/prerequisites.go
@@ -142,14 +142,10 @@ func (db *postgresDatabase) prerequisiteReplicationSlot(ctx context.Context) err
 		)
 	}
 
-	// Slot does not exist in any database. Try to create it.
-	logEntry.Info("attempting to create replication slot")
-	if _, err := db.conn.Exec(ctx, fmt.Sprintf(`SELECT pg_create_logical_replication_slot('%s', 'pgoutput');`, slotName)); err != nil {
-		return fmt.Errorf("replication slot %q doesn't exist and couldn't be created", slotName)
-	}
-
-	logEntry.Info("created replication slot")
-	return nil
+	// Slot does not exist in any database. Try to create it (note that it will probably be
+	// dropped and recreated before replication actually begins, but creating it here is the
+	// most reliable way to verify that we can do that).
+	return createReplicationSlot(ctx, db.conn, slotName)
 }
 
 func (db *postgresDatabase) prerequisitePublication(ctx context.Context) error {
diff --git a/source-postgres/replication.go b/source-postgres/replication.go
index 5be5fd753..b7a509e76 100644
--- a/source-postgres/replication.go
+++ b/source-postgres/replication.go
@@ -37,6 +37,8 @@ func (db *postgresDatabase) ReplicationStream(ctx context.Context, startCursor s
 		return nil, fmt.Errorf("unable to connect to database for replication: %w", err)
 	}
 
+	var slot, publication = db.config.Advanced.SlotName, db.config.Advanced.PublicationName
+
 	var startLSN pglogrepl.LSN
 	if startCursor != "" {
 		startLSN, err = pglogrepl.ParseLSN(startCursor)
@@ -44,8 +46,26 @@ func (db *postgresDatabase) ReplicationStream(ctx context.Context, startCursor s
 			return nil, fmt.Errorf("error parsing start cursor: %w", err)
 		}
 	} else {
-		// If no start cursor is specified, initialize to the current WAL flush position
-		// obtained via the `IDENTIFY_SYSTEM` command.
+		// If no start cursor is specified, we are free to begin replication from the latest tail-end of the WAL.
+
+		// We begin by dropping and recreating the slot. This avoids situations where the
+		// 'restart_lsn' is significantly behind the point we actually want to start from,
+		// and also has the happy side-effect of making it so that merely hitting the
+		// "Backfill Everything" button in the UI is all that a user has to do to recover
+		// after replication slot invalidation.
+		//
+		// This is always safe to do, because if our start cursor is reset then we don't
+		// care about any prior state in the replication slot, and if we're able to drop
+		// it then we also have the necessary permissions to recreate it. Any errors here
+		// aren't fatal, just to be on the safe side.
+		if err := recreateReplicationSlot(ctx, db.conn, slot); err != nil {
+			logrus.WithField("err", err).Debug("error recreating replication slot")
+		}
+
+		// Obtain the current WAL flush location via an IDENTIFY_SYSTEM command.
+		// Note: We use IDENTIFY_SYSTEM here for legacy reasons, it might be cleaner
+		// to get this information from the newly-recreated slot's confirmed_flush_lsn
+		// now that we do it that way.
 		var sysident, err = pglogrepl.IdentifySystem(ctx, conn)
 		if err != nil {
 			return nil, fmt.Errorf("unable to read WAL flush LSN from database: %w", err)
@@ -53,8 +73,6 @@ func (db *postgresDatabase) ReplicationStream(ctx context.Context, startCursor s
 		}
 		startLSN = sysident.XLogPos
 	}
-	var slot, publication = db.config.Advanced.SlotName, db.config.Advanced.PublicationName
-
 	// Check that the slot's `confirmed_flush_lsn` is less than or equal to our resume cursor value.
 	// This is necessary because Postgres deliberately allows clients to specify an older start LSN,
 	// and then ignores that and uses the confirmed LSN instead. Supposedly this simplifies writing