From 0883f296e296e7cf1cbf94316940cc6ce4588db8 Mon Sep 17 00:00:00 2001 From: Peter Marton Date: Tue, 14 May 2024 19:05:45 +0200 Subject: [PATCH] feat(openmeter): export new kafka collector (#902) --- openmeter/ingest/kafkaingest/collector.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/openmeter/ingest/kafkaingest/collector.go b/openmeter/ingest/kafkaingest/collector.go index ff8f522fd..256ba87d5 100644 --- a/openmeter/ingest/kafkaingest/collector.go +++ b/openmeter/ingest/kafkaingest/collector.go @@ -5,8 +5,10 @@ import ( "log/slog" "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "go.opentelemetry.io/otel/metric" "github.com/openmeterio/openmeter/internal/ingest/kafkaingest" + "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest/serializer" ) // Collector is a receiver of events that handles sending those events to a downstream Kafka broker. @@ -15,3 +17,12 @@ type Collector = kafkaingest.Collector func KafkaProducerGroup(ctx context.Context, producer *kafka.Producer, logger *slog.Logger) (execute func() error, interrupt func(error)) { return kafkaingest.KafkaProducerGroup(ctx, producer, logger) } + +func NewCollector( + producer *kafka.Producer, + serializer serializer.Serializer, + namespacedTopicTemplate string, + metricMeter metric.Meter, +) (*Collector, error) { + return kafkaingest.NewCollector(producer, serializer, namespacedTopicTemplate, metricMeter) +}