diff --git a/.github/images/jaeger-dashboard-example.png b/.github/images/jaeger-dashboard-example.png new file mode 100644 index 0000000..69c72d5 Binary files /dev/null and b/.github/images/jaeger-dashboard-example.png differ diff --git a/.github/images/tracing.png b/.github/images/tracing.png new file mode 100644 index 0000000..43ee140 Binary files /dev/null and b/.github/images/tracing.png differ diff --git a/.gitignore b/.gitignore index f3f260a..e5129de 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ .idea -dist/ \ No newline at end of file +dist/ +unit_coverage.out +unit_coverage.html \ No newline at end of file diff --git a/README.md b/README.md index 181b2ad..3b421c4 100644 --- a/README.md +++ b/README.md @@ -117,9 +117,13 @@ After running `docker-compose up` command, you can run any application you want. fmt.Printf("%d\n comes first %s", len(messages), messages[0].Value) return nil } - + +#### With Distributed Tracing Support + +Please refer to [Tracing Example](examples/with-tracing/README.md) + #### With Grafana & Prometheus In this example, we are demonstrating how to create Grafana dashboard and how to define alerts in Prometheus. You can diff --git a/api_test.go b/api_test.go new file mode 100644 index 0000000..dad29a4 --- /dev/null +++ b/api_test.go @@ -0,0 +1,22 @@ +package kafka + +import "testing" + +func Test_setDefaults(t *testing.T) { + // Given + cfg := ConsumerConfig{} + + // When + setDefaults(&cfg) + + // Then + if *cfg.APIConfiguration.Port != 8090 { + t.Fatal("Default API Port is 8090") + } + if *cfg.APIConfiguration.HealthCheckPath != "/healthcheck" { + t.Fatal("Default Healtcheck path is /healthcheck") + } + if *cfg.MetricConfiguration.Path != "/metrics" { + t.Fatal("Default Healtcheck path is /metrics") + } +} diff --git a/consumer_base.go b/consumer_base.go index eead203..da1a59c 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -4,6 +4,9 @@ import ( "context" "sync" + "github.com/Trendyol/otel-kafka-konsumer" + "go.opentelemetry.io/otel/propagation" + "github.com/prometheus/client_golang/prometheus" cronsumer "github.com/Trendyol/kafka-cronsumer" @@ -28,21 +31,23 @@ type Reader interface { } type base struct { - cronsumer kcronsumer.Cronsumer - api API - logger LoggerInterface - metric *ConsumerMetric - context context.Context - messageCh chan Message - quit chan struct{} - cancelFn context.CancelFunc - r Reader - retryTopic string - subprocesses subprocesses - wg sync.WaitGroup - concurrency int - once sync.Once - retryEnabled bool + cronsumer kcronsumer.Cronsumer + api API + logger LoggerInterface + metric *ConsumerMetric + context context.Context + messageCh chan Message + quit chan struct{} + cancelFn context.CancelFunc + r Reader + retryTopic string + subprocesses subprocesses + wg sync.WaitGroup + concurrency int + once sync.Once + retryEnabled bool + distributedTracingEnabled bool + propagator propagation.TextMapPropagator } func NewConsumer(cfg *ConsumerConfig) (Consumer, error) { @@ -63,14 +68,19 @@ func newBase(cfg *ConsumerConfig) (*base, error) { } c := base{ - metric: &ConsumerMetric{}, - messageCh: make(chan Message, cfg.Concurrency), - quit: make(chan struct{}), - concurrency: cfg.Concurrency, - retryEnabled: cfg.RetryEnabled, - logger: log, - subprocesses: newSubProcesses(), - r: reader, + metric: &ConsumerMetric{}, + messageCh: make(chan Message, cfg.Concurrency), + quit: make(chan struct{}), + concurrency: cfg.Concurrency, + retryEnabled: cfg.RetryEnabled, + distributedTracingEnabled: cfg.DistributedTracingEnabled, + logger: log, + subprocesses: newSubProcesses(), + r: reader, + } + + if cfg.DistributedTracingEnabled { + c.propagator = cfg.DistributedTracingConfiguration.Propagator } c.context, c.cancelFn = context.WithCancel(context.Background()) @@ -114,17 +124,12 @@ func (c *base) startConsume() { continue } - c.messageCh <- Message{ - Topic: message.Topic, - Partition: message.Partition, - Offset: message.Offset, - HighWaterMark: message.HighWaterMark, - Key: message.Key, - Value: message.Value, - Headers: message.Headers, - WriterData: message.WriterData, - Time: message.Time, + consumedMessage := fromKafkaMessage(message) + if c.distributedTracingEnabled { + consumedMessage.Context = c.propagator.Extract(context.Background(), otelkafkakonsumer.NewMessageCarrier(message)) } + + c.messageCh <- consumedMessage } } } diff --git a/consumer_config.go b/consumer_config.go index a5dfeac..468232f 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -3,6 +3,10 @@ package kafka import ( "time" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" + kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka" lcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/logger" @@ -21,23 +25,25 @@ type DialConfig struct { } type ConsumerConfig struct { - APIConfiguration APIConfiguration - Logger LoggerInterface - MetricConfiguration MetricConfiguration - SASL *SASLConfig - TLS *TLSConfig - Dial *DialConfig - BatchConfiguration *BatchConfiguration - ConsumeFn ConsumeFn - ClientID string - Rack string - LogLevel LogLevel - Reader ReaderConfig - RetryConfiguration RetryConfiguration - CommitInterval time.Duration - Concurrency int - RetryEnabled bool - APIEnabled bool + APIConfiguration APIConfiguration + Logger LoggerInterface + MetricConfiguration MetricConfiguration + SASL *SASLConfig + TLS *TLSConfig + Dial *DialConfig + BatchConfiguration *BatchConfiguration + ConsumeFn ConsumeFn + ClientID string + Rack string + LogLevel LogLevel + Reader ReaderConfig + RetryConfiguration RetryConfiguration + CommitInterval time.Duration + DistributedTracingEnabled bool + DistributedTracingConfiguration DistributedTracingConfiguration + Concurrency int + RetryEnabled bool + APIEnabled bool } func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config { @@ -95,6 +101,11 @@ type MetricConfiguration struct { Path *string } +type DistributedTracingConfiguration struct { + TracerProvider trace.TracerProvider + Propagator propagation.TextMapPropagator +} + type RetryConfiguration struct { SASL *SASLConfig TLS *TLSConfig @@ -139,7 +150,7 @@ func (cfg *ConsumerConfig) newKafkaDialer() (*kafka.Dialer, error) { } func (cfg *ConsumerConfig) newKafkaReader() (Reader, error) { - cfg.validate() + cfg.setDefaults() dialer, err := cfg.newKafkaDialer() if err != nil { @@ -154,10 +165,14 @@ func (cfg *ConsumerConfig) newKafkaReader() (Reader, error) { reader := kafka.NewReader(readerCfg) + if cfg.DistributedTracingEnabled { + return NewOtelReaderWrapper(cfg, reader) + } + return NewReaderWrapper(reader), nil } -func (cfg *ConsumerConfig) validate() { +func (cfg *ConsumerConfig) setDefaults() { if cfg.Concurrency == 0 { cfg.Concurrency = 1 } @@ -169,4 +184,13 @@ func (cfg *ConsumerConfig) validate() { } else { cfg.Reader.CommitInterval = cfg.CommitInterval } + + if cfg.DistributedTracingEnabled { + if cfg.DistributedTracingConfiguration.Propagator == nil { + cfg.DistributedTracingConfiguration.Propagator = otel.GetTextMapPropagator() + } + if cfg.DistributedTracingConfiguration.TracerProvider == nil { + cfg.DistributedTracingConfiguration.TracerProvider = otel.GetTracerProvider() + } + } } diff --git a/consumer_config_test.go b/consumer_config_test.go index 24c30ce..6404d42 100644 --- a/consumer_config_test.go +++ b/consumer_config_test.go @@ -11,7 +11,7 @@ func TestConsumerConfig_validate(t *testing.T) { cfg := ConsumerConfig{Reader: ReaderConfig{}} // When - cfg.validate() + cfg.setDefaults() // Then if cfg.Concurrency != 1 { @@ -24,12 +24,27 @@ func TestConsumerConfig_validate(t *testing.T) { t.Fatalf("Reader Commit Interval default value must equal to 1s") } }) + t.Run("Set_Defaults_When_Distributed_Tracing_Enabled", func(t *testing.T) { + // Given + cfg := ConsumerConfig{Reader: ReaderConfig{}, DistributedTracingEnabled: true} + + // When + cfg.setDefaults() + + // Then + if cfg.DistributedTracingConfiguration.TracerProvider == nil { + t.Fatal("Traceprovider cannot be null") + } + if cfg.DistributedTracingConfiguration.Propagator == nil { + t.Fatal("Propagator cannot be null") + } + }) t.Run("Set_Commit_Interval_Value_To_The_Internal_Reader", func(t *testing.T) { // Given cfg := ConsumerConfig{CommitInterval: 5 * time.Second, Reader: ReaderConfig{}} // When - cfg.validate() + cfg.setDefaults() // Then if cfg.CommitInterval != 5*time.Second { diff --git a/examples/docker-compose.yml b/examples/docker-compose.yml index 3b3ead7..de84b17 100644 --- a/examples/docker-compose.yml +++ b/examples/docker-compose.yml @@ -1,5 +1,14 @@ version: "3.9" services: + jaeger-all-in-one: + image: jaegertracing/all-in-one:latest + environment: + COLLECTOR_OTLP_ENABLED: "true" + ports: + - "14268:14268" + - "16686:16686" + - "4318:4318" + zookeeper: image: debezium/zookeeper ports: diff --git a/examples/with-deadletter/main.go b/examples/with-deadletter/main.go index b11c361..fce7eb0 100644 --- a/examples/with-deadletter/main.go +++ b/examples/with-deadletter/main.go @@ -17,7 +17,7 @@ const ( ) func main() { - producer, _ := kafka.NewProducer(kafka.ProducerConfig{ + producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ Writer: kafka.WriterConfig{ Brokers: []string{"localhost:29092"}, }, diff --git a/examples/with-grafana/main.go b/examples/with-grafana/main.go index 17a299b..82a0605 100644 --- a/examples/with-grafana/main.go +++ b/examples/with-grafana/main.go @@ -27,7 +27,7 @@ var messages = []user{ func main() { // create new kafka producer - producer, _ := kafka.NewProducer(kafka.ProducerConfig{ + producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ Writer: kafka.WriterConfig{ Brokers: []string{"localhost:29092"}, }, diff --git a/examples/with-kafka-producer/main.go b/examples/with-kafka-producer/main.go index edaee16..5c7cdcb 100644 --- a/examples/with-kafka-producer/main.go +++ b/examples/with-kafka-producer/main.go @@ -6,7 +6,7 @@ import ( ) func main() { - producer, _ := kafka.NewProducer(kafka.ProducerConfig{ + producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ Writer: kafka.WriterConfig{ Brokers: []string{"localhost:29092"}, }, diff --git a/examples/with-tracing/README.md b/examples/with-tracing/README.md new file mode 100644 index 0000000..ea1269b --- /dev/null +++ b/examples/with-tracing/README.md @@ -0,0 +1,37 @@ +### Distributed Tracing + +[segmentio/kafka-go](https://github.com/segmentio/kafka-go) has no support for open telemetry. +There is also an [issue](https://github.com/segmentio/kafka-go/issues/1025) about it. +Based on some work on that issue, we created a project called [otel-kafka-konsumer](https://github.com/Trendyol/otel-kafka-konsumer). + +By integrating this project with kafka-konsumer, we successfully implemented distributed tracing in consuming +and producing operations. You can run demo. + +In this demo, we chose jaeger to show how to integrate distributed tracing on your project using kafka-konsumer. +But zipkin, stdout, and other alternatives are still applicable + +Two settings are significant. +- trace.TracerProvider _(you can set jaeger,zipkin etc.)_ +- propagation.TextMapPropagator (please refer to [here](https://opentelemetry.io/docs/specs/otel/context/api-propagators/)) + +If you have not specified its values, kafka-konsumer uses global.TraceProvider and Propagation. + +### Demo overview + +![Tracing Example](../../.github/images/tracing.png) + +### How to run demo? + +You should run [docker-compose.yml](../docker-compose.yml) by + +```sh +docker-compose up build +``` + +You can access the jaeger dashboard as [jaeger dashboard](http://localhost:16686/search) + +You can run the demo as `go run main.go` + +In the producing step, we open only two spans. In the consuming step, we open three spans. You can see their relationship via the jeager dashboard, as shown below. + +![Demo Jeager](../../.github/images/jaeger-dashboard-example.png) \ No newline at end of file diff --git a/examples/with-tracing/main.go b/examples/with-tracing/main.go new file mode 100644 index 0000000..2c3b1b2 --- /dev/null +++ b/examples/with-tracing/main.go @@ -0,0 +1,105 @@ +package main + +import ( + "context" + "fmt" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/jaeger" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.19.0" + "os" + "os/signal" + "time" + + "github.com/Trendyol/kafka-konsumer" +) + +func main() { + jaegerUrl := "http://localhost:14268/api/traces" + tp := initJaegerTracer(jaegerUrl) + defer tp.Shutdown(context.Background()) + + otel.SetTracerProvider(tp) + otel.SetTextMapPropagator(propagation.TraceContext{}) + + // ===============SIMULATE PRODUCER=============== + producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ + Writer: kafka.WriterConfig{ + Brokers: []string{"localhost:29092"}, + }, + DistributedTracingEnabled: true, + }) + + const topicName = "standart-topic" + producedMessage := kafka.Message{ + Topic: topicName, + Key: []byte("1"), + Value: []byte(`{ "foo": "bar" }`), + } + + tr := otel.Tracer("after producing") + parentCtx, span := tr.Start(context.Background(), "before producing work") + time.Sleep(100 * time.Millisecond) + span.End() + + _ = producer.Produce(parentCtx, producedMessage) + + // ===============SIMULATE CONSUMER=============== + consumerCfg := &kafka.ConsumerConfig{ + Reader: kafka.ReaderConfig{ + Brokers: []string{"localhost:29092"}, + Topic: topicName, + GroupID: "standart-cg", + }, + ConsumeFn: consumeFn, + DistributedTracingEnabled: true, + } + + consumer, _ := kafka.NewConsumer(consumerCfg) + defer consumer.Stop() + + consumer.Consume() + + fmt.Println("Consumer started...!") + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + <-c +} + +func consumeFn(message kafka.Message) error { + fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value)) + + tr := otel.Tracer("consumer") + parentCtx, span := tr.Start(message.Context, "work") + time.Sleep(100 * time.Millisecond) + span.End() + + _, span = tr.Start(parentCtx, "another work") + time.Sleep(50 * time.Millisecond) + span.End() + + return nil +} + +func initJaegerTracer(url string) *trace.TracerProvider { + // Create the Jaeger exporter + exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url))) + if err != nil { + panic("Err initializing jaeger instance" + err.Error()) + } + + tp := trace.NewTracerProvider( + trace.WithBatcher(exp), + trace.WithResource(resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceName("kafka-konsumer-demo"), + attribute.String("environment", "prod"), + )), + ) + + return tp +} diff --git a/go.mod b/go.mod index aabaccf..f4082a6 100644 --- a/go.mod +++ b/go.mod @@ -4,10 +4,15 @@ go 1.19 require ( github.com/Trendyol/kafka-cronsumer v1.3.3 + github.com/Trendyol/otel-kafka-konsumer v0.0.5 github.com/ansrivas/fiberprometheus/v2 v2.6.1 github.com/gofiber/fiber/v2 v2.48.0 github.com/prometheus/client_golang v1.16.0 - github.com/segmentio/kafka-go v0.4.42 + github.com/segmentio/kafka-go v0.4.43 + go.opentelemetry.io/otel v1.19.0 + go.opentelemetry.io/otel/exporters/jaeger v1.16.0 + go.opentelemetry.io/otel/sdk v1.19.0 + go.opentelemetry.io/otel/trace v1.19.0 go.uber.org/zap v1.24.0 ) @@ -15,15 +20,17 @@ require ( github.com/andybalholm/brotli v1.0.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/go-logr/logr v1.2.4 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/gofiber/adaptor/v2 v2.2.1 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/uuid v1.3.0 // indirect - github.com/klauspost/compress v1.16.6 // indirect + github.com/klauspost/compress v1.17.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect github.com/mattn/go-runewidth v0.0.14 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect - github.com/pierrec/lz4/v4 v4.1.17 // indirect + github.com/pierrec/lz4/v4 v4.1.18 // indirect github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.11.0 // indirect @@ -35,9 +42,10 @@ require ( github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect + go.opentelemetry.io/otel/metric v1.19.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/sys v0.10.0 // indirect + golang.org/x/sys v0.12.0 // indirect golang.org/x/text v0.9.0 // indirect google.golang.org/protobuf v1.30.0 // indirect ) diff --git a/go.sum b/go.sum index 9622db8..dcd60b9 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/Trendyol/kafka-cronsumer v1.3.3 h1:KNlX/L4bDkpNAXTqHUQNrnkC/kauL6djKp0tuOFny8Y= github.com/Trendyol/kafka-cronsumer v1.3.3/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/otel-kafka-konsumer v0.0.5 h1:i5Q6vR4ZRTtlb+uLimGJNBOQUiAtcbjn7Xc2FmPap/4= +github.com/Trendyol/otel-kafka-konsumer v0.0.5/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM= @@ -12,6 +14,11 @@ github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/gofiber/adaptor/v2 v2.2.1 h1:givE7iViQWlsTR4Jh7tB4iXzrlKBgiraB/yTdHs9Lv4= github.com/gofiber/adaptor/v2 v2.2.1/go.mod h1:AhR16dEqs25W2FY/l8gSj1b51Azg5dtPDmm+pruNOrc= github.com/gofiber/fiber/v2 v2.48.0 h1:cRVMCb9aUJDsyHxGFLwz/sGzDggdailZZyptU9F9cU0= @@ -25,8 +32,8 @@ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/klauspost/compress v1.16.6 h1:91SKEy4K37vkp255cJ8QesJhjyRO0hn9i9G0GoUwLsk= -github.com/klauspost/compress v1.16.6/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= +github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= @@ -37,8 +44,8 @@ github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= -github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= +github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -55,13 +62,14 @@ github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= -github.com/segmentio/kafka-go v0.4.42 h1:qffhBZCz4WcWyNuHEclHjIMLs2slp6mZO8px+5W5tfU= -github.com/segmentio/kafka-go v0.4.42/go.mod h1:d0g15xPMqoUookug0OU75DhGZxXwCFxSLeJ4uphwJzg= +github.com/segmentio/kafka-go v0.4.43 h1:yKVQ/i6BobbX7AWzwkhulsEn47wpLA8eO6H03bCMqYg= +github.com/segmentio/kafka-go v0.4.43/go.mod h1:d0g15xPMqoUookug0OU75DhGZxXwCFxSLeJ4uphwJzg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.48.0 h1:oJWvHb9BIZToTQS3MuQ2R3bJZiNSa2KiNdeI8A+79Tc= @@ -75,6 +83,16 @@ github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3k github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs= +go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY= +go.opentelemetry.io/otel/exporters/jaeger v1.16.0 h1:YhxxmXZ011C0aDZKoNw+juVWAmEfv/0W2XBOv9aHTaA= +go.opentelemetry.io/otel/exporters/jaeger v1.16.0/go.mod h1:grYbBo/5afWlPpdPZYhyn78Bk04hnvxn2+hvxQhKIQM= +go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE= +go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8= +go.opentelemetry.io/otel/sdk v1.19.0 h1:6USY6zH+L8uMH8L3t1enZPR3WFEmSTADlqldyHtJi3o= +go.opentelemetry.io/otel/sdk v1.19.0/go.mod h1:NedEbbS4w3C6zElbLdPJKOpJQOrGUJ+GfzpjUvI0v1A= +go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg= +go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= @@ -101,8 +119,8 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= -golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= diff --git a/message.go b/message.go index 9dbf8d1..bc79210 100644 --- a/message.go +++ b/message.go @@ -1,12 +1,56 @@ package kafka import ( + "context" + "time" + kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka" "github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go/protocol" ) -type Message kafka.Message +type Message struct { + Topic string + Partition int + Offset int64 + HighWaterMark int64 + Key []byte + Value []byte + Headers []kafka.Header + WriterData interface{} + Time time.Time + // Context To enable distributed tracing support + Context context.Context +} + +func (m *Message) toKafkaMessage() kafka.Message { + return kafka.Message{ + Topic: m.Topic, + Partition: m.Partition, + Offset: m.Offset, + HighWaterMark: m.HighWaterMark, + Key: m.Key, + Value: m.Value, + Headers: m.Headers, + WriterData: m.WriterData, + Time: m.Time, + } +} + +func fromKafkaMessage(message *kafka.Message) Message { + return Message{ + Topic: message.Topic, + Partition: message.Partition, + Offset: message.Offset, + HighWaterMark: message.HighWaterMark, + Key: message.Key, + Value: message.Value, + Headers: message.Headers, + WriterData: message.WriterData, + Time: message.Time, + Context: context.TODO(), + } +} func (m *Message) toRetryableMessage(retryTopic string) kcronsumer.Message { headers := make([]kcronsumer.Header, 0, len(m.Headers)) diff --git a/otel_producer.go b/otel_producer.go new file mode 100644 index 0000000..0284593 --- /dev/null +++ b/otel_producer.go @@ -0,0 +1,62 @@ +package kafka + +import ( + "context" + "fmt" + + "github.com/Trendyol/otel-kafka-konsumer" + segmentio "github.com/segmentio/kafka-go" + "go.opentelemetry.io/otel/attribute" + semconv "go.opentelemetry.io/otel/semconv/v1.19.0" +) + +type OtelKafkaKonsumerWriter interface { + WriteMessage(ctx context.Context, msg segmentio.Message) error + WriteMessages(ctx context.Context, msgs []segmentio.Message) error + Close() error +} + +type otelProducer struct { + w OtelKafkaKonsumerWriter +} + +func NewOtelProducer(cfg *ProducerConfig, writer *segmentio.Writer) (Writer, error) { + cfg.setDefaults() + + w, err := otelkafkakonsumer.NewWriter(writer, + otelkafkakonsumer.WithTracerProvider(cfg.DistributedTracingConfiguration.TracerProvider), + otelkafkakonsumer.WithPropagator(cfg.DistributedTracingConfiguration.Propagator), + otelkafkakonsumer.WithAttributes( + []attribute.KeyValue{ + semconv.MessagingDestinationKindTopic, + semconv.MessagingKafkaClientIDKey.String(cfg.ClientID), + }, + )) + if err != nil { + return nil, err + } + + return &otelProducer{ + w: w, + }, nil +} + +// Currently, we are not support tracing on batch producing. You can create custom span. +// There is an issue about it: https://github.com/Trendyol/otel-kafka-konsumer/issues/4 +func (o *otelProducer) WriteMessages(ctx context.Context, messages ...segmentio.Message) error { + if len(messages) == 1 { + if err := o.w.WriteMessage(ctx, messages[0]); err != nil { + return fmt.Errorf("error during producing %w", err) + } + } + + if err := o.w.WriteMessages(ctx, messages); err != nil { + return fmt.Errorf("error during batch producing %w", err) + } + + return nil +} + +func (o *otelProducer) Close() error { + return o.w.Close() +} diff --git a/otel_producer_test.go b/otel_producer_test.go new file mode 100644 index 0000000..65d23e3 --- /dev/null +++ b/otel_producer_test.go @@ -0,0 +1,82 @@ +package kafka + +import ( + "context" + "errors" + "testing" + + "github.com/segmentio/kafka-go" +) + +func Test_otelProducer_WriteMessages(t *testing.T) { + t.Run("Return_Success_When_Single_Message_Producing", func(t *testing.T) { + // Given + o := otelProducer{w: mockOtelKafkaKonsumerWriter{wantErr: false}} + + // When + err := o.WriteMessages(context.TODO(), kafka.Message{}) + // Then + if err != nil { + t.Fatalf("Error when single producing %v", err) + } + }) + t.Run("Return_Error_When_Single_Message_Producing", func(t *testing.T) { + // Given + o := otelProducer{w: mockOtelKafkaKonsumerWriter{wantErr: true}} + + // When + err := o.WriteMessages(context.TODO(), kafka.Message{}) + + // Then + if err == nil { + t.Fatalf("Success when single producing %v", err) + } + }) + t.Run("Return_Success_When_Batch_Producing", func(t *testing.T) { + // Given + o := otelProducer{w: mockOtelKafkaKonsumerWriter{wantErr: false}} + + // When + err := o.WriteMessages(context.TODO(), kafka.Message{}, kafka.Message{}) + // Then + if err != nil { + t.Fatalf("Error when batch producing %v", err) + } + }) + t.Run("Return_Error_When_Batch_Producing", func(t *testing.T) { + // Given + o := otelProducer{w: mockOtelKafkaKonsumerWriter{wantErr: true}} + + // When + err := o.WriteMessages(context.TODO(), kafka.Message{}, kafka.Message{}) + + // Then + if err == nil { + t.Fatalf("Success when single producing %v", err) + } + }) +} + +type mockOtelKafkaKonsumerWriter struct { + wantErr bool +} + +var _ OtelKafkaKonsumerWriter = (*mockOtelKafkaKonsumerWriter)(nil) + +func (m mockOtelKafkaKonsumerWriter) WriteMessage(ctx context.Context, msg kafka.Message) error { + if m.wantErr { + return errors.New("err occurred") + } + return nil +} + +func (m mockOtelKafkaKonsumerWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) error { + if m.wantErr { + return errors.New("err occurred") + } + return nil +} + +func (m mockOtelKafkaKonsumerWriter) Close() error { + panic("implement me") +} diff --git a/otel_reader_wrapper.go b/otel_reader_wrapper.go new file mode 100644 index 0000000..0fa07a5 --- /dev/null +++ b/otel_reader_wrapper.go @@ -0,0 +1,44 @@ +package kafka + +import ( + "context" + + "github.com/Trendyol/otel-kafka-konsumer" + segmentio "github.com/segmentio/kafka-go" + "go.opentelemetry.io/otel/attribute" + semconv "go.opentelemetry.io/otel/semconv/v1.19.0" +) + +type otelReaderWrapper struct { + r *otelkafkakonsumer.Reader +} + +func NewOtelReaderWrapper(cfg *ConsumerConfig, reader *segmentio.Reader) (Reader, error) { + cfg.setDefaults() + + newReader, err := otelkafkakonsumer.NewReader( + reader, + otelkafkakonsumer.WithTracerProvider(cfg.DistributedTracingConfiguration.TracerProvider), + otelkafkakonsumer.WithPropagator(cfg.DistributedTracingConfiguration.Propagator), + otelkafkakonsumer.WithAttributes( + []attribute.KeyValue{ + semconv.MessagingDestinationKindTopic, + semconv.MessagingKafkaClientIDKey.String(cfg.Reader.GroupID), + }, + )) + if err != nil { + return nil, err + } + + return &otelReaderWrapper{ + r: newReader, + }, nil +} + +func (o *otelReaderWrapper) ReadMessage(ctx context.Context) (*segmentio.Message, error) { + return o.r.ReadMessage(ctx) +} + +func (o *otelReaderWrapper) Close() error { + return o.r.Close() +} diff --git a/producer.go b/producer.go index 3dea75d..ee2910d 100644 --- a/producer.go +++ b/producer.go @@ -21,24 +21,57 @@ type producer struct { w Writer } -func NewProducer(cfg ProducerConfig) (Producer, error) { - writer, err := cfg.newKafkaWriter() - if err != nil { - return nil, err +func NewProducer(cfg *ProducerConfig) (Producer, error) { + kafkaWriter := &kafka.Writer{ + Addr: kafka.TCP(cfg.Writer.Brokers...), + Topic: cfg.Writer.Topic, + Balancer: cfg.Writer.Balancer, + MaxAttempts: cfg.Writer.MaxAttempts, + WriteBackoffMin: cfg.Writer.WriteBackoffMin, + WriteBackoffMax: cfg.Writer.WriteBackoffMax, + BatchSize: cfg.Writer.BatchSize, + BatchBytes: cfg.Writer.BatchBytes, + BatchTimeout: cfg.Writer.BatchTimeout, + ReadTimeout: cfg.Writer.ReadTimeout, + WriteTimeout: cfg.Writer.WriteTimeout, + RequiredAcks: cfg.Writer.RequiredAcks, + Async: cfg.Writer.Async, + Completion: cfg.Writer.Completion, + Compression: cfg.Writer.Compression, + Logger: cfg.Writer.Logger, + ErrorLogger: cfg.Writer.ErrorLogger, + AllowAutoTopicCreation: cfg.Writer.AllowAutoTopicCreation, } - return &producer{w: writer}, nil + if cfg.SASL != nil || cfg.TLS != nil { + transport, err := cfg.newKafkaTransport() + if err != nil { + return nil, err + } + kafkaWriter.Transport = transport + } + + p := &producer{w: kafkaWriter} + + if cfg.DistributedTracingEnabled { + otelWriter, err := NewOtelProducer(cfg, kafkaWriter) + if err != nil { + return nil, err + } + p.w = otelWriter + } + + return p, nil } func (c *producer) Produce(ctx context.Context, message Message) error { - return c.w.WriteMessages(ctx, kafka.Message(message)) + return c.w.WriteMessages(ctx, message.toKafkaMessage()) } func (c *producer) ProduceBatch(ctx context.Context, messages []Message) error { kafkaMessages := make([]kafka.Message, 0, len(messages)) for i := range messages { - convertedMessage := kafka.Message(messages[i]) - kafkaMessages = append(kafkaMessages, convertedMessage) + kafkaMessages = append(kafkaMessages, messages[i].toKafkaMessage()) } return c.w.WriteMessages(ctx, kafkaMessages...) } diff --git a/producer_config.go b/producer_config.go index b8dbf2e..fde6746 100644 --- a/producer_config.go +++ b/producer_config.go @@ -3,6 +3,8 @@ package kafka import ( "time" + "go.opentelemetry.io/otel" + "github.com/segmentio/kafka-go" ) @@ -35,63 +37,43 @@ type TransportConfig struct { } type ProducerConfig struct { - Transport *TransportConfig - SASL *SASLConfig - TLS *TLSConfig - ClientID string - Writer WriterConfig + Transport *TransportConfig + SASL *SASLConfig + TLS *TLSConfig + ClientID string + Writer WriterConfig + DistributedTracingEnabled bool + DistributedTracingConfiguration DistributedTracingConfiguration } -func (c ProducerConfig) newKafkaWriter() (Writer, error) { - kafkaWriter := &kafka.Writer{ - Addr: kafka.TCP(c.Writer.Brokers...), - Topic: c.Writer.Topic, - Balancer: c.Writer.Balancer, - MaxAttempts: c.Writer.MaxAttempts, - WriteBackoffMin: c.Writer.WriteBackoffMin, - WriteBackoffMax: c.Writer.WriteBackoffMax, - BatchSize: c.Writer.BatchSize, - BatchBytes: c.Writer.BatchBytes, - BatchTimeout: c.Writer.BatchTimeout, - ReadTimeout: c.Writer.ReadTimeout, - WriteTimeout: c.Writer.WriteTimeout, - RequiredAcks: c.Writer.RequiredAcks, - Async: c.Writer.Async, - Completion: c.Writer.Completion, - Compression: c.Writer.Compression, - Logger: c.Writer.Logger, - ErrorLogger: c.Writer.ErrorLogger, - AllowAutoTopicCreation: c.Writer.AllowAutoTopicCreation, - } - - if c.SASL != nil || c.TLS != nil { - transport, err := c.newKafkaTransport() - if err != nil { - return nil, err - } - kafkaWriter.Transport = transport - } - - return kafkaWriter, nil -} - -func (c ProducerConfig) newKafkaTransport() (*kafka.Transport, error) { +func (cfg *ProducerConfig) newKafkaTransport() (*kafka.Transport, error) { transport := &Transport{ Transport: &kafka.Transport{ - ClientID: c.ClientID, + ClientID: cfg.ClientID, }, } - if c.Transport != nil { - transport.Transport.DialTimeout = c.Transport.DialTimeout - transport.Transport.IdleTimeout = c.Transport.IdleTimeout - transport.Transport.MetadataTTL = c.Transport.MetadataTTL - transport.Transport.MetadataTopics = c.Transport.MetadataTopics + if cfg.Transport != nil { + transport.Transport.DialTimeout = cfg.Transport.DialTimeout + transport.Transport.IdleTimeout = cfg.Transport.IdleTimeout + transport.Transport.MetadataTTL = cfg.Transport.MetadataTTL + transport.Transport.MetadataTopics = cfg.Transport.MetadataTopics } - if err := fillLayer(transport, c.SASL, c.TLS); err != nil { + if err := fillLayer(transport, cfg.SASL, cfg.TLS); err != nil { return nil, err } return transport.Transport, nil } + +func (cfg *ProducerConfig) setDefaults() { + if cfg.DistributedTracingEnabled { + if cfg.DistributedTracingConfiguration.TracerProvider == nil { + cfg.DistributedTracingConfiguration.TracerProvider = otel.GetTracerProvider() + } + if cfg.DistributedTracingConfiguration.Propagator == nil { + cfg.DistributedTracingConfiguration.Propagator = otel.GetTextMapPropagator() + } + } +} diff --git a/producer_config_test.go b/producer_config_test.go new file mode 100644 index 0000000..95bf956 --- /dev/null +++ b/producer_config_test.go @@ -0,0 +1,19 @@ +package kafka + +import "testing" + +func TestProducerConfig_setDefaults(t *testing.T) { + // Given + cfg := ProducerConfig{DistributedTracingEnabled: true} + + // When + cfg.setDefaults() + + // Then + if cfg.DistributedTracingConfiguration.TracerProvider == nil { + t.Fatal("Traceprovider cannot be null") + } + if cfg.DistributedTracingConfiguration.Propagator == nil { + t.Fatal("Propagator cannot be null") + } +} diff --git a/test/integration/go.mod b/test/integration/go.mod index 6d5572b..e5292b3 100644 --- a/test/integration/go.mod +++ b/test/integration/go.mod @@ -5,26 +5,29 @@ go 1.19 replace github.com/Trendyol/kafka-konsumer => ../.. require ( - github.com/Trendyol/kafka-konsumer v1.3.4 - github.com/segmentio/kafka-go v0.4.42 + github.com/Trendyol/kafka-konsumer v0.0.0-00010101000000-000000000000 + github.com/segmentio/kafka-go v0.4.43 ) require ( + github.com/Trendyol/otel-kafka-konsumer v0.0.0 // indirect github.com/Trendyol/kafka-cronsumer v1.3.3 // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/go-logr/logr v1.2.4 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/gofiber/adaptor/v2 v2.2.1 // indirect github.com/gofiber/fiber/v2 v2.48.0 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/uuid v1.3.0 // indirect - github.com/klauspost/compress v1.16.6 // indirect + github.com/klauspost/compress v1.17.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect github.com/mattn/go-runewidth v0.0.14 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect - github.com/pierrec/lz4/v4 v4.1.17 // indirect + github.com/pierrec/lz4/v4 v4.1.18 // indirect github.com/prometheus/client_golang v1.16.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.44.0 // indirect @@ -37,6 +40,9 @@ require ( github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect + go.opentelemetry.io/otel v1.18.0 // indirect + go.opentelemetry.io/otel/metric v1.18.0 // indirect + go.opentelemetry.io/otel/trace v1.18.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.24.0 // indirect diff --git a/test/integration/go.sum b/test/integration/go.sum index 9622db8..c582be8 100644 --- a/test/integration/go.sum +++ b/test/integration/go.sum @@ -1,3 +1,5 @@ +github.com/Trendyol/otel-kafka-konsumer v0.0.0 h1:5iLegl3ZoOVAIgua/ne5OrTwYITLz+4EhbK7i/orXOw= +github.com/Trendyol/otel-kafka-konsumer v0.0.0/go.mod h1:SOtgXp7znhCuI/+F4by91/0JWLfFqcZyYd1x9EtoxUU= github.com/Trendyol/kafka-cronsumer v1.3.3 h1:KNlX/L4bDkpNAXTqHUQNrnkC/kauL6djKp0tuOFny8Y= github.com/Trendyol/kafka-cronsumer v1.3.3/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= @@ -12,6 +14,11 @@ github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/gofiber/adaptor/v2 v2.2.1 h1:givE7iViQWlsTR4Jh7tB4iXzrlKBgiraB/yTdHs9Lv4= github.com/gofiber/adaptor/v2 v2.2.1/go.mod h1:AhR16dEqs25W2FY/l8gSj1b51Azg5dtPDmm+pruNOrc= github.com/gofiber/fiber/v2 v2.48.0 h1:cRVMCb9aUJDsyHxGFLwz/sGzDggdailZZyptU9F9cU0= @@ -25,8 +32,8 @@ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/klauspost/compress v1.16.6 h1:91SKEy4K37vkp255cJ8QesJhjyRO0hn9i9G0GoUwLsk= -github.com/klauspost/compress v1.16.6/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= +github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= @@ -37,8 +44,8 @@ github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= -github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= +github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -55,13 +62,13 @@ github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= -github.com/segmentio/kafka-go v0.4.42 h1:qffhBZCz4WcWyNuHEclHjIMLs2slp6mZO8px+5W5tfU= -github.com/segmentio/kafka-go v0.4.42/go.mod h1:d0g15xPMqoUookug0OU75DhGZxXwCFxSLeJ4uphwJzg= +github.com/segmentio/kafka-go v0.4.43 h1:yKVQ/i6BobbX7AWzwkhulsEn47wpLA8eO6H03bCMqYg= +github.com/segmentio/kafka-go v0.4.43/go.mod h1:d0g15xPMqoUookug0OU75DhGZxXwCFxSLeJ4uphwJzg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.48.0 h1:oJWvHb9BIZToTQS3MuQ2R3bJZiNSa2KiNdeI8A+79Tc= @@ -75,6 +82,12 @@ github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3k github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.opentelemetry.io/otel v1.18.0 h1:TgVozPGZ01nHyDZxK5WGPFB9QexeTMXEH7+tIClWfzs= +go.opentelemetry.io/otel v1.18.0/go.mod h1:9lWqYO0Db579XzVuCKFNPDl4s73Voa+zEck3wHaAYQI= +go.opentelemetry.io/otel/metric v1.18.0 h1:JwVzw94UYmbx3ej++CwLUQZxEODDj/pOuTCvzhtRrSQ= +go.opentelemetry.io/otel/metric v1.18.0/go.mod h1:nNSpsVDjWGfb7chbRLUNW+PBNdcSTHD4Uu5pfFMOI0k= +go.opentelemetry.io/otel/trace v1.18.0 h1:NY+czwbHbmndxojTEKiSMHkG2ClNH2PwmcHrdo0JY10= +go.opentelemetry.io/otel/trace v1.18.0/go.mod h1:T2+SGJGuYZY3bjj5rgh/hN7KIrlpWC5nS8Mjvzckz+0= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index 74caad5..7a2af80 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -15,7 +15,7 @@ func Test_Should_Produce_Successfully(t *testing.T) { topic := "produce-topic" brokerAddress := "localhost:9092" - producer, _ := kafka.NewProducer(kafka.ProducerConfig{ + producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}, Transport: &kafka.TransportConfig{ MetadataTopics: []string{ @@ -41,7 +41,7 @@ func Test_Should_Batch_Produce_Successfully(t *testing.T) { topic := "batch-produce-topic" brokerAddress := "localhost:9092" - producer, _ := kafka.NewProducer(kafka.ProducerConfig{ + producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}) // When