Skip to content

Commit

Permalink
source-postgres: Drop/recreate slot on backfills
Browse files Browse the repository at this point in the history
This commit adds logic to automatically drop and then recreate
the replication slot when there is no prior replication cursor
to resume from.

This has two effects:
- It avoids situations where the replication slot could be created
  by an initial validation operation but then the capture itself
  doesn't start for hours and has to read through the entire WAL
  between those points before actually reaching events it cares
  about.
- It allows users to recover from replication slot invalidation by
  simply hitting the "Backfill Everything" button, whereas before
  they had to go and drop the slot themselves as well.

This fixes #2001
  • Loading branch information
willdonnelly committed Dec 10, 2024
1 parent a6654f0 commit c611944
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 12 deletions.
24 changes: 24 additions & 0 deletions source-postgres/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/estuary/connectors/sqlcapture"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
"github.com/sirupsen/logrus"
)

type replicationSlotInfo struct {
Expand Down Expand Up @@ -59,3 +60,26 @@ func listPublishedTables(ctx context.Context, conn *pgx.Conn, publicationName st
}
return publicationStatus, nil
}

// createReplicationSlot attempts to create a new logical replication slot with the specified name.
func createReplicationSlot(ctx context.Context, conn *pgx.Conn, slotName string) error {
var logEntry = logrus.WithField("slot", slotName)
logEntry.Info("attempting to create replication slot")
if _, err := conn.Exec(ctx, fmt.Sprintf(`SELECT pg_create_logical_replication_slot('%s', 'pgoutput');`, slotName)); err != nil {
return fmt.Errorf("replication slot %q couldn't be created", slotName)
}
logEntry.Info("created replication slot")
return nil
}

// recreateReplicationSlot attempts to drop and then recreate a replication slot with the specified name.
func recreateReplicationSlot(ctx context.Context, conn *pgx.Conn, slotName string) error {
var logEntry = logrus.WithField("slot", slotName)
logEntry.Info("attempting to drop replication slot")
if _, err := conn.Exec(ctx, fmt.Sprintf(`SELECT pg_drop_replication_slot('%s');`, slotName)); err != nil {
// Not a fatal error because we don't want a failure to drop a nonexistent slot
// to prevent the subsequent attempt to create it.
logEntry.WithField("err", err).Debug("failed to drop replication slot")
}
return createReplicationSlot(ctx, conn, slotName)
}
12 changes: 4 additions & 8 deletions source-postgres/prerequisites.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,10 @@ func (db *postgresDatabase) prerequisiteReplicationSlot(ctx context.Context) err
)
}

// Slot does not exist in any database. Try to create it.
logEntry.Info("attempting to create replication slot")
if _, err := db.conn.Exec(ctx, fmt.Sprintf(`SELECT pg_create_logical_replication_slot('%s', 'pgoutput');`, slotName)); err != nil {
return fmt.Errorf("replication slot %q doesn't exist and couldn't be created", slotName)
}

logEntry.Info("created replication slot")
return nil
// Slot does not exist in any database. Try to create it (note that it will probably be
// dropped and recreated before replication actually begins, but creating it here is the
// most reliable way to verify that we can do that).
return createReplicationSlot(ctx, db.conn, slotName)
}

func (db *postgresDatabase) prerequisitePublication(ctx context.Context) error {
Expand Down
26 changes: 22 additions & 4 deletions source-postgres/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,42 @@ func (db *postgresDatabase) ReplicationStream(ctx context.Context, startCursor s
return nil, fmt.Errorf("unable to connect to database for replication: %w", err)
}

var slot, publication = db.config.Advanced.SlotName, db.config.Advanced.PublicationName

var startLSN pglogrepl.LSN
if startCursor != "" {
startLSN, err = pglogrepl.ParseLSN(startCursor)
if err != nil {
return nil, fmt.Errorf("error parsing start cursor: %w", err)
}
} else {
// If no start cursor is specified, initialize to the current WAL flush position
// obtained via the `IDENTIFY_SYSTEM` command.
// If no start cursor is specified, we are free to begin replication from the latest tail-end of the WAL.

// We begin by dropping and recreating the slot. This avoids situations where the
// 'restart_lsn' is significantly behind the point we actually want to start from,
// and also has the happy side-effect of making it so that merely hitting the
// "Backfill Everything" button in the UI is all that a user has to do to recover
// after replication slot invalidation.
//
// This is always safe to do, because if our start cursor is reset then we don't
// care about any prior state in the replication slot, and if we're able to drop
// it then we also have the necessary permissions to recreate it. Any errors here
// aren't fatal, just to be on the safe side.
if err := recreateReplicationSlot(ctx, db.conn, slot); err != nil {
logrus.WithField("err", err).Debug("error recreating replication slot")
}

// Obtain the current WAL flush location via an IDENTIFY_SYSTEM command.
// Note: We use IDENTIFY_SYSTEM here for legacy reasons, it might be cleaner
// to get this information from the newly-recreated slot's confirmed_flush_lsn
// now that we do it that way.
var sysident, err = pglogrepl.IdentifySystem(ctx, conn)
if err != nil {
return nil, fmt.Errorf("unable to read WAL flush LSN from database: %w", err)
}
startLSN = sysident.XLogPos
}

var slot, publication = db.config.Advanced.SlotName, db.config.Advanced.PublicationName

// Check that the slot's `confirmed_flush_lsn` is less than or equal to our resume cursor value.
// This is necessary because Postgres deliberately allows clients to specify an older start LSN,
// and then ignores that and uses the confirmed LSN instead. Supposedly this simplifies writing
Expand Down

0 comments on commit c611944

Please sign in to comment.