Skip to content

Commit

Permalink
kgo sink: fix read/write race for recBatch.canFailFromLoadErrs
Browse files Browse the repository at this point in the history
When writing a record batch during a request, the batch mutex is locked.
This guards against a concurrent failAllRecords, which can be triggered
from a metadata update.

However, canFailFromLoadErrs, the boolean field that prevents buffered
records from being failed when doing so is not "safe", was not properly
guarded by a mutex. Writing a request locks only the batch, not the owning
recBuf, while checking whether the batch could fail locked only the owning
recBuf, not the batch.

This adds locking around the batch when checking whether it can be failed,
and adds a bool, isFailingFromLoadErr, that, when true (because the batch is
about to be failed due to load errors), ensures the batch is not written.

Closes #785.
  • Loading branch information
twmb committed Jul 20, 2024
1 parent a5f2b71 commit 1827add
Showing 1 changed file with 14 additions and 1 deletion.
15 changes: 14 additions & 1 deletion pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,8 @@ func (s *sink) handleReqRespBatch(

// Since we have received a response and we are the first batch, we can
// at this point re-enable failing from load errors.
//
// We do not need a lock since the owner is locked.
batch.canFailFromLoadErrs = true

// By default, we assume we errored. Non-error updates this back
Expand Down Expand Up @@ -1294,6 +1296,10 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {
batch0 := recBuf.batches[0]
batch0.tries++

// We need to lock the batch as well because there could be a buffered
// request about to be written. Writing requests only grabs the batch
// mu, not the recBuf mu.
batch0.mu.Lock()
var (
canFail = !recBuf.cl.idempotent() || batch0.canFailFromLoadErrs // we can only fail if we are not idempotent or if we have no outstanding requests
batch0Fail = batch0.maybeFailErr(&recBuf.cl.cfg) != nil // timeout, retries, or aborting
Expand All @@ -1303,6 +1309,8 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {

willFail = canFail && (batch0Fail || !netErr && (!retryableKerr || retryableKerr && isUnknownLimit))
)
batch0.isFailingFromLoadErr = willFail
batch0.mu.Unlock()

recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, bumping error count on first stored batch",
"broker", logID(recBuf.sink.nodeID),
Expand All @@ -1316,6 +1324,7 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {
"is_unknown_limit", isUnknownLimit,
"will_fail", willFail,
)

if willFail {
recBuf.failAllRecords(err)
}
Expand Down Expand Up @@ -1406,6 +1415,10 @@ type recBatch struct {
// request with this batch, and then reset it to true whenever we
// process a response.
canFailFromLoadErrs bool
// If we are going to fail the batch in bumpRepeatedLoadErr, we need to
// set this bool to true. There could be a concurrent request about to
// be written. See more comments below where this is used.
isFailingFromLoadErr bool

wireLength int32 // tracks total size this batch would currently encode as, including length prefix
v1wireLength int32 // same as wireLength, but for message set v1
Expand Down Expand Up @@ -1958,7 +1971,7 @@ func (p *produceRequest) AppendTo(dst []byte) []byte {
for partition, batch := range partitions {
dst = kbin.AppendInt32(dst, partition)
batch.mu.Lock()
if batch.records == nil { // concurrent failAllRecords
if batch.records == nil || batch.isFailingFromLoadErr { // concurrent failAllRecords OR concurrent bumpRepeatedLoadErr
if flexible {
dst = kbin.AppendCompactNullableBytes(dst, nil)
} else {
Expand Down

0 comments on commit 1827add

Please sign in to comment.