kgo sink: do not back off on a certain edge case
* Produce request created and about to be issued
* Metadata request resolves and removes the broker that was about to be sent to, updates leadership for the partition
* recBuf's `sink` field is updated
* The old sink then enters handleReqResp, then eventually handleRetryBatches

Previously,
* The failed partition triggers a metadata refresh and enters a failed
  state until that refresh clears the failing state. Because a metadata
  refresh JUST happened, this internally results in a 5s wait by
  default

Now,
* The failed partition notices that it is NOW owned by a different
  broker than the one handling the failure, so it does not back off at
  all and instead potentially triggers draining on the new sink once
  decInflight runs

Closes #746.
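
For illustration only (not part of this commit), a minimal standalone Go
sketch of the idea follows; the sink, recBuf, and batch types below are
simplified stand-ins for kgo's internals, not its real definitions.

package main

import "fmt"

// Simplified stand-ins for kgo's sink, recBuf, and batch types.
type sink struct{ nodeID int32 }

type recBuf struct {
	topic     string
	partition int32
	sink      *sink // the broker's sink that currently owns this partition
	drainIdx  int   // index of the next batch to drain
}

func (r *recBuf) resetBatchDrainIdx() { r.drainIdx = 0 }

type batch struct{ owner *recBuf }

// handleRetryBatches mirrors the new check: if a partition moved to a
// different sink while its produce request was inflight, reset the drain
// index and let the new sink drain it, with no backoff and no metadata
// update triggered by the old sink.
func (s *sink) handleRetryBatches(batches []*batch) {
	for _, b := range batches {
		if b.owner.sink != s {
			fmt.Printf("%s[%d] moved from node %d to node %d; retrying on new sink without backoff\n",
				b.owner.topic, b.owner.partition, s.nodeID, b.owner.sink.nodeID)
			b.owner.resetBatchDrainIdx()
			continue
		}
		// Otherwise fall through to the usual backoff / metadata-update path.
		fmt.Printf("%s[%d] still on node %d; backing off\n",
			b.owner.topic, b.owner.partition, s.nodeID)
	}
}

func main() {
	oldSink := &sink{nodeID: 1}
	newSink := &sink{nodeID: 2}

	rb := &recBuf{topic: "foo", partition: 0, sink: oldSink, drainIdx: 3}

	// A metadata refresh moves the partition to a new broker while a
	// produce request issued by the old sink is still inflight.
	rb.sink = newSink

	// The old sink handles the failure, notices the move, and retries on
	// the new sink immediately instead of backing off.
	oldSink.handleRetryBatches([]*batch{{owner: rb}})
}

Running the sketch prints that foo[0] moved from node 1 to node 2 and is
retried on the new sink without any backoff.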
twmb committed Jun 10, 2024
1 parent 40589af commit e62b402
Showing 1 changed file with 20 additions and 0 deletions.
pkg/kgo/sink.go: 20 additions & 0 deletions
@@ -942,6 +942,24 @@ func (s *sink) handleRetryBatches(
return
}

// If the request failed due to a concurrent metadata update
// moving partitions to a different sink (or killing the sink
// this partition was on), we can just reset the drain index
// and trigger draining now on the new sink. There is no reason
// to back off on this sink nor to trigger a metadata update.
if batch.owner.sink != s {
	if debug {
		logger.Log(LogLevelDebug, "transitioned sinks while a request was inflight, retrying immediately on new sink without backoff",
			"topic", batch.owner.topic,
			"partition", batch.owner.partition,
			"old_sink", s.nodeID,
			"new_sink", batch.owner.sink.nodeID,
		)
	}
	batch.owner.resetBatchDrainIdx()
	return
}

if canFail || s.cl.cfg.disableIdempotency {
	if err := batch.maybeFailErr(&s.cl.cfg); err != nil {
		batch.owner.failAllRecords(err)
@@ -1003,6 +1021,8 @@ func (s *sink) handleRetryBatches(
// If neither of these cases is true, then we entered wanting a
// metadata update, but the batches either were not the first batch, or
// they were concurrently failed.
//
// If all partitions are moving, we do not need to back off nor drain.
if shouldBackoff || (!updateMeta && numRetryBatches != numMoveBatches) {
	s.maybeTriggerBackoff(backoffSeq)
	s.maybeDrain()