Skip to content

Commit

Permalink
test: add producer tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Abdulsametileri committed Sep 17, 2023
1 parent 5ef3216 commit bb483ec
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 2 deletions.
7 changes: 6 additions & 1 deletion producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@ type Producer interface {
Close() error
}

type Writer interface {
WriteMessages(context.Context, ...kafka.Message) error
Close() error
}

type producer struct {
w *kafka.Writer
w Writer
}

func NewProducer(cfg ProducerConfig) (Producer, error) {
Expand Down
2 changes: 1 addition & 1 deletion producer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type ProducerConfig struct {
Writer WriterConfig
}

func (c ProducerConfig) newKafkaWriter() (*kafka.Writer, error) {
func (c ProducerConfig) newKafkaWriter() (Writer, error) {
kafkaWriter := &kafka.Writer{
Addr: kafka.TCP(c.Writer.Brokers...),
Topic: c.Writer.Topic,
Expand Down
57 changes: 57 additions & 0 deletions producer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package kafka

import (
"context"
"testing"

"github.com/segmentio/kafka-go"
)

func Test_producer_Produce_Successfully(t *testing.T) {
// Given
mw := &mockWriter{}
p := producer{w: mw}

// When
err := p.Produce(context.Background(), Message{})
// Then
if err != nil {
t.Fatalf("Producing err %s", err.Error())
}
}

func Test_producer_ProduceBatch_Successfully(t *testing.T) {
// Given
mw := &mockWriter{}
p := producer{w: mw}

// When
err := p.ProduceBatch(context.Background(), []Message{{}, {}, {}})
// Then
if err != nil {
t.Fatalf("Batch Producing err %s", err.Error())
}
}

func Test_producer_Close_Successfully(t *testing.T) {
// Given
mw := &mockWriter{}
p := producer{w: mw}

// When
err := p.Close()
// Then
if err != nil {
t.Fatalf("Closing err %s", err.Error())
}
}

type mockWriter struct{}

func (m *mockWriter) WriteMessages(ctx context.Context, message ...kafka.Message) error {
return nil
}

func (m *mockWriter) Close() error {
return nil
}

0 comments on commit bb483ec

Please sign in to comment.