Skip to content

Commit

Permalink
Merge pull request #787 from twmb/777
Browse files Browse the repository at this point in the history
kgo: fix deadlock in Produce when using MaxBufferedBytes
  • Loading branch information
twmb authored Jul 29, 2024
2 parents 4e14d75 + 9e32897 commit 718591a
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 74 deletions.
6 changes: 2 additions & 4 deletions .github/workflows/lint-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ jobs:

integration-test-kafka:
if: github.repository == 'twmb/franz-go'
needs: golangci
runs-on: ubuntu-latest
name: "integration test kafka"
container: golang:latest
Expand All @@ -55,15 +54,14 @@ jobs:
KAFKA_KRAFT_CLUSTER_ID: XkpGZQ27R3eTl3OdTm2LYA # 16 byte base64-encoded UUID
steps:
- uses: actions/checkout@v4
- run: go test ./...
- run: go test -timeout 5m ./...
env:
KGO_TEST_RF: 1
KGO_SEEDS: kafka:9092
KGO_TEST_STABLE_FETCH: true

integration-test-redpanda:
if: github.repository == 'twmb/franz-go'
needs: golangci
runs-on: ubuntu-latest
name: "integration test redpanda"
container: golang:latest
Expand All @@ -76,7 +74,7 @@ jobs:
REDPANDA_ADVERTISE_KAFKA_ADDRESS: redpanda:9092
steps:
- uses: actions/checkout@v4
- run: go test ./...
- run: go test -timeout 5m ./...
env:
KGO_TEST_RF: 1
KGO_SEEDS: redpanda:9092
6 changes: 4 additions & 2 deletions pkg/kgo/consumer_direct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,8 @@ func TestPauseIssue489(t *testing.T) {
exit.Store(true)
}
})
time.Sleep(100 * time.Microsecond)
cl.Flush(ctx)
time.Sleep(50 * time.Microsecond)
}
}()
defer cancel()
Expand Down Expand Up @@ -416,7 +417,8 @@ func TestPauseIssueOct2023(t *testing.T) {
exit.Store(true)
}
})
time.Sleep(100 * time.Microsecond)
cl.Flush(ctx)
time.Sleep(50 * time.Microsecond)
}
}()
defer cancel()
Expand Down
62 changes: 62 additions & 0 deletions pkg/kgo/produce_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,75 @@ package kgo

import (
"bytes"
"context"
"errors"
"hash/crc32"
"math/rand"
"strings"
"sync"
"sync/atomic"
"testing"

"github.com/twmb/franz-go/pkg/kbin"
"github.com/twmb/franz-go/pkg/kmsg"
)

func TestClient_Produce(t *testing.T) {
var (
topic, cleanup = tmpTopicPartitions(t, 1)
numWorkers = 50
recsToWrite = int64(20_000)

workers sync.WaitGroup
writeSuccess atomic.Int64
writeFailure atomic.Int64

randRec = func() *Record {
return &Record{
Key: []byte("test"),
Value: []byte(strings.Repeat("x", rand.Intn(1000))),
Topic: topic,
}
}
)
defer cleanup()

cl, _ := newTestClient(MaxBufferedBytes(5000))
defer cl.Close()

// Start N workers that will concurrently write to the same partition.
var recsWritten atomic.Int64
var fatal atomic.Bool
for i := 0; i < numWorkers; i++ {
workers.Add(1)

go func() {
defer workers.Done()

for recsWritten.Add(1) <= recsToWrite {
res := cl.ProduceSync(context.Background(), randRec())
if err := res.FirstErr(); err == nil {
writeSuccess.Add(1)
} else {
if !errors.Is(err, ErrMaxBuffered) {
t.Errorf("unexpected error: %v", err)
fatal.Store(true)
}

writeFailure.Add(1)
}
}
}()
}
workers.Wait()

t.Logf("writes succeeded: %d", writeSuccess.Load())
t.Logf("writes failed: %d", writeFailure.Load())
if fatal.Load() {
t.Fatal("failed")
}
}

// This file contains golden tests against kmsg AppendTo's to ensure our custom
// encoding is correct.

Expand Down
Loading

0 comments on commit 718591a

Please sign in to comment.