Skip to content

Commit

Permalink
chore: extract reader interface in order to tests and enable distribu…
Browse files Browse the repository at this point in the history
…ted tracing support
  • Loading branch information
Abdulsametileri committed Sep 17, 2023
1 parent cdd2454 commit 12bf4be
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 7 deletions.
19 changes: 17 additions & 2 deletions consumer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ type Consumer interface {
Stop() error
}

type Reader interface {
ReadMessage(ctx context.Context) (*kafka.Message, error)
Close() error
}

type base struct {
cronsumer kcronsumer.Cronsumer
api API
Expand All @@ -31,7 +36,7 @@ type base struct {
messageCh chan Message
quit chan struct{}
cancelFn context.CancelFunc
r *kafka.Reader
r Reader
retryTopic string
subprocesses subprocesses
wg sync.WaitGroup
Expand Down Expand Up @@ -109,7 +114,17 @@ func (c *base) startConsume() {
continue
}

c.messageCh <- Message(message)
c.messageCh <- Message{
Topic: message.Topic,
Partition: message.Partition,
Offset: message.Offset,
HighWaterMark: message.HighWaterMark,
Key: message.Key,
Value: message.Value,
Headers: message.Headers,
WriterData: message.WriterData,
Time: message.Time,
}
}
}
}
Expand Down
12 changes: 7 additions & 5 deletions consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,21 +138,23 @@ func (cfg *ConsumerConfig) newKafkaDialer() (*kafka.Dialer, error) {
return dialer.Dialer, nil
}

func (cfg *ConsumerConfig) newKafkaReader() (*kafka.Reader, error) {
func (cfg *ConsumerConfig) newKafkaReader() (Reader, error) {
cfg.validate()

dialer, err := cfg.newKafkaDialer()
if err != nil {
return nil, err
}

reader := kafka.ReaderConfig(cfg.Reader)
reader.Dialer = dialer
readerCfg := kafka.ReaderConfig(cfg.Reader)
readerCfg.Dialer = dialer
if cfg.Rack != "" {
reader.GroupBalancers = []kafka.GroupBalancer{kafka.RackAffinityGroupBalancer{Rack: cfg.Rack}}
readerCfg.GroupBalancers = []kafka.GroupBalancer{kafka.RackAffinityGroupBalancer{Rack: cfg.Rack}}
}

return kafka.NewReader(reader), nil
reader := kafka.NewReader(readerCfg)

return NewReaderWrapper(reader), nil
}

func (cfg *ConsumerConfig) validate() {
Expand Down
25 changes: 25 additions & 0 deletions reader_wrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package kafka

import (
"context"

segmentio "github.com/segmentio/kafka-go"
)

type readerWrapper struct {
r *segmentio.Reader
}

func NewReaderWrapper(reader *segmentio.Reader) Reader {
return &readerWrapper{r: reader}
}

// ReadMessage returns pointer of kafka message because we will support distributed tracing in the near future
func (s *readerWrapper) ReadMessage(ctx context.Context) (*segmentio.Message, error) {
message, err := s.r.ReadMessage(ctx)
return &message, err
}

func (s *readerWrapper) Close() error {
return s.r.Close()
}

0 comments on commit 12bf4be

Please sign in to comment.