Skip to content

Commit

Permalink
test: add startConsume test in the base consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
Abdulsametileri committed Sep 17, 2023
1 parent 12bf4be commit e89bb71
Showing 1 changed file with 91 additions and 0 deletions.
91 changes: 91 additions & 0 deletions consumer_base_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package kafka

import (
"bytes"
"context"
"errors"
"github.com/segmentio/kafka-go"
"sync"
"testing"
)

func Test_base_startConsume(t *testing.T) {
t.Run("Return_When_Quit_Signal_Is_Came", func(t *testing.T) {
mc := mockReader{wantErr: true}
b := base{wg: sync.WaitGroup{}, r: &mc,
messageCh: make(chan Message), quit: make(chan struct{}),
logger: NewZapLogger(LogLevelDebug),
}
b.context, b.cancelFn = context.WithCancel(context.Background())

b.wg.Add(1)

// When
go b.startConsume()
b.quit <- struct{}{}

// Then
// Ensure quit called, it works because defer wg.Done statement
b.wg.Wait()
})
t.Run("Read_Incoming_Messages_Successfully", func(t *testing.T) {
// Given
mc := mockReader{}
b := base{wg: sync.WaitGroup{}, r: &mc, messageCh: make(chan Message)}
b.wg.Add(1)

// When
go b.startConsume()

actual := <-b.messageCh
// Then
expected := kafka.Message{Topic: "topic", Partition: 0, Offset: 1, HighWaterMark: 1, Key: []byte("foo"), Value: []byte("bar"), Headers: []kafka.Header{{Key: "header", Value: []byte("value")}}}

actualHeader := actual.Headers[0]
expectedHeader := expected.Headers[0]

if actual.Topic != expected.Topic {
t.Errorf("Expected: %s, Actual: %s", expected.Topic, actual.Topic)
}
if actual.Partition != expected.Partition {
t.Errorf("Expected: %d, Actual: %d", expected.Partition, actual.Partition)
}
if actual.Offset != expected.Offset {
t.Errorf("Expected: %d, Actual: %d", expected.Offset, actual.Offset)
}
if actual.HighWaterMark != expected.HighWaterMark {
t.Errorf("Expected: %d, Actual: %d", expected.HighWaterMark, actual.HighWaterMark)
}
if !bytes.Equal(actual.Key, expected.Key) {
t.Errorf("Expected: %s, Actual: %s", expected.Value, actual.Value)
}
if !bytes.Equal(actual.Value, expected.Value) {
t.Errorf("Expected: %s, Actual: %s", expected.Value, actual.Value)
}
if actualHeader.Key != expectedHeader.Key {
t.Errorf("Expected: %s, Actual: %s", actualHeader.Key, expectedHeader.Key)
}
if !bytes.Equal(actualHeader.Value, expectedHeader.Value) {
t.Errorf("Expected: %s, Actual: %s", expectedHeader.Value, expectedHeader.Value)
}
if actual.Time != expected.Time {
t.Errorf("Expected: %s, Actual: %s", expected.Value, actual.Value)
}
})

}

type mockReader struct {
wantErr bool
}

func (m *mockReader) ReadMessage(ctx context.Context) (*kafka.Message, error) {
if m.wantErr {
return nil, errors.New("err")
}
return &kafka.Message{Topic: "topic", Partition: 0, Offset: 1, HighWaterMark: 1, Key: []byte("foo"), Value: []byte("bar"), Headers: []kafka.Header{{Key: "header", Value: []byte("value")}}}, nil
}

func (m *mockReader) Close() error {
panic("implement me")
}

0 comments on commit e89bb71

Please sign in to comment.