From 5e34ff91cba1f238e512f1245ce5b33477a55cf1 Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Thu, 7 Dec 2023 09:29:17 +0300 Subject: [PATCH] feat: add manual commit support to otel readers --- examples/with-sasl-plaintext/go.mod | 2 +- examples/with-sasl-plaintext/go.sum | 1 + go.mod | 2 +- go.sum | 6 +-- otel_reader_wrapper.go | 82 ++++++++++++++++------------- test/integration/go.mod | 2 +- test/integration/go.sum | 1 + 7 files changed, 52 insertions(+), 44 deletions(-) diff --git a/examples/with-sasl-plaintext/go.mod b/examples/with-sasl-plaintext/go.mod index e419066..e8b69fe 100644 --- a/examples/with-sasl-plaintext/go.mod +++ b/examples/with-sasl-plaintext/go.mod @@ -8,7 +8,7 @@ require github.com/Trendyol/kafka-konsumer v0.0.0-00010101000000-000000000000 require ( github.com/Trendyol/kafka-cronsumer v1.4.5 // indirect - github.com/Trendyol/otel-kafka-konsumer v0.0.5 // indirect + github.com/Trendyol/otel-kafka-konsumer v0.0.6 // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect github.com/beorn7/perks v1.0.1 // indirect diff --git a/examples/with-sasl-plaintext/go.sum b/examples/with-sasl-plaintext/go.sum index 58296f6..1f83a88 100644 --- a/examples/with-sasl-plaintext/go.sum +++ b/examples/with-sasl-plaintext/go.sum @@ -2,6 +2,7 @@ github.com/Trendyol/kafka-cronsumer v1.4.5 h1:82MhKZi1tXqFMp2gpSiYaT4UyN6LxumIu6 github.com/Trendyol/kafka-cronsumer v1.4.5/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/otel-kafka-konsumer v0.0.5 h1:i5Q6vR4ZRTtlb+uLimGJNBOQUiAtcbjn7Xc2FmPap/4= github.com/Trendyol/otel-kafka-konsumer v0.0.5/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= +github.com/Trendyol/otel-kafka-konsumer v0.0.6/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM= diff --git a/go.mod b/go.mod index 9e5fc93..d9225b5 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.19 require ( github.com/Trendyol/kafka-cronsumer v1.4.5 - github.com/Trendyol/otel-kafka-konsumer v0.0.5 + github.com/Trendyol/otel-kafka-konsumer v0.0.6 github.com/ansrivas/fiberprometheus/v2 v2.6.1 github.com/gofiber/fiber/v2 v2.50.0 github.com/prometheus/client_golang v1.16.0 diff --git a/go.sum b/go.sum index 2c16b5e..78c6c87 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,7 @@ -github.com/Trendyol/kafka-cronsumer v1.4.4 h1:RfTpVyvxf+FjLxOJIHQXr6zrMjtba6PGUAYXLoGnVuE= -github.com/Trendyol/kafka-cronsumer v1.4.4/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/kafka-cronsumer v1.4.5 h1:82MhKZi1tXqFMp2gpSiYaT4UyN6LxumIu6ZMC8yZ1JY= github.com/Trendyol/kafka-cronsumer v1.4.5/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= -github.com/Trendyol/otel-kafka-konsumer v0.0.5 h1:i5Q6vR4ZRTtlb+uLimGJNBOQUiAtcbjn7Xc2FmPap/4= -github.com/Trendyol/otel-kafka-konsumer v0.0.5/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= +github.com/Trendyol/otel-kafka-konsumer v0.0.6 h1:FFSft5lIA+kyH+hZW4bxt/xWG6J3PFUGzKZbjyf+fVM= +github.com/Trendyol/otel-kafka-konsumer v0.0.6/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM= diff --git a/otel_reader_wrapper.go b/otel_reader_wrapper.go index 52a14be..c374f34 100644 --- a/otel_reader_wrapper.go +++ b/otel_reader_wrapper.go @@ -1,39 +1,47 @@ package kafka -// type otelReaderWrapper struct { -// r *otelkafkakonsumer.Reader -//} -// -// func NewOtelReaderWrapper(cfg *ConsumerConfig, reader *segmentio.Reader) (Reader, error) { -// cfg.setDefaults() -// -// newReader, err := otelkafkakonsumer.NewReader( -// reader, -// otelkafkakonsumer.WithTracerProvider(cfg.DistributedTracingConfiguration.TracerProvider), -// otelkafkakonsumer.WithPropagator(cfg.DistributedTracingConfiguration.Propagator), -// otelkafkakonsumer.WithAttributes( -// []attribute.KeyValue{ -// semconv.MessagingDestinationKindTopic, -// semconv.MessagingKafkaClientIDKey.String(cfg.Reader.GroupID), -// }, -// )) -// if err != nil { -// return nil, err -// } -// -// return &otelReaderWrapper{ -// r: newReader, -// }, nil -//} -// -// func (o *otelReaderWrapper) FetchMessage(ctx context.Context) (*segmentio.Message, error) { -// return o.r.FetchMessage(ctx) -//} -// -// func (o *otelReaderWrapper) Close() error { -// return o.r.Close() -//} -// -// func (o *otelReaderWrapper) CommitMessages(messages []segmentio.Message) error { -// return o.r.CommitMessages(context.Background(), messages...) -//} +import ( + "context" + "github.com/Trendyol/otel-kafka-konsumer" + segmentio "github.com/segmentio/kafka-go" + "go.opentelemetry.io/otel/attribute" + semconv "go.opentelemetry.io/otel/semconv/v1.19.0" +) + +type otelReaderWrapper struct { + r *otelkafkakonsumer.Reader +} + +func NewOtelReaderWrapper(cfg *ConsumerConfig, reader *segmentio.Reader) (Reader, error) { + cfg.setDefaults() + + newReader, err := otelkafkakonsumer.NewReader( + reader, + otelkafkakonsumer.WithTracerProvider(cfg.DistributedTracingConfiguration.TracerProvider), + otelkafkakonsumer.WithPropagator(cfg.DistributedTracingConfiguration.Propagator), + otelkafkakonsumer.WithAttributes( + []attribute.KeyValue{ + semconv.MessagingDestinationKindTopic, + semconv.MessagingKafkaClientIDKey.String(cfg.Reader.GroupID), + }, + )) + if err != nil { + return nil, err + } + + return &otelReaderWrapper{ + r: newReader, + }, nil +} + +func (o *otelReaderWrapper) FetchMessage(ctx context.Context) (*segmentio.Message, error) { + return o.r.FetchMessage(ctx) +} + +func (o *otelReaderWrapper) Close() error { + return o.r.Close() +} + +func (o *otelReaderWrapper) CommitMessages(messages []segmentio.Message) error { + return o.r.CommitMessages(context.Background(), messages...) +} diff --git a/test/integration/go.mod b/test/integration/go.mod index aec012d..5f13bd3 100644 --- a/test/integration/go.mod +++ b/test/integration/go.mod @@ -11,7 +11,7 @@ require ( require ( github.com/Trendyol/kafka-cronsumer v1.4.5 // indirect - github.com/Trendyol/otel-kafka-konsumer v0.0.5 // indirect + github.com/Trendyol/otel-kafka-konsumer v0.0.6 // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect github.com/beorn7/perks v1.0.1 // indirect diff --git a/test/integration/go.sum b/test/integration/go.sum index 58296f6..1f83a88 100644 --- a/test/integration/go.sum +++ b/test/integration/go.sum @@ -2,6 +2,7 @@ github.com/Trendyol/kafka-cronsumer v1.4.5 h1:82MhKZi1tXqFMp2gpSiYaT4UyN6LxumIu6 github.com/Trendyol/kafka-cronsumer v1.4.5/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/otel-kafka-konsumer v0.0.5 h1:i5Q6vR4ZRTtlb+uLimGJNBOQUiAtcbjn7Xc2FmPap/4= github.com/Trendyol/otel-kafka-konsumer v0.0.5/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= +github.com/Trendyol/otel-kafka-konsumer v0.0.6/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM=