diff --git a/batch_consumer_test.go b/batch_consumer_test.go index 8c1349a..5c5d950 100644 --- a/batch_consumer_test.go +++ b/batch_consumer_test.go @@ -15,12 +15,14 @@ func Test_batchConsumer_startBatch(t *testing.T) { // Given var numberOfBatch int + mc := mockReader{} bc := batchConsumer{ base: &base{ messageCh: make(chan *Message), metric: &ConsumerMetric{}, wg: sync.WaitGroup{}, messageGroupDuration: 500 * time.Millisecond, + r: &mc, }, messageGroupLimit: 3, consumeFn: func(messages []*Message) error { diff --git a/consumer_base_test.go b/consumer_base_test.go index 5e22956..63ef367 100644 --- a/consumer_base_test.go +++ b/consumer_base_test.go @@ -92,9 +92,15 @@ func (m *mockReader) FetchMessage(ctx context.Context) (*kafka.Message, error) { } func (m *mockReader) Close() error { - panic("implement me") + if m.wantErr { + return errors.New("err") + } + return nil } func (m *mockReader) CommitMessages(messages []kafka.Message) error { - panic("implement me") + if m.wantErr { + return errors.New("err") + } + return nil }