diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 8534f71..ff86c0d 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -1,7 +1,12 @@ version: 2 updates: + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "daily" + open-pull-requests-limit: 10 - package-ecosystem: gomod directory: "/" schedule: interval: daily - open-pull-requests-limit: 10 \ No newline at end of file + open-pull-requests-limit: 10 diff --git a/.github/workflows/integration-test.yml b/.github/workflows/integration-test.yml index 6f9e429..79e7060 100644 --- a/.github/workflows/integration-test.yml +++ b/.github/workflows/integration-test.yml @@ -11,20 +11,20 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: fetch-depth: 0 - name: Set up Go - uses: actions/setup-go@v3 + uses: actions/setup-go@v4 with: go-version: 1.19 - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Start containers run: docker compose -f test/integration/docker-compose.yml up --wait --build --force-recreate --remove-orphans - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Integration Test run: go test -v test/integration/integration_test.go env: diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 403a2e8..b4a14fa 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -13,19 +13,19 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: fetch-depth: 0 - run: git tag ${{ github.event.inputs.tag }} - name: Set up Go - uses: actions/setup-go@v3 + uses: actions/setup-go@v4 with: go-version: 1.19 - name: Run GoReleaser - uses: goreleaser/goreleaser-action@v4 + uses: goreleaser/goreleaser-action@v5 with: version: latest args: release --rm-dist diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 9471879..c9a2b56 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -11,12 +11,12 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: fetch-depth: 0 - name: Set up Go - uses: actions/setup-go@v3 + uses: actions/setup-go@v4 with: go-version: 1.19 diff --git a/examples/docker-compose.yml b/examples/docker-compose.yml index 3f22d18..de84b17 100644 --- a/examples/docker-compose.yml +++ b/examples/docker-compose.yml @@ -50,6 +50,7 @@ services: cub kafka-ready -b kafka:9092 1 20 && \ kafka-topics --create --topic standart-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \ kafka-topics --create --topic retry-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \ + kafka-topics --create --topic error-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \ sleep infinity'" environment: KAFKA_BROKER_ID: ignored diff --git a/examples/with-deadletter/main.go b/examples/with-deadletter/main.go new file mode 100644 index 0000000..b11c361 --- /dev/null +++ b/examples/with-deadletter/main.go @@ -0,0 +1,66 @@ +package main + +import ( + "context" + "errors" + "fmt" + "github.com/Trendyol/kafka-konsumer" + "os" + "os/signal" + "time" +) + +const ( + topicName = "standart-topic" + retryTopicName = "retry-topic" + deadLetterTopicName = "error-topic" +) + +func main() { + producer, _ := kafka.NewProducer(kafka.ProducerConfig{ + Writer: kafka.WriterConfig{ + Brokers: []string{"localhost:29092"}, + }, + }) + + _ = producer.Produce(context.Background(), kafka.Message{ + Topic: topicName, + Key: []byte("1"), + Value: []byte(`{ "foo": "bar" }`), + }) + + consumerCfg := &kafka.ConsumerConfig{ + Concurrency: 1, + Reader: kafka.ReaderConfig{ + Brokers: []string{"localhost:29092"}, + Topic: topicName, + GroupID: "konsumer.group.test", + }, + RetryEnabled: true, + RetryConfiguration: kafka.RetryConfiguration{ + DeadLetterTopic: deadLetterTopicName, + Brokers: []string{"localhost:29092"}, + Topic: retryTopicName, + StartTimeCron: "*/1 * * * *", + WorkDuration: 50 * time.Second, + MaxRetry: 1, + }, + ConsumeFn: consumeFn, + } + + consumer, _ := kafka.NewConsumer(consumerCfg) + defer consumer.Stop() + + consumer.Consume() + + fmt.Println("Consumer started...!") + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + <-c +} + +func consumeFn(message kafka.Message) error { + fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value)) + // returns error to be sent to dead-letter topic + return errors.New("consumer error") +}