Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add functionality pause and resume for consumer #89

Merged
merged 9 commits into from
Jan 15, 2024
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ manager ([kafka-cronsumer](https://github.com/Trendyol/kafka-cronsumer)).
- Added ability for manipulating kafka message headers.
- Added transactional retry feature. Set false if you want to use exception/retry strategy to only failed messages.
- Enable manuel commit at both single and batch consuming modes.
- Enabling consumer resume/pause functionality. Please refer to [its example](examples/with-pause-resume-consumer) and
[how it works](examples/with-pause-resume-consumer/how-it-works.md) documentation.
- Bumped [kafka-cronsumer](https://github.com/Trendyol/kafka-cronsumer/releases) to the latest version:
- Backoff strategy support (linear, exponential options)
- Added message key for retried messages
Expand Down Expand Up @@ -197,6 +199,10 @@ After running `docker-compose up` command, you can run any application you want.

Please refer to [Tracing Example](examples/with-tracing/README.md)

#### With Pause & Resume Consumer

Please refer to [Pause Resume Example](examples/with-pause-resume-consumer)

#### With Grafana & Prometheus

In this example, we are demonstrating how to create Grafana dashboard and how to define alerts in Prometheus. You can
Expand Down
8 changes: 8 additions & 0 deletions batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ type batchConsumer struct {
messageGroupLimit int
}

func (b *batchConsumer) Pause() {
b.base.Pause()
}

func (b *batchConsumer) Resume() {
b.base.Resume()
}

func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) {
consumerBase, err := newBase(cfg, cfg.BatchConfiguration.MessageGroupLimit*cfg.Concurrency)
if err != nil {
Expand Down
46 changes: 46 additions & 0 deletions batch_consumer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"context"
"errors"
"reflect"
"strconv"
Expand Down Expand Up @@ -354,6 +355,51 @@ func Test_batchConsumer_chunk(t *testing.T) {
}
}

func Test_batchConsumer_Pause(t *testing.T) {
// Given
ctx, cancelFn := context.WithCancel(context.Background())
bc := batchConsumer{
base: &base{
logger: NewZapLogger(LogLevelDebug),
context: ctx,
pause: make(chan struct{}),
cancelFn: cancelFn,
},
}

go func() {
<-bc.base.pause
}()

// When
bc.Pause()

// Then
if bc.base.consumerState != statePaused {
t.Fatal("consumer state must be in paused")
}
}

func Test_batchConsumer_Resume(t *testing.T) {
// Given
ctx, cancelFn := context.WithCancel(context.Background())
bc := batchConsumer{
base: &base{
logger: NewZapLogger(LogLevelDebug),
context: ctx,
cancelFn: cancelFn,
},
}

// When
bc.Resume()

// Then
if bc.base.consumerState != stateRunning {
t.Fatal("consumer state must be in resume!")
}
}

func createMessages(partitionStart int, partitionEnd int) []*Message {
messages := make([]*Message, 0)
for i := partitionStart; i < partitionEnd; i++ {
Expand Down
9 changes: 9 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ type consumer struct {
consumeFn func(*Message) error
}

func (c *consumer) Pause() {
c.base.Pause()
}

func (c *consumer) Resume() {
c.base.Resume()
}

func newSingleConsumer(cfg *ConsumerConfig) (Consumer, error) {
consumerBase, err := newBase(cfg, cfg.Concurrency)
if err != nil {
Expand All @@ -40,6 +48,7 @@ func newSingleConsumer(cfg *ConsumerConfig) (Consumer, error) {

func (c *consumer) Consume() {
go c.subprocesses.Start()

c.wg.Add(1)
go c.startConsume()

Expand Down
56 changes: 54 additions & 2 deletions consumer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ type Consumer interface {
// Consume starts consuming
Consume()

// Pause function pauses consumer, it is stop consuming new messages
Pause()

// Resume function resumes consumer, it is start to working
Resume()

// WithLogger for injecting custom log implementation
WithLogger(logger LoggerInterface)

Expand All @@ -33,6 +39,13 @@ type Reader interface {
CommitMessages(messages []kafka.Message) error
}

type state string

const (
stateRunning state = "running"
statePaused state = "paused"
)

type base struct {
cronsumer kcronsumer.Cronsumer
api API
Expand All @@ -42,6 +55,7 @@ type base struct {
r Reader
cancelFn context.CancelFunc
metric *ConsumerMetric
pause chan struct{}
quit chan struct{}
messageProcessedStream chan struct{}
incomingMessageStream chan *IncomingMessage
Expand All @@ -56,6 +70,7 @@ type base struct {
retryEnabled bool
transactionalRetry bool
distributedTracingEnabled bool
consumerState state
Abdulsametileri marked this conversation as resolved.
Show resolved Hide resolved
}

func NewConsumer(cfg *ConsumerConfig) (Consumer, error) {
Expand All @@ -79,6 +94,7 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) {
metric: &ConsumerMetric{},
incomingMessageStream: make(chan *IncomingMessage, messageChSize),
quit: make(chan struct{}),
pause: make(chan struct{}),
concurrency: cfg.Concurrency,
retryEnabled: cfg.RetryEnabled,
transactionalRetry: *cfg.TransactionalRetry,
Expand All @@ -90,6 +106,7 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) {
messageProcessedStream: make(chan struct{}, cfg.Concurrency),
singleConsumingStream: make(chan *Message, cfg.Concurrency),
batchConsumingStream: make(chan []*Message, cfg.Concurrency),
consumerState: stateRunning,
}

if cfg.DistributedTracingEnabled {
Expand Down Expand Up @@ -125,6 +142,9 @@ func (c *base) startConsume() {

for {
select {
case <-c.pause:
c.logger.Debug("startConsume exited!")
return
case <-c.quit:
close(c.incomingMessageStream)
return
Expand Down Expand Up @@ -153,17 +173,49 @@ func (c *base) startConsume() {
}
}

func (c *base) Pause() {
c.logger.Info("Consumer is paused!")

c.cancelFn()

c.pause <- struct{}{}
close(c.pause)

c.consumerState = statePaused
}

func (c *base) Resume() {
c.logger.Info("Consumer is resumed!")

c.pause = make(chan struct{})
c.context, c.cancelFn = context.WithCancel(context.Background())
c.consumerState = stateRunning

c.wg.Add(1)
go c.startConsume()
}

func (c *base) WithLogger(logger LoggerInterface) {
c.logger = logger
}

func (c *base) Stop() error {
c.logger.Debug("Stop called!")
c.logger.Info("Stop called!")

var err error
c.once.Do(func() {
c.subprocesses.Stop()
c.cancelFn()
c.quit <- struct{}{}

// In order to save cpu, we break startConsume loop in pause mode.
// If consumer is pause mode, and Stop called
// We need to close incomingMessageStream, because c.wg.Wait() blocks indefinitely.
if c.consumerState == stateRunning {
c.quit <- struct{}{}
} else if c.consumerState == statePaused {
close(c.incomingMessageStream)
}

c.wg.Wait()
err = c.r.Close()
})
Expand Down
49 changes: 48 additions & 1 deletion consumer_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ func Test_base_startConsume(t *testing.T) {
wg: sync.WaitGroup{}, r: &mc,
incomingMessageStream: make(chan *IncomingMessage),
quit: make(chan struct{}),
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
logger: NewZapLogger(LogLevelInfo),
}
b.context, b.cancelFn = context.WithCancel(context.Background())

Expand Down Expand Up @@ -57,6 +58,52 @@ func Test_base_startConsume(t *testing.T) {
})
}

func Test_base_Pause(t *testing.T) {
// Given
ctx, cancelFn := context.WithCancel(context.Background())
b := base{
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
context: ctx, cancelFn: cancelFn,
}
go func() {
<-b.pause
}()

// When
b.Pause()

// Then
if b.consumerState != statePaused {
t.Fatal("consumer state must be in paused")
}
}

func Test_base_Resume(t *testing.T) {
// Given
mc := mockReader{}
ctx, cancelFn := context.WithCancel(context.Background())
b := base{
r: &mc,
logger: NewZapLogger(LogLevelDebug),
quit: make(chan struct{}),
pause: make(chan struct{}),
context: ctx, cancelFn: cancelFn,
wg: sync.WaitGroup{},
}

// When
b.Resume()

// Then
if b.consumerState != stateRunning {
t.Fatal("consumer state must be in running")
}
if ctx == b.context {
t.Fatal("contexts must be differ!")
}
}

type mockReader struct {
wantErr bool
}
Expand Down
45 changes: 45 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"context"
"errors"
"testing"
)
Expand Down Expand Up @@ -114,3 +115,47 @@ func Test_consumer_process(t *testing.T) {
}
})
}

func Test_consumer_Pause(t *testing.T) {
// Given
ctx, cancelFn := context.WithCancel(context.Background())
c := consumer{
base: &base{
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
context: ctx,
cancelFn: cancelFn,
},
}
go func() {
<-c.base.pause
}()

// When
c.Pause()

// Then
if c.base.consumerState != statePaused {
t.Fatal("consumer state must be in paused")
}
}

func Test_consumer_Resume(t *testing.T) {
// Given
ctx, cancelFn := context.WithCancel(context.Background())
c := consumer{
base: &base{
logger: NewZapLogger(LogLevelDebug),
context: ctx,
cancelFn: cancelFn,
},
}

// When
c.Resume()

// Then
if c.base.consumerState != stateRunning {
t.Fatal("consumer state must be in running")
}
}
30 changes: 30 additions & 0 deletions examples/with-pause-resume-consumer/how-it-works.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
Before implementation, I researched [segmentio/kafka-go](https://github.com/segmentio/kafka-go)' issues for this
functionality. I came across [this issue](https://github.com/segmentio/kafka-go/issues/474). [Achille-roussel](https://github.com/achille-roussel)
who is the old maintainer of the kafka-go clearly said that

```
To pause consuming from a partition, you can simply stop reading
messages. Kafka does not have a concept of pausing or resuming in its protocol, the responsibility is given to clients
to decide what to read and when.
```

It means, if we stop calling `FetchMessage`, the consumer pauses. If we invoke, the consumer resumes. Here, there is very
important behaviour exist. Consumer group state not affected at all in this situation. When we call `kafka.NewConsumer`,
segmentio/kafka-go library creates a goroutine under the hood, and it starts to send heartbeat with a specific interval
so even if we stop calling `FetchMessage`, consumer group still stable mode and not consumes new message at all.

```go
consumer, _ := kafka.NewConsumer(consumerCfg)
defer consumer.Stop()

consumer.Consume()
fmt.Println("Consumer started...!")
```

If you need to implement Pause & Resume functionality on your own applications, you need to call `Consume`. Because this
method creates listeners goroutine under the hood. After that you can manage the lifecycle of the consumer by calling
`Pause` and `Resume` methods.

You can run the example to see `Is consumer consumes new message in Pause mode` or `consumer consumes new message in Resume mode`
by producing dummy messages on kowl ui.

Loading
Loading