From 583b5da49c24aa9a99897420165092074ee7e820 Mon Sep 17 00:00:00 2001 From: Polapragada Yashwant <155515534+thugrock7@users.noreply.github.com> Date: Wed, 6 Nov 2024 13:07:01 +0000 Subject: [PATCH] feat: use opaque key to identify trace-provider (#241) * opaque key to identify traceproviders * using semconv.ServiceNameKey * nit fix --- instrumentation/opentelemetry/init.go | 41 ++++++++++++------- .../opentelemetry/init_additional.go | 2 +- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/instrumentation/opentelemetry/init.go b/instrumentation/opentelemetry/init.go index c4b70f2..81f0c8b 100644 --- a/instrumentation/opentelemetry/init.go +++ b/instrumentation/opentelemetry/init.go @@ -316,7 +316,7 @@ func InitWithSpanProcessorWrapperAndZap(cfg *config.AgentConfig, wrapper SpanPro resources, err := resource.New( context.Background(), - resource.WithAttributes(createResources(cfg.GetServiceName().GetValue(), cfg.ResourceAttributes, + resource.WithAttributes(createResources(getResourceAttrsWithServiceName(cfg.ResourceAttributes, cfg.GetServiceName().GetValue()), versionInfoAttrs)...), ) if err != nil { @@ -351,12 +351,12 @@ func InitWithSpanProcessorWrapperAndZap(cfg *config.AgentConfig, wrapper SpanPro return func() { mu.Lock() defer mu.Unlock() - for serviceName, tracerProvider := range traceProviders { + for key, tracerProvider := range traceProviders { err := tracerProvider.Shutdown(context.Background()) if err != nil { log.Printf("error while shutting down tracer provider: %v\n", err) } - delete(traceProviders, serviceName) + delete(traceProviders, key) } traceProviders = map[string]*sdktrace.TracerProvider{} err := tp.Shutdown(context.Background()) @@ -371,10 +371,9 @@ func InitWithSpanProcessorWrapperAndZap(cfg *config.AgentConfig, wrapper SpanPro } } -func createResources(serviceName string, resources map[string]string, +func createResources(resources map[string]string, versionInfo []attribute.KeyValue) []attribute.KeyValue { retValues := []attribute.KeyValue{ - semconv.ServiceNameKey.String(serviceName), semconv.TelemetrySDKLanguageGo, } @@ -386,14 +385,14 @@ func createResources(serviceName string, resources map[string]string, return retValues } -// RegisterService creates tracerprovider for a new service and returns a func which can be used to create spans and the TracerProvider -func RegisterService(serviceName string, resourceAttributes map[string]string) (sdk.StartSpan, trace.TracerProvider, error) { - return RegisterServiceWithSpanProcessorWrapper(serviceName, resourceAttributes, nil, versionInfoAttributes) +// RegisterService creates tracerprovider for a new service (represented via a unique key) and returns a func which can be used to create spans and the TracerProvider +func RegisterService(key string, resourceAttributes map[string]string) (sdk.StartSpan, trace.TracerProvider, error) { + return RegisterServiceWithSpanProcessorWrapper(key, resourceAttributes, nil, versionInfoAttributes) } -// RegisterServiceWithSpanProcessorWrapper creates a tracerprovider for a new service with a wrapper over opentelemetry span processor +// RegisterServiceWithSpanProcessorWrapper creates a tracerprovider for a new service (represented via a unique key) with a wrapper over opentelemetry span processor // and returns a func which can be used to create spans and the TracerProvider -func RegisterServiceWithSpanProcessorWrapper(serviceName string, resourceAttributes map[string]string, +func RegisterServiceWithSpanProcessorWrapper(key string, resourceAttributes map[string]string, wrapper SpanProcessorWrapper, versionInfoAttrs []attribute.KeyValue) (sdk.StartSpan, trace.TracerProvider, error) { mu.Lock() defer mu.Unlock() @@ -405,8 +404,8 @@ func RegisterServiceWithSpanProcessorWrapper(serviceName string, resourceAttribu return NoopStartSpan, noop.NewTracerProvider(), nil } - if _, ok := traceProviders[serviceName]; ok { - return nil, noop.NewTracerProvider(), fmt.Errorf("service %v already initialized", serviceName) + if _, ok := traceProviders[key]; ok { + return nil, noop.NewTracerProvider(), fmt.Errorf("key %v is already used for initialization", key) } exporter, err := exporterFactory() @@ -424,7 +423,7 @@ func RegisterServiceWithSpanProcessorWrapper(serviceName string, resourceAttribu resources, err := resource.New( context.Background(), - resource.WithAttributes(createResources(serviceName, resourceAttributes, versionInfoAttrs)...), + resource.WithAttributes(createResources(resourceAttributes, versionInfoAttrs)...), ) if err != nil { log.Fatal(err) @@ -435,7 +434,7 @@ func RegisterServiceWithSpanProcessorWrapper(serviceName string, resourceAttribu sdktrace.WithResource(resources), ) - traceProviders[serviceName] = tp + traceProviders[key] = tp return startSpan(func() trace.TracerProvider { return tp }), tp, nil @@ -453,7 +452,7 @@ func initializeMetrics(cfg *config.AgentConfig, versionInfoAttrs []attribute.Key } periodicReader := metric.NewPeriodicReader(metricsExporter) - resourceKvps := createResources(cfg.GetServiceName().GetValue(), cfg.ResourceAttributes, versionInfoAttrs) + resourceKvps := createResources(getResourceAttrsWithServiceName(cfg.ResourceAttributes, cfg.GetServiceName().GetValue()), versionInfoAttrs) resourceKvps = append(resourceKvps, identifier.ServiceInstanceKeyValue) metricResources, err := resource.New(context.Background(), resource.WithAttributes(resourceKvps...)) if err != nil { @@ -492,6 +491,18 @@ func shouldUseCustomBatchSpanProcessor(cfg *config.AgentConfig) bool { (cfg.GetTelemetry() != nil && cfg.GetTelemetry().GetMetricsEnabled().GetValue()) // metrics enabled } +func getResourceAttrsWithServiceName(resourceMap map[string]string, serviceName string) map[string]string { + if resourceMap == nil { + resourceMap = make(map[string]string) + } + serviceNameKey := string(semconv.ServiceNameKey) + if _, ok := resourceMap[serviceNameKey]; !ok && (len(serviceName) > 0) { + resourceMap[serviceNameKey] = serviceName + } + + return resourceMap +} + // SpanProcessorWrapper wraps otel span processor // and is responsible to delegate calls to the wrapped processor type SpanProcessorWrapper interface { diff --git a/instrumentation/opentelemetry/init_additional.go b/instrumentation/opentelemetry/init_additional.go index f932768..39d1cd1 100644 --- a/instrumentation/opentelemetry/init_additional.go +++ b/instrumentation/opentelemetry/init_additional.go @@ -35,7 +35,7 @@ func InitAsAdditional(cfg *config.AgentConfig) (trace.SpanProcessor, func()) { if cfg.GetServiceName().GetValue() != "" { resource, err := resource.New( context.Background(), - resource.WithAttributes(createResources(cfg.GetServiceName().GetValue(), cfg.ResourceAttributes, versionInfoAttributes)...), + resource.WithAttributes(createResources(getResourceAttrsWithServiceName(cfg.ResourceAttributes, cfg.GetServiceName().GetValue()), versionInfoAttributes)...), ) if err != nil { log.Fatal(err)