diff --git a/examples/docker-compose.yml b/examples/docker-compose.yml index fac94a0..3b3ead7 100644 --- a/examples/docker-compose.yml +++ b/examples/docker-compose.yml @@ -41,6 +41,7 @@ services: cub kafka-ready -b kafka:9092 1 20 && \ kafka-topics --create --topic standart-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \ kafka-topics --create --topic retry-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \ + kafka-topics --create --topic error-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \ sleep infinity'" environment: KAFKA_BROKER_ID: ignored diff --git a/examples/with-deadletter/main.go b/examples/with-deadletter/main.go new file mode 100644 index 0000000..b11c361 --- /dev/null +++ b/examples/with-deadletter/main.go @@ -0,0 +1,66 @@ +package main + +import ( + "context" + "errors" + "fmt" + "github.com/Trendyol/kafka-konsumer" + "os" + "os/signal" + "time" +) + +const ( + topicName = "standart-topic" + retryTopicName = "retry-topic" + deadLetterTopicName = "error-topic" +) + +func main() { + producer, _ := kafka.NewProducer(kafka.ProducerConfig{ + Writer: kafka.WriterConfig{ + Brokers: []string{"localhost:29092"}, + }, + }) + + _ = producer.Produce(context.Background(), kafka.Message{ + Topic: topicName, + Key: []byte("1"), + Value: []byte(`{ "foo": "bar" }`), + }) + + consumerCfg := &kafka.ConsumerConfig{ + Concurrency: 1, + Reader: kafka.ReaderConfig{ + Brokers: []string{"localhost:29092"}, + Topic: topicName, + GroupID: "konsumer.group.test", + }, + RetryEnabled: true, + RetryConfiguration: kafka.RetryConfiguration{ + DeadLetterTopic: deadLetterTopicName, + Brokers: []string{"localhost:29092"}, + Topic: retryTopicName, + StartTimeCron: "*/1 * * * *", + WorkDuration: 50 * time.Second, + MaxRetry: 1, + }, + ConsumeFn: consumeFn, + } + + consumer, _ := kafka.NewConsumer(consumerCfg) + defer consumer.Stop() + + consumer.Consume() + + fmt.Println("Consumer started...!") + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + <-c +} + +func consumeFn(message kafka.Message) error { + fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value)) + // returns error to be sent to dead-letter topic + return errors.New("consumer error") +}