Skip to content

Commit

Permalink
feat: add header filter feature (#105)
Browse files Browse the repository at this point in the history
* feat: add header filter feature

* chore: add skipMessageByHeaderFn to the README.md

* chore: fix lint
  • Loading branch information
dilaragorum authored Feb 15, 2024
1 parent 9ec9054 commit 15a97a2
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 4 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap
|--------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------|-----------------------------|
| `reader` | [Describes all segmentio kafka reader configurations](https://pkg.go.dev/github.com/segmentio/kafka-go#ReaderConfig) | |
| `consumeFn` | Kafka consumer function, if retry enabled it, is also used to consume retriable messages | |
| `skipMessageByHeaderFn` | Function to filter messages based on headers, return true if you want to skip the message | nil |
| `logLevel` | Describes log level; valid options are `debug`, `info`, `warn`, and `error` | info |
| `concurrency` | Number of goroutines used at listeners | 1 |
| `retryEnabled` | Retry/Exception consumer is working or not | false |
Expand Down
10 changes: 10 additions & 0 deletions consumer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type base struct {
context context.Context
r Reader
cancelFn context.CancelFunc
skipMessageByHeaderFn SkipMessageByHeaderFn
metric *ConsumerMetric
pause chan struct{}
quit chan struct{}
Expand Down Expand Up @@ -107,6 +108,7 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) {
singleConsumingStream: make(chan *Message, cfg.Concurrency),
batchConsumingStream: make(chan []*Message, cfg.Concurrency),
consumerState: stateRunning,
skipMessageByHeaderFn: cfg.SkipMessageByHeaderFn,
}

if cfg.DistributedTracingEnabled {
Expand Down Expand Up @@ -159,6 +161,14 @@ func (c *base) startConsume() {
continue
}

if c.skipMessageByHeaderFn != nil && c.skipMessageByHeaderFn(m.Headers) {
c.logger.Infof("Message is not processed. Header filter applied. Headers: %v", m.Headers)
if err = c.r.CommitMessages([]kafka.Message{*m}); err != nil {
c.logger.Errorf("Commit Error %s,", err.Error())
}
continue
}

incomingMessage := &IncomingMessage{
kafkaMessage: m,
message: fromKafkaMessage(m),
Expand Down
39 changes: 39 additions & 0 deletions consumer_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,45 @@ func Test_base_startConsume(t *testing.T) {
t.Error(diff)
}
})

t.Run("Skip_Incoming_Messages_When_SkipMessageByHeaderFn_Is_Applied", func(t *testing.T) {
// Given
mc := mockReader{}
skipMessageCh := make(chan struct{})
b := base{
wg: sync.WaitGroup{},
r: &mc,
logger: NewZapLogger(LogLevelDebug),
incomingMessageStream: make(chan *IncomingMessage),
skipMessageByHeaderFn: func(header []kafka.Header) bool {
defer func() {
skipMessageCh <- struct{}{}
}()

for _, h := range header {
if h.Key == "header" {
return true
}
}
return false
},
}

b.wg.Add(1)

// When
go b.startConsume()

// Then
<-skipMessageCh

// assert incomingMessageStream does not receive any value because message is skipped
select {
case <-b.incomingMessageStream:
t.Fatal("incoming message stream must equal to 0")
case <-time.After(1 * time.Second):
}
})
}

func Test_base_Pause(t *testing.T) {
Expand Down
5 changes: 3 additions & 2 deletions consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type PreBatchFn func([]*Message) []*Message

type ConsumeFn func(*Message) error

type SkipMessageByHeaderFn func(header []kafka.Header) bool

type DialConfig struct {
Timeout time.Duration
KeepAlive time.Duration
Expand All @@ -36,6 +38,7 @@ type ConsumerConfig struct {
Dial *DialConfig
BatchConfiguration *BatchConfiguration
ConsumeFn ConsumeFn
SkipMessageByHeaderFn SkipMessageByHeaderFn
TransactionalRetry *bool
RetryConfiguration RetryConfiguration
LogLevel LogLevel
Expand Down Expand Up @@ -116,8 +119,6 @@ type DistributedTracingConfiguration struct {
Propagator propagation.TextMapPropagator
}

type SkipMessageByHeaderFn func(headers []Header) bool

func toHeaders(cronsumerHeaders []kcronsumer.Header) []Header {
headers := make([]Header, 0, len(cronsumerHeaders))
for i := range cronsumerHeaders {
Expand Down
47 changes: 47 additions & 0 deletions examples/with-header-filter-consumer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package main

import (
"fmt"
"github.com/Trendyol/kafka-konsumer/v2"
"os"
"os/signal"
)

func main() {
consumerCfg := &kafka.ConsumerConfig{
Concurrency: 1,
Reader: kafka.ReaderConfig{
Brokers: []string{"localhost:29092"},
Topic: "standart-topic",
GroupID: "standart-cg",
},
RetryEnabled: false,
SkipMessageByHeaderFn: skipMessageByHeaderFn,
ConsumeFn: consumeFn,
}

consumer, _ := kafka.NewConsumer(consumerCfg)
defer consumer.Stop()

consumer.Consume()

fmt.Println("Consumer started...!")

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
}

func skipMessageByHeaderFn(headers []kafka.Header) bool {
for _, header := range headers {
if header.Key == "SkipMessage" {
return true
}
}
return false
}

func consumeFn(message *kafka.Message) error {
fmt.Printf("Message From %s with value %s\n", message.Topic, string(message.Value))
return nil
}
3 changes: 1 addition & 2 deletions test/integration/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
github.com/Trendyol/kafka-cronsumer v1.4.6 h1:Hc6afln69+cCAyaYJSQRnjzH5gZ9dpNa/PsBeXiL5GY=
github.com/Trendyol/kafka-cronsumer v1.4.6/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/kafka-cronsumer v1.4.7 h1:xmjxSBJzRRkuaO8k0S4baePyVVLJf3apl7nRiMXFnUY=
github.com/Trendyol/kafka-cronsumer v1.4.7/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4=
github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
Expand Down
57 changes: 57 additions & 0 deletions test/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,63 @@ func Test_Should_Batch_Consume_With_PreBatch_Enabled(t *testing.T) {
}
}

func Test_Should_Skip_Message_When_Header_Filter_Given(t *testing.T) {
// Given
topic := "header-filter-topic"
consumerGroup := "header-filter-cg"
brokerAddress := "localhost:9092"

incomingMessage := []segmentio.Message{
{
Topic: topic,
Headers: []segmentio.Header{
{Key: "SkipMessage", Value: []byte("any")},
},
Key: []byte("1"),
Value: []byte(`foo`),
},
}

_, cleanUp := createTopicAndWriteMessages(t, topic, incomingMessage)
defer cleanUp()

consumeCh := make(chan struct{})
skipMessageCh := make(chan struct{})

consumerCfg := &kafka.ConsumerConfig{
Reader: kafka.ReaderConfig{Brokers: []string{brokerAddress}, Topic: topic, GroupID: consumerGroup},
SkipMessageByHeaderFn: func(header []kafka.Header) bool {
defer func() {
skipMessageCh <- struct{}{}
}()
for _, h := range header {
if h.Key == "SkipMessage" {
return true
}
}
return false
},
ConsumeFn: func(message *kafka.Message) error {
consumeCh <- struct{}{}
return nil
},
}

consumer, _ := kafka.NewConsumer(consumerCfg)
defer consumer.Stop()

consumer.Consume()

// Then
<-skipMessageCh

select {
case <-consumeCh:
t.Fatal("Message must be skipped! consumeCh mustn't receive any value")
case <-time.After(1 * time.Second):
}
}

func createTopicAndWriteMessages(t *testing.T, topicName string, messages []segmentio.Message) (*segmentio.Conn, func()) {
t.Helper()

Expand Down

0 comments on commit 15a97a2

Please sign in to comment.