Skip to content

Commit

Permalink
Merge branch 'main' into feature/opentelemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
Abdulsametileri committed Oct 6, 2023
2 parents 99c0242 + 05ec521 commit 34f0558
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 10 deletions.
7 changes: 6 additions & 1 deletion .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
version: 2
updates:
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "daily"
open-pull-requests-limit: 10
- package-ecosystem: gomod
directory: "/"
schedule:
interval: daily
open-pull-requests-limit: 10
open-pull-requests-limit: 10
8 changes: 4 additions & 4 deletions .github/workflows/integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,20 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
fetch-depth: 0

- name: Set up Go
uses: actions/setup-go@v3
uses: actions/setup-go@v4
with:
go-version: 1.19

- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Start containers
run: docker compose -f test/integration/docker-compose.yml up --wait --build --force-recreate --remove-orphans

- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Integration Test
run: go test -v test/integration/integration_test.go
env:
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
fetch-depth: 0

- run: git tag ${{ github.event.inputs.tag }}

- name: Set up Go
uses: actions/setup-go@v3
uses: actions/setup-go@v4
with:
go-version: 1.19

- name: Run GoReleaser
uses: goreleaser/goreleaser-action@v4
uses: goreleaser/goreleaser-action@v5
with:
version: latest
args: release --rm-dist
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
fetch-depth: 0

- name: Set up Go
uses: actions/setup-go@v3
uses: actions/setup-go@v4
with:
go-version: 1.19

Expand Down
1 change: 1 addition & 0 deletions examples/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ services:
cub kafka-ready -b kafka:9092 1 20 && \
kafka-topics --create --topic standart-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \
kafka-topics --create --topic retry-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \
kafka-topics --create --topic error-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \
sleep infinity'"
environment:
KAFKA_BROKER_ID: ignored
Expand Down
66 changes: 66 additions & 0 deletions examples/with-deadletter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package main

import (
"context"
"errors"
"fmt"
"github.com/Trendyol/kafka-konsumer"
"os"
"os/signal"
"time"
)

const (
topicName = "standart-topic"
retryTopicName = "retry-topic"
deadLetterTopicName = "error-topic"
)

func main() {
producer, _ := kafka.NewProducer(kafka.ProducerConfig{
Writer: kafka.WriterConfig{
Brokers: []string{"localhost:29092"},
},
})

_ = producer.Produce(context.Background(), kafka.Message{
Topic: topicName,
Key: []byte("1"),
Value: []byte(`{ "foo": "bar" }`),
})

consumerCfg := &kafka.ConsumerConfig{
Concurrency: 1,
Reader: kafka.ReaderConfig{
Brokers: []string{"localhost:29092"},
Topic: topicName,
GroupID: "konsumer.group.test",
},
RetryEnabled: true,
RetryConfiguration: kafka.RetryConfiguration{
DeadLetterTopic: deadLetterTopicName,
Brokers: []string{"localhost:29092"},
Topic: retryTopicName,
StartTimeCron: "*/1 * * * *",
WorkDuration: 50 * time.Second,
MaxRetry: 1,
},
ConsumeFn: consumeFn,
}

consumer, _ := kafka.NewConsumer(consumerCfg)
defer consumer.Stop()

consumer.Consume()

fmt.Println("Consumer started...!")
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
}

func consumeFn(message kafka.Message) error {
fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value))
// returns error to be sent to dead-letter topic
return errors.New("consumer error")
}

0 comments on commit 34f0558

Please sign in to comment.