Skip to content

Commit

Permalink
(feat) return a default topic when tenant_id is not present (#304)
Browse files Browse the repository at this point in the history
* (feat) return a default topic when tenant_id is not present

* (refactor) rename the function to getKafkaTopicPrefixFromClientMetadata making it consistent with what the function is doing
  • Loading branch information
grandwizard28 authored Mar 26, 2024
1 parent 0dc0b50 commit 539b6be
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 20 deletions.
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*.out

# Dependency directories (remove the comment below to include it)
# vendor/
vendor/

.vscode
.DS_Store
Expand All @@ -22,4 +22,4 @@
# build folder
.build
*.rollback
signoz-collector
signoz-collector
18 changes: 6 additions & 12 deletions exporter/signozkafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,8 @@ func (ke kafkaErrors) Error() string {
}

func (e *kafkaTracesProducer) tracesPusher(ctx context.Context, td ptrace.Traces) error {
kafkaTopicPrefix, err := getKafkaTopicFromClientMetadata(client.FromContext(ctx).Metadata)
if err != nil {
return consumererror.NewPermanent(err)
}
kafkaTopicPrefix := getKafkaTopicPrefixFromClientMetadata(client.FromContext(ctx).Metadata)

kafkaTopic := fmt.Sprintf("%s_traces", kafkaTopicPrefix)
messages, err := e.marshaler.Marshal(td, kafkaTopic)
if err != nil {
Expand Down Expand Up @@ -74,10 +72,8 @@ type kafkaMetricsProducer struct {
}

func (e *kafkaMetricsProducer) metricsDataPusher(ctx context.Context, md pmetric.Metrics) error {
kafkaTopicPrefix, err := getKafkaTopicFromClientMetadata(client.FromContext(ctx).Metadata)
if err != nil {
return consumererror.NewPermanent(err)
}
kafkaTopicPrefix := getKafkaTopicPrefixFromClientMetadata(client.FromContext(ctx).Metadata)

kafkaTopic := fmt.Sprintf("%s_metrics", kafkaTopicPrefix)
messages, err := e.marshaler.Marshal(md, kafkaTopic)
if err != nil {
Expand Down Expand Up @@ -111,10 +107,8 @@ type kafkaLogsProducer struct {
func (e *kafkaLogsProducer) logsDataPusher(ctx context.Context, ld plog.Logs) error {
e.normalizeLogData(&ld)

kafkaTopicPrefix, err := getKafkaTopicFromClientMetadata(client.FromContext(ctx).Metadata)
if err != nil {
return consumererror.NewPermanent(err)
}
kafkaTopicPrefix := getKafkaTopicPrefixFromClientMetadata(client.FromContext(ctx).Metadata)

kafkaTopic := fmt.Sprintf("%s_logs", kafkaTopicPrefix)
messages, err := e.marshaler.Marshal(ld, kafkaTopic)
if err != nil {
Expand Down
15 changes: 9 additions & 6 deletions exporter/signozkafkaexporter/utils.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package signozkafkaexporter

import (
"fmt"

"go.opentelemetry.io/collector/client"
)

const (
DefaultKafkaTopicPrefix = "default"
)

// getKafkaTopicFromClientMetadata returns the kafka topic from client metadata
func getKafkaTopicFromClientMetadata(md client.Metadata) (string, error) {
func getKafkaTopicPrefixFromClientMetadata(md client.Metadata) string {
// return default topic if no tenant id is found in client metadata
signozTenantId := md.Get("signoz_tenant_id")
if len(signozTenantId) == 0 {
return "", fmt.Errorf("signoz_tenant_id not found in client metadata")
if len(signozTenantId) != 0 {
return signozTenantId[0]
}
return signozTenantId[0], nil

return DefaultKafkaTopicPrefix
}

0 comments on commit 539b6be

Please sign in to comment.