diff --git a/README.md b/README.md index 1c71c86..f08eab8 100644 --- a/README.md +++ b/README.md @@ -196,6 +196,10 @@ After running `docker-compose up` command, you can run any application you want. +#### With Producer Interceptor + +Please refer to [Producer Interceptor Example](examples/with-kafka-producer-interceptor) + #### With Distributed Tracing Support Please refer to [Tracing Example](examples/with-tracing/README.md) diff --git a/examples/with-kafka-producer-interceptor/main.go b/examples/with-kafka-producer-interceptor/main.go new file mode 100644 index 0000000..9005712 --- /dev/null +++ b/examples/with-kafka-producer-interceptor/main.go @@ -0,0 +1,38 @@ +package main + +import ( + "context" + "fmt" + "github.com/Trendyol/kafka-konsumer/v2" +) + +func main() { + producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ + Writer: kafka.WriterConfig{ + Brokers: []string{"localhost:29092"}, + }, + }, newProducerInterceptor()...) + + const topicName = "standart-topic" + + _ = producer.Produce(context.Background(), kafka.Message{ + Topic: topicName, + Key: []byte("1"), + Value: []byte(`{ "foo": "bar" }`), + }) + + _ = producer.ProduceBatch(context.Background(), []kafka.Message{ + { + Topic: topicName, + Key: []byte("1"), + Value: []byte(`{ "foo": "bar" }`), + }, + { + Topic: topicName, + Key: []byte("2"), + Value: []byte(`{ "foo2": "bar2" }`), + }, + }) + + fmt.Println("Messages sended...!") +} diff --git a/examples/with-kafka-producer-interceptor/producer-interceptor.go b/examples/with-kafka-producer-interceptor/producer-interceptor.go new file mode 100644 index 0000000..c8a9d1c --- /dev/null +++ b/examples/with-kafka-producer-interceptor/producer-interceptor.go @@ -0,0 +1,16 @@ +package main + +import "github.com/Trendyol/kafka-konsumer/v2" + +type producerInterceptor struct{} + +func (i *producerInterceptor) OnProduce(ctx kafka.ProducerInterceptorContext) { + ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{ + Key: "x-source-app", + Value: []byte("kafka-konsumer"), + }) +} + +func newProducerInterceptor() []kafka.ProducerInterceptor { + return []kafka.ProducerInterceptor{&producerInterceptor{}} +} diff --git a/producer.go b/producer.go index ee2910d..640b919 100644 --- a/producer.go +++ b/producer.go @@ -18,10 +18,11 @@ type Writer interface { } type producer struct { - w Writer + w Writer + interceptors []ProducerInterceptor } -func NewProducer(cfg *ProducerConfig) (Producer, error) { +func NewProducer(cfg *ProducerConfig, interceptors ...ProducerInterceptor) (Producer, error) { kafkaWriter := &kafka.Writer{ Addr: kafka.TCP(cfg.Writer.Brokers...), Topic: cfg.Writer.Topic, @@ -51,7 +52,7 @@ func NewProducer(cfg *ProducerConfig) (Producer, error) { kafkaWriter.Transport = transport } - p := &producer{w: kafkaWriter} + p := &producer{w: kafkaWriter, interceptors: interceptors} if cfg.DistributedTracingEnabled { otelWriter, err := NewOtelProducer(cfg, kafkaWriter) @@ -64,18 +65,33 @@ func NewProducer(cfg *ProducerConfig) (Producer, error) { return p, nil } -func (c *producer) Produce(ctx context.Context, message Message) error { - return c.w.WriteMessages(ctx, message.toKafkaMessage()) +func (p *producer) Produce(ctx context.Context, message Message) error { + if len(p.interceptors) > 0 { + p.executeInterceptors(ctx, &message) + } + + return p.w.WriteMessages(ctx, message.toKafkaMessage()) } -func (c *producer) ProduceBatch(ctx context.Context, messages []Message) error { +func (p *producer) ProduceBatch(ctx context.Context, messages []Message) error { kafkaMessages := make([]kafka.Message, 0, len(messages)) for i := range messages { + if len(p.interceptors) > 0 { + p.executeInterceptors(ctx, &messages[i]) + } + kafkaMessages = append(kafkaMessages, messages[i].toKafkaMessage()) } - return c.w.WriteMessages(ctx, kafkaMessages...) + + return p.w.WriteMessages(ctx, kafkaMessages...) +} + +func (p *producer) executeInterceptors(ctx context.Context, message *Message) { + for _, interceptor := range p.interceptors { + interceptor.OnProduce(ProducerInterceptorContext{Context: ctx, Message: message}) + } } -func (c *producer) Close() error { - return c.w.Close() +func (p *producer) Close() error { + return p.w.Close() } diff --git a/producer_interceptor.go b/producer_interceptor.go new file mode 100644 index 0000000..b5bc6b9 --- /dev/null +++ b/producer_interceptor.go @@ -0,0 +1,14 @@ +package kafka + +import ( + "context" +) + +type ProducerInterceptorContext struct { + Context context.Context + Message *Message +} + +type ProducerInterceptor interface { + OnProduce(ctx ProducerInterceptorContext) +} diff --git a/producer_test.go b/producer_test.go index d4fd8ac..452b6df 100644 --- a/producer_test.go +++ b/producer_test.go @@ -4,6 +4,8 @@ import ( "context" "testing" + "github.com/gofiber/fiber/v2/utils" + "github.com/segmentio/kafka-go" ) @@ -20,6 +22,26 @@ func Test_producer_Produce_Successfully(t *testing.T) { } } +func Test_producer_Produce_interceptor_Successfully(t *testing.T) { + // Given + mw := &mockWriter{} + msg := Message{Headers: make([]Header, 0)} + msg.Headers = append(msg.Headers, kafka.Header{ + Key: "x-correlation-id", + Value: []byte(utils.UUIDv4()), + }) + interceptor := newMockProducerInterceptor() + + p := producer{w: mw, interceptors: interceptor} + + // When + err := p.Produce(context.Background(), msg) + // Then + if err != nil { + t.Fatalf("Producing err %s", err.Error()) + } +} + func Test_producer_ProduceBatch_Successfully(t *testing.T) { // Given mw := &mockWriter{} @@ -33,6 +55,20 @@ func Test_producer_ProduceBatch_Successfully(t *testing.T) { } } +func Test_producer_ProduceBatch_interceptor_Successfully(t *testing.T) { + // Given + mw := &mockWriter{} + interceptor := newMockProducerInterceptor() + p := producer{w: mw, interceptors: interceptor} + + // When + err := p.ProduceBatch(context.Background(), []Message{{}, {}, {}}) + // Then + if err != nil { + t.Fatalf("Batch Producing err %s", err.Error()) + } +} + func Test_producer_Close_Successfully(t *testing.T) { // Given mw := &mockWriter{} @@ -48,10 +84,23 @@ func Test_producer_Close_Successfully(t *testing.T) { type mockWriter struct{} -func (m *mockWriter) WriteMessages(_ context.Context, _ ...kafka.Message) error { +func (m *mockWriter) WriteMessages(_ context.Context, msg ...kafka.Message) error { return nil } func (m *mockWriter) Close() error { return nil } + +type mockProducerInterceptor struct{} + +func (i *mockProducerInterceptor) OnProduce(ctx ProducerInterceptorContext) { + ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{ + Key: "test", + Value: []byte("test"), + }) +} + +func newMockProducerInterceptor() []ProducerInterceptor { + return []ProducerInterceptor{&mockProducerInterceptor{}} +} diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index af93446..a11a4d0 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -14,28 +14,92 @@ import ( func Test_Should_Produce_Successfully(t *testing.T) { // Given t.Parallel() - topic := "produce-topic" brokerAddress := "localhost:9092" - producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ - Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}, - Transport: &kafka.TransportConfig{ - MetadataTopics: []string{ - topic, + t.Run("without interceptor", func(t *testing.T) { + //Given + + topic := "produce-topic" + producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ + Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}, + Transport: &kafka.TransportConfig{ + MetadataTopics: []string{ + topic, + }, }, - }, - }) + }) - // When - err := producer.Produce(context.Background(), kafka.Message{ - Key: []byte("1"), - Value: []byte(`foo`), + // When + err := producer.Produce(context.Background(), kafka.Message{ + Key: []byte("1"), + Value: []byte(`foo`), + }) + + // Then + if err != nil { + t.Fatalf("Error while producing err %s", err.Error()) + } }) - // Then - if err != nil { - t.Fatalf("Error while producing err %s", err.Error()) - } + t.Run("with interceptor", func(t *testing.T) { + // Given + topic := "produce-interceptor-topic" + consumerGroup := "produce-topic-cg" + interceptor := newMockProducerInterceptor() + + producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ + Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}, + Transport: &kafka.TransportConfig{ + MetadataTopics: []string{ + topic, + }, + }, + }, interceptor...) + + // When + err := producer.Produce(context.Background(), kafka.Message{ + Key: []byte("1"), + Value: []byte(`foo`), + }) + + messageCh := make(chan *kafka.Message) + + consumerCfg := &kafka.ConsumerConfig{ + Reader: kafka.ReaderConfig{Brokers: []string{brokerAddress}, Topic: topic, GroupID: consumerGroup}, + ConsumeFn: func(message *kafka.Message) error { + messageCh <- message + return nil + }, + } + + consumer, _ := kafka.NewConsumer(consumerCfg) + defer consumer.Stop() + + consumer.Consume() + + // Then + + if err != nil { + t.Fatalf("Error while producing err %s", err.Error()) + } + + actual := <-messageCh + if string(actual.Value) != "foo" { + t.Fatalf("Value does not equal %s", actual.Value) + } + if string(actual.Key) != "1" { + t.Fatalf("Key does not equal %s", actual.Key) + } + if len(actual.Headers) != 1 { + t.Fatalf("Header size does not equal %d", len(actual.Headers)) + } + if string(actual.Headers[0].Key) != xSourceAppKey { + t.Fatalf("Header key does not equal %s", actual.Headers[0].Key) + } + if string(actual.Headers[0].Value) != xSourceAppValue { + t.Fatalf("Header value does not equal %s", actual.Headers[0].Value) + } + }) } func Test_Should_Batch_Produce_Successfully(t *testing.T) { @@ -43,11 +107,6 @@ func Test_Should_Batch_Produce_Successfully(t *testing.T) { t.Parallel() topic := "batch-produce-topic" brokerAddress := "localhost:9092" - - producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ - Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}) - - // When msgs := []kafka.Message{ { Key: []byte("1"), @@ -59,13 +118,33 @@ func Test_Should_Batch_Produce_Successfully(t *testing.T) { }, } - // When - err := producer.ProduceBatch(context.Background(), msgs) + t.Run("without interceptor", func(t *testing.T) { + producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ + Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}) - // Then - if err != nil { - t.Fatalf("Error while producing err %s", err.Error()) - } + // When + err := producer.ProduceBatch(context.Background(), msgs) + + // Then + if err != nil { + t.Fatalf("Error while producing err %s", err.Error()) + } + }) + + t.Run("with interceptor", func(t *testing.T) { + interceptors := newMockProducerInterceptor() + + producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ + Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}, interceptors...) + + // When + err := producer.ProduceBatch(context.Background(), msgs) + + // Then + if err != nil { + t.Fatalf("Error while producing err %s", err.Error()) + } + }) } func Test_Should_Consume_Message_Successfully(t *testing.T) { @@ -563,3 +642,21 @@ func assertEventually(t *testing.T, condition func() bool, waitFor time.Duration } } } + +type mockProducerInterceptor struct{} + +const ( + xSourceAppKey = "x-source-app" + xSourceAppValue = "kafka-konsumer" +) + +func (i *mockProducerInterceptor) OnProduce(ctx kafka.ProducerInterceptorContext) { + ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{ + Key: xSourceAppKey, + Value: []byte(xSourceAppValue), + }) +} + +func newMockProducerInterceptor() []kafka.ProducerInterceptor { + return []kafka.ProducerInterceptor{&mockProducerInterceptor{}} +}