Skip to content

Commit

Permalink
feat: add manual commit support to otel readers
Browse files Browse the repository at this point in the history
  • Loading branch information
mhmtszr committed Dec 7, 2023
1 parent 1530acb commit 5e34ff9
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 44 deletions.
2 changes: 1 addition & 1 deletion examples/with-sasl-plaintext/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require github.com/Trendyol/kafka-konsumer v0.0.0-00010101000000-000000000000

require (
github.com/Trendyol/kafka-cronsumer v1.4.5 // indirect
github.com/Trendyol/otel-kafka-konsumer v0.0.5 // indirect
github.com/Trendyol/otel-kafka-konsumer v0.0.6 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
Expand Down
1 change: 1 addition & 0 deletions examples/with-sasl-plaintext/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ github.com/Trendyol/kafka-cronsumer v1.4.5 h1:82MhKZi1tXqFMp2gpSiYaT4UyN6LxumIu6
github.com/Trendyol/kafka-cronsumer v1.4.5/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/otel-kafka-konsumer v0.0.5 h1:i5Q6vR4ZRTtlb+uLimGJNBOQUiAtcbjn7Xc2FmPap/4=
github.com/Trendyol/otel-kafka-konsumer v0.0.5/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
github.com/Trendyol/otel-kafka-konsumer v0.0.6/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM=
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.19

require (
github.com/Trendyol/kafka-cronsumer v1.4.5
github.com/Trendyol/otel-kafka-konsumer v0.0.5
github.com/Trendyol/otel-kafka-konsumer v0.0.6
github.com/ansrivas/fiberprometheus/v2 v2.6.1
github.com/gofiber/fiber/v2 v2.50.0
github.com/prometheus/client_golang v1.16.0
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
github.com/Trendyol/kafka-cronsumer v1.4.4 h1:RfTpVyvxf+FjLxOJIHQXr6zrMjtba6PGUAYXLoGnVuE=
github.com/Trendyol/kafka-cronsumer v1.4.4/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/kafka-cronsumer v1.4.5 h1:82MhKZi1tXqFMp2gpSiYaT4UyN6LxumIu6ZMC8yZ1JY=
github.com/Trendyol/kafka-cronsumer v1.4.5/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/otel-kafka-konsumer v0.0.5 h1:i5Q6vR4ZRTtlb+uLimGJNBOQUiAtcbjn7Xc2FmPap/4=
github.com/Trendyol/otel-kafka-konsumer v0.0.5/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
github.com/Trendyol/otel-kafka-konsumer v0.0.6 h1:FFSft5lIA+kyH+hZW4bxt/xWG6J3PFUGzKZbjyf+fVM=
github.com/Trendyol/otel-kafka-konsumer v0.0.6/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM=
Expand Down
82 changes: 45 additions & 37 deletions otel_reader_wrapper.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,47 @@
package kafka

// type otelReaderWrapper struct {
// r *otelkafkakonsumer.Reader
//}
//
// func NewOtelReaderWrapper(cfg *ConsumerConfig, reader *segmentio.Reader) (Reader, error) {
// cfg.setDefaults()
//
// newReader, err := otelkafkakonsumer.NewReader(
// reader,
// otelkafkakonsumer.WithTracerProvider(cfg.DistributedTracingConfiguration.TracerProvider),
// otelkafkakonsumer.WithPropagator(cfg.DistributedTracingConfiguration.Propagator),
// otelkafkakonsumer.WithAttributes(
// []attribute.KeyValue{
// semconv.MessagingDestinationKindTopic,
// semconv.MessagingKafkaClientIDKey.String(cfg.Reader.GroupID),
// },
// ))
// if err != nil {
// return nil, err
// }
//
// return &otelReaderWrapper{
// r: newReader,
// }, nil
//}
//
// func (o *otelReaderWrapper) FetchMessage(ctx context.Context) (*segmentio.Message, error) {
// return o.r.FetchMessage(ctx)
//}
//
// func (o *otelReaderWrapper) Close() error {
// return o.r.Close()
//}
//
// func (o *otelReaderWrapper) CommitMessages(messages []segmentio.Message) error {
// return o.r.CommitMessages(context.Background(), messages...)
//}
import (
"context"

Check failure on line 4 in otel_reader_wrapper.go

View workflow job for this annotation

GitHub Actions / build

File is not `gofumpt`-ed (gofumpt)
"github.com/Trendyol/otel-kafka-konsumer"
segmentio "github.com/segmentio/kafka-go"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.19.0"
)

type otelReaderWrapper struct {
r *otelkafkakonsumer.Reader
}

func NewOtelReaderWrapper(cfg *ConsumerConfig, reader *segmentio.Reader) (Reader, error) {
cfg.setDefaults()

newReader, err := otelkafkakonsumer.NewReader(
reader,
otelkafkakonsumer.WithTracerProvider(cfg.DistributedTracingConfiguration.TracerProvider),
otelkafkakonsumer.WithPropagator(cfg.DistributedTracingConfiguration.Propagator),
otelkafkakonsumer.WithAttributes(
[]attribute.KeyValue{
semconv.MessagingDestinationKindTopic,
semconv.MessagingKafkaClientIDKey.String(cfg.Reader.GroupID),
},
))
if err != nil {
return nil, err
}

return &otelReaderWrapper{
r: newReader,
}, nil
}

func (o *otelReaderWrapper) FetchMessage(ctx context.Context) (*segmentio.Message, error) {
return o.r.FetchMessage(ctx)
}

func (o *otelReaderWrapper) Close() error {
return o.r.Close()
}

func (o *otelReaderWrapper) CommitMessages(messages []segmentio.Message) error {
return o.r.CommitMessages(context.Background(), messages...)
}
2 changes: 1 addition & 1 deletion test/integration/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (

require (
github.com/Trendyol/kafka-cronsumer v1.4.5 // indirect
github.com/Trendyol/otel-kafka-konsumer v0.0.5 // indirect
github.com/Trendyol/otel-kafka-konsumer v0.0.6 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
Expand Down
1 change: 1 addition & 0 deletions test/integration/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ github.com/Trendyol/kafka-cronsumer v1.4.5 h1:82MhKZi1tXqFMp2gpSiYaT4UyN6LxumIu6
github.com/Trendyol/kafka-cronsumer v1.4.5/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/otel-kafka-konsumer v0.0.5 h1:i5Q6vR4ZRTtlb+uLimGJNBOQUiAtcbjn7Xc2FmPap/4=
github.com/Trendyol/otel-kafka-konsumer v0.0.5/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
github.com/Trendyol/otel-kafka-konsumer v0.0.6/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/ansrivas/fiberprometheus/v2 v2.6.1 h1:wac3pXaE6BYYTF04AC6K0ktk6vCD+MnDOJZ3SK66kXM=
Expand Down

0 comments on commit 5e34ff9

Please sign in to comment.