Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add functionality pause and resume for consumer #89

Merged
merged 9 commits into from
Jan 15, 2024
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ manager ([kafka-cronsumer](https://github.com/Trendyol/kafka-cronsumer)).
- Added ability for manipulating kafka message headers.
- Added transactional retry feature. Set false if you want to use exception/retry strategy to only failed messages.
- Enable manuel commit at both single and batch consuming modes.
- Enabling consumer resume/pause functionality. Please refer to [its example](examples/with-pause-resume-consumer) and
[how it works](examples/with-pause-resume-consumer/how-it-works.md) documentation.
- Bumped [kafka-cronsumer](https://github.com/Trendyol/kafka-cronsumer/releases) to the latest version:
- Backoff strategy support (linear, exponential options)
- Added message key for retried messages
Expand Down Expand Up @@ -197,6 +199,10 @@ After running `docker-compose up` command, you can run any application you want.

Please refer to [Tracing Example](examples/with-tracing/README.md)

#### With Pause & Resume Consumer

Please refer to [Pause Resume Example](examples/with-pause-resume-consumer)

#### With Grafana & Prometheus

In this example, we are demonstrating how to create Grafana dashboard and how to define alerts in Prometheus. You can
Expand Down
8 changes: 8 additions & 0 deletions batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ type batchConsumer struct {
messageGroupLimit int
}

func (b *batchConsumer) Pause() {
b.base.Pause()
}

func (b *batchConsumer) Resume() {
b.base.Resume()
}

func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) {
consumerBase, err := newBase(cfg, cfg.BatchConfiguration.MessageGroupLimit*cfg.Concurrency)
if err != nil {
Expand Down
50 changes: 50 additions & 0 deletions batch_consumer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"context"
"errors"
"reflect"
"strconv"
Expand Down Expand Up @@ -354,6 +355,55 @@ func Test_batchConsumer_chunk(t *testing.T) {
}
}

func Test_batchConsumer_Pause(t *testing.T) {
// Given
ctx, cancelFn := context.WithCancel(context.Background())
bc := batchConsumer{
base: &base{
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
context: ctx, cancelFn: cancelFn,
consumerState: stateRunning,
},
}

go func() {
<-bc.base.pause
}()

// When
bc.Pause()

// Then
if bc.base.consumerState != statePaused {
t.Fatal("consumer state must be in paused")
}
}

func Test_batchConsumer_Resume(t *testing.T) {
// Given
mc := mockReader{}
ctx, cancelFn := context.WithCancel(context.Background())
bc := batchConsumer{
base: &base{
r: &mc,
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
quit: make(chan struct{}),
wg: sync.WaitGroup{},
context: ctx, cancelFn: cancelFn,
},
}

// When
bc.Resume()

// Then
if bc.base.consumerState != stateRunning {
t.Fatal("consumer state must be in resume!")
}
}

func createMessages(partitionStart int, partitionEnd int) []*Message {
messages := make([]*Message, 0)
for i := partitionStart; i < partitionEnd; i++ {
Expand Down
9 changes: 9 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ type consumer struct {
consumeFn func(*Message) error
}

func (c *consumer) Pause() {
c.base.Pause()
}

func (c *consumer) Resume() {
c.base.Resume()
}

func newSingleConsumer(cfg *ConsumerConfig) (Consumer, error) {
consumerBase, err := newBase(cfg, cfg.Concurrency)
if err != nil {
Expand All @@ -40,6 +48,7 @@ func newSingleConsumer(cfg *ConsumerConfig) (Consumer, error) {

func (c *consumer) Consume() {
go c.subprocesses.Start()

c.wg.Add(1)
go c.startConsume()

Expand Down
55 changes: 53 additions & 2 deletions consumer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
// Consume starts consuming
Consume()

// Pause function pauses consumer, it is stop consuming new messages
Pause()

// Resume function resumes consumer, it is start to working
Resume()

// WithLogger for injecting custom log implementation
WithLogger(logger LoggerInterface)

Expand All @@ -33,6 +39,13 @@
CommitMessages(messages []kafka.Message) error
}

type state string

const (
stateRunning state = "running"
statePaused state = "paused"
)

type base struct {
cronsumer kcronsumer.Cronsumer
api API
Expand All @@ -42,6 +55,7 @@
r Reader
cancelFn context.CancelFunc
metric *ConsumerMetric
pause chan struct{}
quit chan struct{}
messageProcessedStream chan struct{}
incomingMessageStream chan *IncomingMessage
Expand All @@ -56,6 +70,7 @@
retryEnabled bool
transactionalRetry bool
distributedTracingEnabled bool
consumerState state
Abdulsametileri marked this conversation as resolved.
Show resolved Hide resolved
}

func NewConsumer(cfg *ConsumerConfig) (Consumer, error) {
Expand All @@ -79,6 +94,7 @@
metric: &ConsumerMetric{},
incomingMessageStream: make(chan *IncomingMessage, messageChSize),
quit: make(chan struct{}),
pause: make(chan struct{}),

Check warning on line 97 in consumer_base.go

View check run for this annotation

Codecov / codecov/patch

consumer_base.go#L97

Added line #L97 was not covered by tests
concurrency: cfg.Concurrency,
retryEnabled: cfg.RetryEnabled,
transactionalRetry: *cfg.TransactionalRetry,
Expand All @@ -90,6 +106,7 @@
messageProcessedStream: make(chan struct{}, cfg.Concurrency),
singleConsumingStream: make(chan *Message, cfg.Concurrency),
batchConsumingStream: make(chan []*Message, cfg.Concurrency),
consumerState: stateRunning,

Check warning on line 109 in consumer_base.go

View check run for this annotation

Codecov / codecov/patch

consumer_base.go#L109

Added line #L109 was not covered by tests
}

if cfg.DistributedTracingEnabled {
Expand Down Expand Up @@ -125,6 +142,9 @@

for {
select {
case <-c.pause:
c.logger.Debug("startConsume exited!")
return

Check warning on line 147 in consumer_base.go

View check run for this annotation

Codecov / codecov/patch

consumer_base.go#L145-L147

Added lines #L145 - L147 were not covered by tests
case <-c.quit:
close(c.incomingMessageStream)
return
Expand Down Expand Up @@ -153,17 +173,48 @@
}
}

func (c *base) Pause() {
c.logger.Info("Consumer is paused!")

c.cancelFn()

c.pause <- struct{}{}

c.consumerState = statePaused
}

func (c *base) Resume() {
c.logger.Info("Consumer is resumed!")

c.pause = make(chan struct{})
c.context, c.cancelFn = context.WithCancel(context.Background())
c.consumerState = stateRunning

c.wg.Add(1)
go c.startConsume()
}

func (c *base) WithLogger(logger LoggerInterface) {
c.logger = logger
}

func (c *base) Stop() error {
c.logger.Debug("Stop called!")
c.logger.Info("Stop is called!")

Check warning on line 202 in consumer_base.go

View check run for this annotation

Codecov / codecov/patch

consumer_base.go#L202

Added line #L202 was not covered by tests

var err error
c.once.Do(func() {
c.subprocesses.Stop()
c.cancelFn()
c.quit <- struct{}{}

// In order to save cpu, we break startConsume loop in pause mode.
// If consumer is pause mode and Stop is called
// We need to close incomingMessageStream, because c.wg.Wait() blocks indefinitely.
if c.consumerState == stateRunning {
c.quit <- struct{}{}
} else if c.consumerState == statePaused {
close(c.incomingMessageStream)

Check warning on line 215 in consumer_base.go

View check run for this annotation

Codecov / codecov/patch

consumer_base.go#L212-L215

Added lines #L212 - L215 were not covered by tests
}

c.wg.Wait()
err = c.r.Close()
})
Expand Down
55 changes: 52 additions & 3 deletions consumer_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,20 @@ import (
"time"

"github.com/google/go-cmp/cmp"

"github.com/segmentio/kafka-go"
)

func Test_base_startConsume(t *testing.T) {
t.Run("Return_When_Quit_Signal_Is_Came", func(t *testing.T) {
mc := mockReader{wantErr: true}
b := base{
wg: sync.WaitGroup{}, r: &mc,
wg: sync.WaitGroup{},
r: &mc,
incomingMessageStream: make(chan *IncomingMessage),
quit: make(chan struct{}),
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
logger: NewZapLogger(LogLevelError),
consumerState: stateRunning,
}
b.context, b.cancelFn = context.WithCancel(context.Background())

Expand Down Expand Up @@ -57,6 +59,53 @@ func Test_base_startConsume(t *testing.T) {
})
}

func Test_base_Pause(t *testing.T) {
// Given
ctx, cancelFn := context.WithCancel(context.Background())
b := base{
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
context: ctx, cancelFn: cancelFn,
consumerState: stateRunning,
}
go func() {
<-b.pause
}()

// When
b.Pause()

// Then
if b.consumerState != statePaused {
t.Fatal("consumer state must be in paused")
}
}

func Test_base_Resume(t *testing.T) {
// Given
mc := mockReader{}
ctx, cancelFn := context.WithCancel(context.Background())
b := base{
r: &mc,
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
quit: make(chan struct{}),
wg: sync.WaitGroup{},
context: ctx, cancelFn: cancelFn,
}

// When
b.Resume()

// Then
if b.consumerState != stateRunning {
t.Fatal("consumer state must be in running")
}
if ctx == b.context {
t.Fatal("contexts must be differ!")
}
}

type mockReader struct {
wantErr bool
}
Expand Down
50 changes: 50 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package kafka

import (
"context"
"errors"
"sync"
"testing"
)

Expand Down Expand Up @@ -114,3 +116,51 @@ func Test_consumer_process(t *testing.T) {
}
})
}

func Test_consumer_Pause(t *testing.T) {
// Given
ctx, cancelFn := context.WithCancel(context.Background())
c := consumer{
base: &base{
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
context: ctx, cancelFn: cancelFn,
consumerState: stateRunning,
},
}
go func() {
<-c.base.pause
}()

// When
c.Pause()

// Then
if c.base.consumerState != statePaused {
t.Fatal("consumer state must be in paused")
}
}

func Test_consumer_Resume(t *testing.T) {
// Given
mc := mockReader{}
ctx, cancelFn := context.WithCancel(context.Background())
c := consumer{
base: &base{
r: &mc,
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
quit: make(chan struct{}),
wg: sync.WaitGroup{},
context: ctx, cancelFn: cancelFn,
},
}

// When
c.Resume()

// Then
if c.base.consumerState != stateRunning {
t.Fatal("consumer state must be in running")
}
}
Loading
Loading