Skip to content

Commit

Permalink
Merge pull request #761 from twmb/746
Browse files Browse the repository at this point in the history
kgo sink: do not back off on certain edge case
  • Loading branch information
twmb authored Jul 29, 2024
2 parents c5c7357 + e62b402 commit b44e16e
Showing 1 changed file with 20 additions and 0 deletions.
20 changes: 20 additions & 0 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,24 @@ func (s *sink) handleRetryBatches(
return
}

// If the request failed due to a concurrent metadata update
// moving partitions to a different sink (or killing the sink
// this partition was on), we can just reset the drain index
// and trigger draining now the new sink. There is no reason
// to backoff on this sink nor trigger a metadata update.
if batch.owner.sink != s {
if debug {
logger.Log(LogLevelDebug, "transitioned sinks while a request was inflight, retrying immediately on new sink without backoff",
"topic", batch.owner.topic,
"partition", batch.owner.partition,
"old_sink", s.nodeID,
"new_sink", batch.owner.sink.nodeID,
)
}
batch.owner.resetBatchDrainIdx()
return
}

if canFail || s.cl.cfg.disableIdempotency {
if err := batch.maybeFailErr(&s.cl.cfg); err != nil {
batch.owner.failAllRecords(err)
Expand Down Expand Up @@ -1003,6 +1021,8 @@ func (s *sink) handleRetryBatches(
// If neither of these cases are true, then we entered wanting a
// metadata update, but the batches either were not the first batch, or
// the batches were concurrently failed.
//
// If all partitions are moving, we do not need to backoff nor drain.
if shouldBackoff || (!updateMeta && numRetryBatches != numMoveBatches) {
s.maybeTriggerBackoff(backoffSeq)
s.maybeDrain()
Expand Down

0 comments on commit b44e16e

Please sign in to comment.