feat: implement distributed tracing on consuming and producing #39

Merged · 13 commits · Oct 6, 2023
Changes from all commits

Binary file added .github/images/jaeger-dashboard-example.png
Binary file added .github/images/tracing.png
4 changes: 3 additions & 1 deletion .gitignore
@@ -1,2 +1,4 @@
.idea
dist/
dist/
unit_coverage.out
unit_coverage.html
6 changes: 5 additions & 1 deletion README.md
@@ -117,9 +117,13 @@ After running `docker-compose up` command, you can run any application you want.
fmt.Printf("%d\n comes first %s", len(messages), messages[0].Value)
return nil
}

</details>


#### With Distributed Tracing Support

Please refer to the [Tracing Example](examples/with-tracing/README.md).

#### With Grafana & Prometheus

In this example, we are demonstrating how to create Grafana dashboard and how to define alerts in Prometheus. You can
22 changes: 22 additions & 0 deletions api_test.go
@@ -0,0 +1,22 @@
package kafka

import "testing"

func Test_setDefaults(t *testing.T) {
// Given
cfg := ConsumerConfig{}

// When
setDefaults(&cfg)

// Then
if *cfg.APIConfiguration.Port != 8090 {
t.Fatal("Default API Port is 8090")
}
if *cfg.APIConfiguration.HealthCheckPath != "/healthcheck" {
t.Fatal("Default Healtcheck path is /healthcheck")
}
if *cfg.MetricConfiguration.Path != "/metrics" {
t.Fatal("Default Healtcheck path is /metrics")
}
}
71 changes: 38 additions & 33 deletions consumer_base.go
@@ -4,6 +4,9 @@ import (
"context"
"sync"

"github.com/Trendyol/otel-kafka-konsumer"
"go.opentelemetry.io/otel/propagation"

"github.com/prometheus/client_golang/prometheus"

cronsumer "github.com/Trendyol/kafka-cronsumer"
@@ -28,21 +31,23 @@ type Reader interface {
}

type base struct {
cronsumer kcronsumer.Cronsumer
api API
logger LoggerInterface
metric *ConsumerMetric
context context.Context
messageCh chan Message
quit chan struct{}
cancelFn context.CancelFunc
r Reader
retryTopic string
subprocesses subprocesses
wg sync.WaitGroup
concurrency int
once sync.Once
retryEnabled bool
cronsumer kcronsumer.Cronsumer
api API
logger LoggerInterface
metric *ConsumerMetric
context context.Context
messageCh chan Message
quit chan struct{}
cancelFn context.CancelFunc
r Reader
retryTopic string
subprocesses subprocesses
wg sync.WaitGroup
concurrency int
once sync.Once
retryEnabled bool
distributedTracingEnabled bool
propagator propagation.TextMapPropagator
}

func NewConsumer(cfg *ConsumerConfig) (Consumer, error) {
@@ -63,14 +68,19 @@ func newBase(cfg *ConsumerConfig) (*base, error) {
}

c := base{
metric: &ConsumerMetric{},
messageCh: make(chan Message, cfg.Concurrency),
quit: make(chan struct{}),
concurrency: cfg.Concurrency,
retryEnabled: cfg.RetryEnabled,
logger: log,
subprocesses: newSubProcesses(),
r: reader,
metric: &ConsumerMetric{},
messageCh: make(chan Message, cfg.Concurrency),
quit: make(chan struct{}),
concurrency: cfg.Concurrency,
retryEnabled: cfg.RetryEnabled,
distributedTracingEnabled: cfg.DistributedTracingEnabled,
logger: log,
subprocesses: newSubProcesses(),
r: reader,
}

if cfg.DistributedTracingEnabled {
c.propagator = cfg.DistributedTracingConfiguration.Propagator
}

c.context, c.cancelFn = context.WithCancel(context.Background())
@@ -114,17 +124,12 @@ func (c *base) startConsume() {
continue
}

c.messageCh <- Message{
Topic: message.Topic,
Partition: message.Partition,
Offset: message.Offset,
HighWaterMark: message.HighWaterMark,
Key: message.Key,
Value: message.Value,
Headers: message.Headers,
WriterData: message.WriterData,
Time: message.Time,
consumedMessage := fromKafkaMessage(message)
if c.distributedTracingEnabled {
consumedMessage.Context = c.propagator.Extract(context.Background(), otelkafkakonsumer.NewMessageCarrier(message))
}

c.messageCh <- consumedMessage
}
}
}
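
When distributed tracing is enabled, the extracted context travels on the delivered Message, so a consume function can continue the same trace. Below is a minimal sketch of that pattern; the module import path, tracer name, and single-message ConsumeFn signature are assumptions based on this repository's examples, not part of this diff.

```go
package example

import (
	"fmt"

	kafka "github.com/Trendyol/kafka-konsumer"
	"go.opentelemetry.io/otel"
)

// processMessage continues the trace that the consumer extracted into
// message.Context when DistributedTracingEnabled is true.
func processMessage(message kafka.Message) error {
	tracer := otel.Tracer("kafka-konsumer-demo") // tracer name is illustrative
	ctx, span := tracer.Start(message.Context, "process "+message.Topic)
	defer span.End()

	// Real work would pass ctx onward so downstream calls join the same trace.
	_ = ctx
	fmt.Printf("consumed message: %s\n", string(message.Value))
	return nil
}
```
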
62 changes: 43 additions & 19 deletions consumer_config.go
@@ -3,6 +3,10 @@ package kafka
import (
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"

kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
lcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/logger"

@@ -21,23 +25,25 @@ type DialConfig struct {
}

type ConsumerConfig struct {
APIConfiguration APIConfiguration
Logger LoggerInterface
MetricConfiguration MetricConfiguration
SASL *SASLConfig
TLS *TLSConfig
Dial *DialConfig
BatchConfiguration *BatchConfiguration
ConsumeFn ConsumeFn
ClientID string
Rack string
LogLevel LogLevel
Reader ReaderConfig
RetryConfiguration RetryConfiguration
CommitInterval time.Duration
Concurrency int
RetryEnabled bool
APIEnabled bool
APIConfiguration APIConfiguration
Logger LoggerInterface
MetricConfiguration MetricConfiguration
SASL *SASLConfig
TLS *TLSConfig
Dial *DialConfig
BatchConfiguration *BatchConfiguration
ConsumeFn ConsumeFn
ClientID string
Rack string
LogLevel LogLevel
Reader ReaderConfig
RetryConfiguration RetryConfiguration
CommitInterval time.Duration
DistributedTracingEnabled bool
DistributedTracingConfiguration DistributedTracingConfiguration
Concurrency int
RetryEnabled bool
APIEnabled bool
}

func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config {
@@ -95,6 +101,11 @@ type MetricConfiguration struct {
Path *string
}

type DistributedTracingConfiguration struct {
TracerProvider trace.TracerProvider
Propagator propagation.TextMapPropagator
}

type RetryConfiguration struct {
SASL *SASLConfig
TLS *TLSConfig
@@ -139,7 +150,7 @@ func (cfg *ConsumerConfig) newKafkaDialer() (*kafka.Dialer, error) {
}

func (cfg *ConsumerConfig) newKafkaReader() (Reader, error) {
cfg.validate()
cfg.setDefaults()

dialer, err := cfg.newKafkaDialer()
if err != nil {
@@ -154,10 +165,14 @@ func (cfg *ConsumerConfig) newKafkaReader() (Reader, error) {

reader := kafka.NewReader(readerCfg)

if cfg.DistributedTracingEnabled {
return NewOtelReaderWrapper(cfg, reader)
}

return NewReaderWrapper(reader), nil
}

func (cfg *ConsumerConfig) validate() {
func (cfg *ConsumerConfig) setDefaults() {
if cfg.Concurrency == 0 {
cfg.Concurrency = 1
}
@@ -169,4 +184,13 @@ func (cfg *ConsumerConfig) validate() {
} else {
cfg.Reader.CommitInterval = cfg.CommitInterval
}

if cfg.DistributedTracingEnabled {
if cfg.DistributedTracingConfiguration.Propagator == nil {
cfg.DistributedTracingConfiguration.Propagator = otel.GetTextMapPropagator()
}
if cfg.DistributedTracingConfiguration.TracerProvider == nil {
cfg.DistributedTracingConfiguration.TracerProvider = otel.GetTracerProvider()
}
}
}
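
Because setDefaults falls back to the global OpenTelemetry tracer provider and propagator, registering them once at startup is enough when no explicit DistributedTracingConfiguration is supplied. A minimal sketch of that registration follows; the SDK setup is illustrative, and a real provider would also configure an exporter.

```go
package example

import (
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// initGlobalTracing registers the globals that setDefaults picks up when
// DistributedTracingEnabled is true and no explicit configuration is given.
func initGlobalTracing() {
	// An SDK tracer provider; exporters (OTLP, Jaeger, stdout) would be added
	// with sdktrace.WithBatcher in a real setup.
	tp := sdktrace.NewTracerProvider()

	otel.SetTracerProvider(tp)
	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
		propagation.TraceContext{},
		propagation.Baggage{},
	))
}
```
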
19 changes: 17 additions & 2 deletions consumer_config_test.go
@@ -11,7 +11,7 @@ func TestConsumerConfig_validate(t *testing.T) {
cfg := ConsumerConfig{Reader: ReaderConfig{}}

// When
cfg.validate()
cfg.setDefaults()

// Then
if cfg.Concurrency != 1 {
@@ -24,12 +24,27 @@ func TestConsumerConfig_validate(t *testing.T) {
t.Fatalf("Reader Commit Interval default value must equal to 1s")
}
})
t.Run("Set_Defaults_When_Distributed_Tracing_Enabled", func(t *testing.T) {
// Given
cfg := ConsumerConfig{Reader: ReaderConfig{}, DistributedTracingEnabled: true}

// When
cfg.setDefaults()

// Then
if cfg.DistributedTracingConfiguration.TracerProvider == nil {
t.Fatal("Traceprovider cannot be null")
}
if cfg.DistributedTracingConfiguration.Propagator == nil {
t.Fatal("Propagator cannot be null")
}
})
t.Run("Set_Commit_Interval_Value_To_The_Internal_Reader", func(t *testing.T) {
// Given
cfg := ConsumerConfig{CommitInterval: 5 * time.Second, Reader: ReaderConfig{}}

// When
cfg.validate()
cfg.setDefaults()

// Then
if cfg.CommitInterval != 5*time.Second {
9 changes: 9 additions & 0 deletions examples/docker-compose.yml
@@ -1,5 +1,14 @@
version: "3.9"
services:
jaeger-all-in-one:
image: jaegertracing/all-in-one:latest
environment:
COLLECTOR_OTLP_ENABLED: "true"
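# 14268: Jaeger collector HTTP, 16686: Jaeger UI, 4318: OTLP/HTTP receiver (enabled by COLLECTOR_OTLP_ENABLED)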
ports:
- "14268:14268"
- "16686:16686"
- "4318:4318"

zookeeper:
image: debezium/zookeeper
ports:
2 changes: 1 addition & 1 deletion examples/with-deadletter/main.go
@@ -17,7 +17,7 @@ const (
)

func main() {
producer, _ := kafka.NewProducer(kafka.ProducerConfig{
producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
Writer: kafka.WriterConfig{
Brokers: []string{"localhost:29092"},
},
2 changes: 1 addition & 1 deletion examples/with-grafana/main.go
@@ -27,7 +27,7 @@ var messages = []user{

func main() {
// create new kafka producer
producer, _ := kafka.NewProducer(kafka.ProducerConfig{
producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
Writer: kafka.WriterConfig{
Brokers: []string{"localhost:29092"},
},
2 changes: 1 addition & 1 deletion examples/with-kafka-producer/main.go
@@ -6,7 +6,7 @@ import (
)

func main() {
producer, _ := kafka.NewProducer(kafka.ProducerConfig{
producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
Writer: kafka.WriterConfig{
Brokers: []string{"localhost:29092"},
},
37 changes: 37 additions & 0 deletions examples/with-tracing/README.md
@@ -0,0 +1,37 @@
### Distributed Tracing

[segmentio/kafka-go](https://github.com/segmentio/kafka-go) has no built-in support for OpenTelemetry.
There is an open [issue](https://github.com/segmentio/kafka-go/issues/1025) about it.
Building on some of the work in that issue, we created a project called [otel-kafka-konsumer](https://github.com/Trendyol/otel-kafka-konsumer).

By integrating this project with kafka-konsumer, we implemented distributed tracing for both consuming
and producing operations. You can run the demo below.

In this demo, we chose Jaeger to show how to integrate distributed tracing into your project using kafka-konsumer,
but Zipkin, stdout, and other exporters are equally applicable.

Two settings are significant:
- trace.TracerProvider _(e.g. Jaeger, Zipkin)_
- propagation.TextMapPropagator (see the [propagators API specification](https://opentelemetry.io/docs/specs/otel/context/api-propagators/))

If you do not specify these values, kafka-konsumer falls back to the global otel.GetTracerProvider() and otel.GetTextMapPropagator().
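
For this demo, setting both explicitly might look like the sketch below, using an OTLP/HTTP exporter pointed at the Jaeger all-in-one container on port 4318. The broker address, topic, group id, and the single-message ConsumeFn signature are illustrative assumptions modeled on the other examples in this repository.

```go
package main

import (
	"context"

	kafka "github.com/Trendyol/kafka-konsumer"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
	"go.opentelemetry.io/otel/propagation"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

func main() {
	// Export spans to the Jaeger all-in-one container over OTLP/HTTP.
	exporter, err := otlptracehttp.New(context.Background(),
		otlptracehttp.WithEndpoint("localhost:4318"),
		otlptracehttp.WithInsecure(),
	)
	if err != nil {
		panic(err)
	}
	tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exporter))

	consumer, err := kafka.NewConsumer(&kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "konsumer",            // illustrative topic name
			GroupID: "konsumer.group.demo", // illustrative group id
		},
		DistributedTracingEnabled: true,
		DistributedTracingConfiguration: kafka.DistributedTracingConfiguration{
			TracerProvider: tp,
			Propagator:     propagation.TraceContext{},
		},
		ConsumeFn: func(message kafka.Message) error {
			// message.Context carries the extracted parent span when tracing is enabled.
			return nil
		},
	})
	if err != nil {
		panic(err)
	}
	defer consumer.Stop()

	consumer.Consume()

	// Block; spans become visible in the Jaeger UI at http://localhost:16686.
	select {}
}
```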

### Demo overview

![Tracing Example](../../.github/images/tracing.png)

### How to run the demo

You can start the infrastructure in [docker-compose.yml](../docker-compose.yml) with

```sh
docker-compose up --build
```

You can then access the [Jaeger dashboard](http://localhost:16686/search) at localhost:16686.

You can run the demo with `go run main.go`.

In the producing step, we open two spans; in the consuming step, we open three. You can see their relationship in the Jaeger dashboard, as shown below.

![Demo Jaeger](../../.github/images/jaeger-dashboard-example.png)