Skip to content

Commit

Permalink
docs: add deadletter topic example (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
BurakG01 authored Oct 3, 2023
1 parent 624bf79 commit 05ec521
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 0 deletions.
1 change: 1 addition & 0 deletions examples/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ services:
cub kafka-ready -b kafka:9092 1 20 && \
kafka-topics --create --topic standart-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \
kafka-topics --create --topic retry-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \
kafka-topics --create --topic error-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \
sleep infinity'"
environment:
KAFKA_BROKER_ID: ignored
Expand Down
66 changes: 66 additions & 0 deletions examples/with-deadletter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package main

import (
"context"
"errors"
"fmt"
"github.com/Trendyol/kafka-konsumer"
"os"
"os/signal"
"time"
)

const (
topicName = "standart-topic"
retryTopicName = "retry-topic"
deadLetterTopicName = "error-topic"
)

func main() {
producer, _ := kafka.NewProducer(kafka.ProducerConfig{
Writer: kafka.WriterConfig{
Brokers: []string{"localhost:29092"},
},
})

_ = producer.Produce(context.Background(), kafka.Message{
Topic: topicName,
Key: []byte("1"),
Value: []byte(`{ "foo": "bar" }`),
})

consumerCfg := &kafka.ConsumerConfig{
Concurrency: 1,
Reader: kafka.ReaderConfig{
Brokers: []string{"localhost:29092"},
Topic: topicName,
GroupID: "konsumer.group.test",
},
RetryEnabled: true,
RetryConfiguration: kafka.RetryConfiguration{
DeadLetterTopic: deadLetterTopicName,
Brokers: []string{"localhost:29092"},
Topic: retryTopicName,
StartTimeCron: "*/1 * * * *",
WorkDuration: 50 * time.Second,
MaxRetry: 1,
},
ConsumeFn: consumeFn,
}

consumer, _ := kafka.NewConsumer(consumerCfg)
defer consumer.Stop()

consumer.Consume()

fmt.Println("Consumer started...!")
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
}

func consumeFn(message kafka.Message) error {
fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value))
// returns error to be sent to dead-letter topic
return errors.New("consumer error")
}

0 comments on commit 05ec521

Please sign in to comment.