From bb483ec584c0248ce62e3178bab96fdadc08e921 Mon Sep 17 00:00:00 2001 From: Abdulsametileri Date: Sun, 17 Sep 2023 14:42:05 +0300 Subject: [PATCH] test: add producer tests --- producer.go | 7 +++++- producer_config.go | 2 +- producer_test.go | 57 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 64 insertions(+), 2 deletions(-) create mode 100644 producer_test.go diff --git a/producer.go b/producer.go index 13e1b35..3dea75d 100644 --- a/producer.go +++ b/producer.go @@ -12,8 +12,13 @@ type Producer interface { Close() error } +type Writer interface { + WriteMessages(context.Context, ...kafka.Message) error + Close() error +} + type producer struct { - w *kafka.Writer + w Writer } func NewProducer(cfg ProducerConfig) (Producer, error) { diff --git a/producer_config.go b/producer_config.go index 98c5d7c..b8dbf2e 100644 --- a/producer_config.go +++ b/producer_config.go @@ -42,7 +42,7 @@ type ProducerConfig struct { Writer WriterConfig } -func (c ProducerConfig) newKafkaWriter() (*kafka.Writer, error) { +func (c ProducerConfig) newKafkaWriter() (Writer, error) { kafkaWriter := &kafka.Writer{ Addr: kafka.TCP(c.Writer.Brokers...), Topic: c.Writer.Topic, diff --git a/producer_test.go b/producer_test.go new file mode 100644 index 0000000..26c1eaa --- /dev/null +++ b/producer_test.go @@ -0,0 +1,57 @@ +package kafka + +import ( + "context" + "testing" + + "github.com/segmentio/kafka-go" +) + +func Test_producer_Produce_Successfully(t *testing.T) { + // Given + mw := &mockWriter{} + p := producer{w: mw} + + // When + err := p.Produce(context.Background(), Message{}) + // Then + if err != nil { + t.Fatalf("Producing err %s", err.Error()) + } +} + +func Test_producer_ProduceBatch_Successfully(t *testing.T) { + // Given + mw := &mockWriter{} + p := producer{w: mw} + + // When + err := p.ProduceBatch(context.Background(), []Message{{}, {}, {}}) + // Then + if err != nil { + t.Fatalf("Batch Producing err %s", err.Error()) + } +} + +func Test_producer_Close_Successfully(t *testing.T) { + // Given + mw := &mockWriter{} + p := producer{w: mw} + + // When + err := p.Close() + // Then + if err != nil { + t.Fatalf("Closing err %s", err.Error()) + } +} + +type mockWriter struct{} + +func (m *mockWriter) WriteMessages(ctx context.Context, message ...kafka.Message) error { + return nil +} + +func (m *mockWriter) Close() error { + return nil +}