From 89f3edd0ed6b18aaf8f3999c51db53884d2e6a5b Mon Sep 17 00:00:00 2001 From: nachogiljaldo Date: Fri, 15 Nov 2024 21:56:18 +0100 Subject: [PATCH] Increase test coverage for commit function. --- reader.go | 6 +++- reader_test.go | 83 +++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 74 insertions(+), 15 deletions(-) diff --git a/reader.go b/reader.go index 010c9512..70855e60 100644 --- a/reader.go +++ b/reader.go @@ -184,6 +184,7 @@ func (r *Reader) commitOffsetsWithRetry(gen *Generation, offsetStash offsetStash offsetStash.removeGenerationID(generationID) illegalGenerationErr = true err = nil + break } } } @@ -228,11 +229,14 @@ func (o offsetStash) reset() { } func (o offsetStash) removeGenerationID(genID int32) { - for _, offsetsForTopic := range o { + for topic, offsetsForTopic := range o { for partition, offsetsForPartition := range offsetsForTopic { if offsetsForPartition.generationID == genID { delete(offsetsForTopic, partition) } + if len(offsetsForTopic) == 0 { + delete(o, topic) + } } } } diff --git a/reader_test.go b/reader_test.go index f48269d5..dd998273 100644 --- a/reader_test.go +++ b/reader_test.go @@ -1451,36 +1451,87 @@ func TestCommitLoopImmediateFlushOnGenerationEnd(t *testing.T) { } func TestCommitOffsetsWithRetry(t *testing.T) { - offsets := offsetStash{"topic": {0: {0, 1}}} + offsets := func() offsetStash { + return offsetStash{"topic": {0: {0, 1}}} + } tests := map[string]struct { - Fails int - Invocations int - HasError bool + Fails int + Invocations int + HasError bool + Error error + Offsets offsetStash + Config ReaderConfig + ExpectedOffsets offsetStash }{ "happy path": { - Invocations: 1, + Invocations: 1, + Error: io.EOF, + Offsets: offsets(), + ExpectedOffsets: offsets(), }, "1 retry": { - Fails: 1, - Invocations: 2, + Fails: 1, + Invocations: 2, + Error: io.EOF, + Offsets: offsets(), + ExpectedOffsets: offsets(), }, "out of retries": { - Fails: defaultCommitRetries + 1, - Invocations: defaultCommitRetries, - HasError: true, + Fails: defaultCommitRetries + 1, + Invocations: defaultCommitRetries, + HasError: true, + Error: io.EOF, + Offsets: offsets(), + ExpectedOffsets: offsets(), + }, + "illegal generation error only 1 generation": { + Fails: 1, + Invocations: 1, + Error: IllegalGeneration, + Offsets: offsetStash{"topic": {0: {0, 1}, 1: {0, 1}}}, + ExpectedOffsets: offsetStash{}, + Config: ReaderConfig{ErrorOnWrongGenerationCommit: false}, + }, + "illegal generation error only 2 generations": { + Fails: 1, + Invocations: 1, + Error: IllegalGeneration, + Offsets: offsetStash{"topic": {0: {0, 1}, 1: {0, 2}}}, + ExpectedOffsets: offsetStash{"topic": {1: {0, 2}}}, + Config: ReaderConfig{ErrorOnWrongGenerationCommit: false}, + }, + "illegal generation error only 1 generation - error propagation": { + Fails: 1, + Invocations: 1, + Error: IllegalGeneration, + Offsets: offsetStash{"topic": {0: {0, 1}, 1: {0, 1}}}, + ExpectedOffsets: offsetStash{}, + Config: ReaderConfig{ErrorOnWrongGenerationCommit: true}, + HasError: true, + }, + "illegal generation error only 2 generations - error propagation": { + Fails: 1, + Invocations: 1, + Error: IllegalGeneration, + Offsets: offsetStash{"topic": {0: {0, 1}, 1: {0, 2}}}, + ExpectedOffsets: offsetStash{"topic": {1: {0, 2}}}, + Config: ReaderConfig{ErrorOnWrongGenerationCommit: true}, + HasError: true, }, } for label, test := range tests { t.Run(label, func(t *testing.T) { + requests := make([]offsetCommitRequestV2, 0) count := 0 gen := &Generation{ conn: mockCoordinator{ - offsetCommitFunc: func(offsetCommitRequestV2) (offsetCommitResponseV2, error) { + offsetCommitFunc: func(r offsetCommitRequestV2) (offsetCommitResponseV2, error) { + requests = append(requests, r) count++ if count <= test.Fails { - return offsetCommitResponseV2{}, io.EOF + return offsetCommitResponseV2{}, test.Error } return offsetCommitResponseV2{}, nil }, @@ -1491,13 +1542,17 @@ func TestCommitOffsetsWithRetry(t *testing.T) { Assignments: map[string][]PartitionAssignment{"topic": {{0, 1}}}, } - r := &Reader{stctx: context.Background()} - err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries) + r := &Reader{stctx: context.Background(), config: test.Config} + err := r.commitOffsetsWithRetry(gen, test.Offsets, defaultCommitRetries) switch { case test.HasError && err == nil: t.Error("bad err: expected not nil; got nil") case !test.HasError && err != nil: t.Errorf("bad err: expected nil; got %v", err) + default: + if !reflect.DeepEqual(test.ExpectedOffsets, test.Offsets) { + t.Errorf("bad expected offsets: expected %+v; got %v", test.ExpectedOffsets, test.Offsets) + } } }) }