Skip to content

Commit

Permalink
Merge pull request #786 from twmb/785
Browse files (browse the repository at this point in the history)
kgo sink: fix read/write race for recBatch.canFailFromLoadErrs
  • Loading branch information
twmb authored Jul 29, 2024
2 parents e16c46c + 1827add commit 4e14d75
Showing 1 changed file with 14 additions and 1 deletion.
15 changes: 14 additions & 1 deletion pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,8 @@ func (s *sink) handleReqRespBatch(

// Since we have received a response and we are the first batch, we can
// at this point re-enable failing from load errors.
//
// We do not need a lock since the owner is locked.
batch.canFailFromLoadErrs = true

// By default, we assume we errored. Non-error updates this back
Expand Down Expand Up @@ -1314,6 +1316,10 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {
batch0 := recBuf.batches[0]
batch0.tries++

// We need to lock the batch as well because there could be a buffered
// request about to be written. Writing requests only grabs the batch
// mu, not the recBuf mu.
batch0.mu.Lock()
var (
canFail = !recBuf.cl.idempotent() || batch0.canFailFromLoadErrs // we can only fail if we are not idempotent or if we have no outstanding requests
batch0Fail = batch0.maybeFailErr(&recBuf.cl.cfg) != nil // timeout, retries, or aborting
Expand All @@ -1323,6 +1329,8 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {

willFail = canFail && (batch0Fail || !netErr && (!retryableKerr || retryableKerr && isUnknownLimit))
)
batch0.isFailingFromLoadErr = willFail
batch0.mu.Unlock()

recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, bumping error count on first stored batch",
"broker", logID(recBuf.sink.nodeID),
Expand All @@ -1336,6 +1344,7 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {
"is_unknown_limit", isUnknownLimit,
"will_fail", willFail,
)

if willFail {
recBuf.failAllRecords(err)
}
Expand Down Expand Up @@ -1426,6 +1435,10 @@ type recBatch struct {
// request with this batch, and then reset it to true whenever we
// process a response.
canFailFromLoadErrs bool
// If we are going to fail the batch in bumpRepeatedLoadErr, we need to
// set this bool to true. There could be a concurrent request about to
// be written. See more comments below where this is used.
isFailingFromLoadErr bool

wireLength int32 // tracks total size this batch would currently encode as, including length prefix
v1wireLength int32 // same as wireLength, but for message set v1
Expand Down Expand Up @@ -1978,7 +1991,7 @@ func (p *produceRequest) AppendTo(dst []byte) []byte {
for partition, batch := range partitions {
dst = kbin.AppendInt32(dst, partition)
batch.mu.Lock()
if batch.records == nil { // concurrent failAllRecords
if batch.records == nil || batch.isFailingFromLoadErr { // concurrent failAllRecords OR concurrent bumpRepeatedLoadErr
if flexible {
dst = kbin.AppendCompactNullableBytes(dst, nil)
} else {
Expand Down

0 comments on commit 4e14d75

Please sign in to comment.