Skip to content

Commit

Permalink
Send keepalive messages in split decoder periodically to avoid wal re…
Browse files Browse the repository at this point in the history
…ceiver timeouts during large shard splits. (#7229)

DESCRIPTION: Send keepalive messages during the logical replication
phase of large shard splits to avoid timeouts.

During the logical replication part of the shard split process, split
decoder filters out the wal records produced by the initial copy. If the
number of wal records is big, then split decoder ends up processing for
a long time before sending out any wal records through pgoutput. Hence
the wal receiver may time out and restarts repeatedly causing our split
driver code catch up logic to fail.

Notes: 

1. If the wal_receiver_timeout is set to a very small number e.g. 600ms,
it may time out before receiving the keepalives. My tests show that this
code works best when the` wal_receiver_timeout `is set to 1minute, which
is the default value.

2. Once a logical replication worker time outs, a new one gets launched.
The new logical replication worker sets the pg_stat_subscription columns
to initial values. E.g. the latest_end_lsn is set to 0. Our driver logic
in `WaitForGroupedLogicalRepTargetsToCatchUp` can not handle LSN value
to go back. This is the main reason for it to get stuck in the infinite
loop.
  • Loading branch information
emelsimsek authored and francisjodi committed Nov 13, 2023
1 parent 8ada055 commit bd77e82
Showing 1 changed file with 46 additions and 0 deletions.
46 changes: 46 additions & 0 deletions src/backend/distributed/shardsplit/shardsplit_decoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,46 @@ replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId origin_id)
}


/*
* update_replication_progress is copied from Postgres 15. We use it to send keepalive
* messages when we are filtering out the wal changes resulting from the initial copy.
* If we do not send out messages long enough, wal reciever will time out.
* Postgres 16 has refactored this code such that keepalive messages are sent during
* reordering phase which is above change_cb. So we do not need to send keepalive in
* change_cb.
*/
#if (PG_VERSION_NUM < PG_VERSION_16)
static void
update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact)
{
static int changes_count = 0;

/*
* We don't want to try sending a keepalive message after processing each
* change as that can have overhead. Tests revealed that there is no
* noticeable overhead in doing it after continuously processing 100 or so
* changes.
*/
#define CHANGES_THRESHOLD 100

/*
* After continuously processing CHANGES_THRESHOLD changes, we
* try to send a keepalive message if required.
*/
if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
{
#if (PG_VERSION_NUM >= PG_VERSION_15)
OutputPluginUpdateProgress(ctx, skipped_xact);
#else
OutputPluginUpdateProgress(ctx);
#endif
changes_count = 0;
}
}


#endif

/*
* shard_split_change_cb function emits the incoming tuple change
* to the appropriate destination shard.
Expand All @@ -108,6 +148,12 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
return;
}

#if (PG_VERSION_NUM < PG_VERSION_16)

/* Send replication keepalive. */
update_replication_progress(ctx, false);
#endif

/* check if the relation is publishable.*/
if (!is_publishable_relation(relation))
{
Expand Down

0 comments on commit bd77e82

Please sign in to comment.