Skip to content

Commit

Permalink
feat: change consumeFn and batchConsumeFn as pointer signature (#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
A.Samet İleri authored Nov 9, 2023
1 parent 326fc77 commit 38660af
Show file tree
Hide file tree
Showing 22 changed files with 268 additions and 124 deletions.
8 changes: 4 additions & 4 deletions batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
type batchConsumer struct {
*base

consumeFn func([]Message) error
consumeFn func([]*Message) error

messageGroupLimit int
messageGroupDuration time.Duration
Expand All @@ -30,7 +30,7 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) {

if cfg.RetryEnabled {
c.base.setupCronsumer(cfg, func(message kcronsumer.Message) error {
return c.consumeFn([]Message{toMessage(message)})
return c.consumeFn([]*Message{toMessage(message)})
})
}

Expand Down Expand Up @@ -62,7 +62,7 @@ func (b *batchConsumer) startBatch() {
ticker := time.NewTicker(b.messageGroupDuration)
defer ticker.Stop()

messages := make([]Message, 0, b.messageGroupLimit)
messages := make([]*Message, 0, b.messageGroupLimit)

for {
select {
Expand All @@ -88,7 +88,7 @@ func (b *batchConsumer) startBatch() {
}
}

func (b *batchConsumer) process(messages []Message) {
func (b *batchConsumer) process(messages []*Message) {
consumeErr := b.consumeFn(messages)

if consumeErr != nil {
Expand Down
32 changes: 16 additions & 16 deletions batch_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,27 @@ func Test_batchConsumer_startBatch(t *testing.T) {

bc := batchConsumer{
base: &base{
messageCh: make(chan Message),
messageCh: make(chan *Message),
metric: &ConsumerMetric{},
wg: sync.WaitGroup{},
},
messageGroupLimit: 3,
messageGroupDuration: 500 * time.Millisecond,
consumeFn: func(messages []Message) error {
consumeFn: func(messages []*Message) error {
numberOfBatch++
return nil
},
}
go func() {
// Simulate messageGroupLimit
bc.base.messageCh <- Message{}
bc.base.messageCh <- Message{}
bc.base.messageCh <- Message{}
bc.base.messageCh <- &Message{}
bc.base.messageCh <- &Message{}
bc.base.messageCh <- &Message{}

time.Sleep(1 * time.Second)

// Simulate messageGroupDuration
bc.base.messageCh <- Message{}
bc.base.messageCh <- &Message{}

time.Sleep(1 * time.Second)

Expand All @@ -64,13 +64,13 @@ func Test_batchConsumer_process(t *testing.T) {
// Given
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}},
consumeFn: func([]Message) error {
consumeFn: func([]*Message) error {
return nil
},
}

// When
bc.process([]Message{{}, {}, {}})
bc.process([]*Message{{}, {}, {}})

// Then
if bc.metric.TotalProcessedMessagesCounter != 3 {
Expand All @@ -85,7 +85,7 @@ func Test_batchConsumer_process(t *testing.T) {
gotOnlyOneTimeException := true
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)},
consumeFn: func(messages []Message) error {
consumeFn: func(messages []*Message) error {
if gotOnlyOneTimeException {
gotOnlyOneTimeException = false
return errors.New("simulate only one time exception")
Expand All @@ -95,7 +95,7 @@ func Test_batchConsumer_process(t *testing.T) {
}

// When
bc.process([]Message{{}, {}, {}})
bc.process([]*Message{{}, {}, {}})

// Then
if bc.metric.TotalProcessedMessagesCounter != 3 {
Expand All @@ -109,13 +109,13 @@ func Test_batchConsumer_process(t *testing.T) {
// Given
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)},
consumeFn: func(messages []Message) error {
consumeFn: func(messages []*Message) error {
return errors.New("error case")
},
}

// When
bc.process([]Message{{}, {}, {}})
bc.process([]*Message{{}, {}, {}})

// Then
if bc.metric.TotalProcessedMessagesCounter != 0 {
Expand All @@ -130,13 +130,13 @@ func Test_batchConsumer_process(t *testing.T) {
mc := mockCronsumer{}
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc},
consumeFn: func(messages []Message) error {
consumeFn: func(messages []*Message) error {
return errors.New("error case")
},
}

// When
bc.process([]Message{{}, {}, {}})
bc.process([]*Message{{}, {}, {}})

// Then
if bc.metric.TotalProcessedMessagesCounter != 0 {
Expand All @@ -151,13 +151,13 @@ func Test_batchConsumer_process(t *testing.T) {
mc := mockCronsumer{wantErr: true}
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc},
consumeFn: func(messages []Message) error {
consumeFn: func(messages []*Message) error {
return errors.New("error case")
},
}

// When
bc.process([]Message{{}, {}, {}})
bc.process([]*Message{{}, {}, {}})

// Then
if bc.metric.TotalProcessedMessagesCounter != 0 {
Expand Down
4 changes: 2 additions & 2 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
type consumer struct {
*base

consumeFn func(Message) error
consumeFn func(*Message) error
}

func newSingleConsumer(cfg *ConsumerConfig) (Consumer, error) {
Expand Down Expand Up @@ -51,7 +51,7 @@ func (c *consumer) Consume() {
}
}

func (c *consumer) process(message Message) {
func (c *consumer) process(message *Message) {
consumeErr := c.consumeFn(message)

if consumeErr != nil {
Expand Down
4 changes: 2 additions & 2 deletions consumer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type base struct {
logger LoggerInterface
metric *ConsumerMetric
context context.Context
messageCh chan Message
messageCh chan *Message
quit chan struct{}
cancelFn context.CancelFunc
r Reader
Expand Down Expand Up @@ -69,7 +69,7 @@ func newBase(cfg *ConsumerConfig) (*base, error) {

c := base{
metric: &ConsumerMetric{},
messageCh: make(chan Message, cfg.Concurrency),
messageCh: make(chan *Message, cfg.Concurrency),
quit: make(chan struct{}),
concurrency: cfg.Concurrency,
retryEnabled: cfg.RetryEnabled,
Expand Down
7 changes: 4 additions & 3 deletions consumer_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ func Test_base_startConsume(t *testing.T) {
mc := mockReader{wantErr: true}
b := base{
wg: sync.WaitGroup{}, r: &mc,
messageCh: make(chan Message), quit: make(chan struct{}),
logger: NewZapLogger(LogLevelDebug),
messageCh: make(chan *Message),
quit: make(chan struct{}),
logger: NewZapLogger(LogLevelDebug),
}
b.context, b.cancelFn = context.WithCancel(context.Background())

Expand All @@ -33,7 +34,7 @@ func Test_base_startConsume(t *testing.T) {
t.Run("Read_Incoming_Messages_Successfully", func(t *testing.T) {
// Given
mc := mockReader{}
b := base{wg: sync.WaitGroup{}, r: &mc, messageCh: make(chan Message)}
b := base{wg: sync.WaitGroup{}, r: &mc, messageCh: make(chan *Message)}
b.wg.Add(1)

// When
Expand Down
4 changes: 2 additions & 2 deletions consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (

type ReaderConfig kafka.ReaderConfig

type BatchConsumeFn func([]Message) error
type BatchConsumeFn func([]*Message) error

type ConsumeFn func(Message) error
type ConsumeFn func(*Message) error

type DialConfig struct {
Timeout time.Duration
Expand Down
20 changes: 10 additions & 10 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ func Test_consumer_process(t *testing.T) {
// Given
c := consumer{
base: &base{metric: &ConsumerMetric{}},
consumeFn: func(Message) error {
consumeFn: func(*Message) error {
return nil
},
}

// When
c.process(Message{})
c.process(&Message{})

// Then
if c.metric.TotalProcessedMessagesCounter != 1 {
Expand All @@ -31,7 +31,7 @@ func Test_consumer_process(t *testing.T) {
gotOnlyOneTimeException := true
c := consumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)},
consumeFn: func(Message) error {
consumeFn: func(*Message) error {
if gotOnlyOneTimeException {
gotOnlyOneTimeException = false
return errors.New("simulate only one time exception")
Expand All @@ -41,7 +41,7 @@ func Test_consumer_process(t *testing.T) {
}

// When
c.process(Message{})
c.process(&Message{})

// Then
if c.metric.TotalProcessedMessagesCounter != 1 {
Expand All @@ -55,13 +55,13 @@ func Test_consumer_process(t *testing.T) {
// Given
c := consumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)},
consumeFn: func(Message) error {
consumeFn: func(*Message) error {
return errors.New("error case")
},
}

// When
c.process(Message{})
c.process(&Message{})

// Then
if c.metric.TotalProcessedMessagesCounter != 0 {
Expand All @@ -76,13 +76,13 @@ func Test_consumer_process(t *testing.T) {
mc := mockCronsumer{}
c := consumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc},
consumeFn: func(Message) error {
consumeFn: func(*Message) error {
return errors.New("error case")
},
}

// When
c.process(Message{})
c.process(&Message{})

// Then
if c.metric.TotalProcessedMessagesCounter != 0 {
Expand All @@ -97,13 +97,13 @@ func Test_consumer_process(t *testing.T) {
mc := mockCronsumer{wantErr: true}
c := consumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc},
consumeFn: func(Message) error {
consumeFn: func(*Message) error {
return errors.New("error case")
},
}

// When
c.process(Message{})
c.process(&Message{})

// Then
if c.metric.TotalProcessedMessagesCounter != 0 {
Expand Down
2 changes: 1 addition & 1 deletion examples/with-deadletter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func main() {
<-c
}

func consumeFn(message kafka.Message) error {
func consumeFn(message *kafka.Message) error {
fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value))
// returns error to be sent to dead-letter topic
return errors.New("consumer error")
Expand Down
2 changes: 1 addition & 1 deletion examples/with-grafana/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func main() {
WorkDuration: 50 * time.Second,
MaxRetry: 3,
},
ConsumeFn: func(message kafka.Message) error {
ConsumeFn: func(message *kafka.Message) error {
// mocking some background task
time.Sleep(1 * time.Second)

Expand Down
10 changes: 9 additions & 1 deletion examples/with-kafka-batch-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ func main() {
MessageGroupDuration: time.Second,
BatchConsumeFn: batchConsumeFn,
},
RetryEnabled: true,
RetryConfiguration: kafka.RetryConfiguration{
Brokers: []string{"localhost:29092"},
Topic: "retry-topic",
StartTimeCron: "*/1 * * * *",
WorkDuration: 50 * time.Second,
MaxRetry: 3,
},
}

consumer, _ := kafka.NewConsumer(consumerCfg)
Expand All @@ -35,7 +43,7 @@ func main() {

// In order to load topic with data, use:
// kafka-console-producer --broker-list localhost:29092 --topic standart-topic < examples/with-kafka-batch-consumer/load.txt
func batchConsumeFn(messages []kafka.Message) error {
func batchConsumeFn(messages []*kafka.Message) error {
fmt.Printf("%d\n comes first %s", len(messages), messages[0].Value)
return nil
}
2 changes: 1 addition & 1 deletion examples/with-kafka-cronsumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func main() {
<-c
}

func consumeFn(message kafka.Message) error {
func consumeFn(message *kafka.Message) error {
fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value))
return nil
}
2 changes: 1 addition & 1 deletion examples/with-prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func main() {
<-c
}

func consumeFn(message kafka.Message) error {
func consumeFn(message *kafka.Message) error {
fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value))
return nil
}
Loading

0 comments on commit 38660af

Please sign in to comment.