Skip to content

Commit

Permalink
pkg/loop: use beholder
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Aug 13, 2024
1 parent de71480 commit 34df236
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 56 deletions.
12 changes: 6 additions & 6 deletions pkg/beholder/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package beholder
import (
"context"
"errors"
"slices"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -256,17 +257,16 @@ func newTracerProvider(config Config, resource *sdkresource.Resource, creds cred
if err != nil {
return nil, err
}
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter,
trace.WithBatchTimeout(config.TraceBatchTimeout)), // Default is 5s
opts := slices.Clone(config.TracerProviderOpts)
opts = append(opts, sdktrace.WithBatcher(exporter,
trace.WithBatchTimeout(config.TraceBatchTimeout)), // Default is 5s
sdktrace.WithResource(resource),
sdktrace.WithSampler(
sdktrace.ParentBased(
sdktrace.TraceIDRatioBased(config.TraceSampleRate),
),
),
)
return tp, nil
))
return sdktrace.NewTracerProvider(opts...), nil
}

func newMeterProvider(config Config, resource *sdkresource.Resource, creds credentials.TransportCredentials) (*sdkmetric.MeterProvider, error) {
Expand Down
6 changes: 4 additions & 2 deletions pkg/beholder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

otelattr "go.opentelemetry.io/otel/attribute"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

type Config struct {
Expand All @@ -17,8 +18,9 @@ type Config struct {
// Message Emitter
EmitterExportTimeout time.Duration
// OTel Trace
TraceSampleRate float64
TraceBatchTimeout time.Duration
TraceSampleRate float64
TraceBatchTimeout time.Duration
TracerProviderOpts []sdktrace.TracerProviderOption
// OTel Metric
MetricReaderInterval time.Duration
// OTel Log
Expand Down
4 changes: 4 additions & 0 deletions pkg/loop/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
envTracingSamplingRatio = "CL_TRACING_SAMPLING_RATIO"
envTracingAttribute = "CL_TRACING_ATTRIBUTE_"
envTracingTLSCertPath = "CL_TRACING_TLS_CERT_PATH"
envBeholderEndpoint = "CL_BEHOLDER_ENDPOINT"
)

// EnvConfig is the configuration between the application and the LOOP executable. The values
Expand All @@ -32,6 +33,9 @@ type EnvConfig struct {
TracingSamplingRatio float64
TracingTLSCertPath string
TracingAttributes map[string]string

BeholderEndpoint string
//TODO BeholderConfig
}

// AsCmdEnv returns a slice of environment variable key/value pairs for an exec.Cmd.
Expand Down
49 changes: 45 additions & 4 deletions pkg/loop/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"fmt"
"os"

sdktrace "go.opentelemetry.io/otel/sdk/trace"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
)
Expand Down Expand Up @@ -43,6 +46,7 @@ func MustNewStartedServer(loggerName string) *Server {
type Server struct {
GRPCOpts GRPCOpts
Logger logger.SugaredLogger
Beholder beholder.OtelClient
promServer *PromServer
checker *services.HealthChecker
}
Expand All @@ -68,16 +72,53 @@ func (s *Server) start() error {
return fmt.Errorf("error getting environment configuration: %w", err)
}

if err := SetupTracing(TracingConfig{
tracingConfig := TracingConfig{
Enabled: envCfg.TracingEnabled,
CollectorTarget: envCfg.TracingCollectorTarget,
SamplingRatio: envCfg.TracingSamplingRatio,
TLSCertPath: envCfg.TracingTLSCertPath,
NodeAttributes: envCfg.TracingAttributes,
OnDialError: func(err error) { s.Logger.Errorw("Failed to dial", "err", err) },
}); err != nil {
// non blocking to server start
s.Logger.Errorf("Failed to setup tracing: %s", err)
}

if envCfg.BeholderEndpoint == "" {
err := SetupTracing(tracingConfig)
if err != nil {
// non blocking to server start
s.Logger.Errorf("Failed to setup tracing: %s", err)
}
} else {
beholderCfg := beholder.Config{
//TODO set these from envCfg
InsecureConnection: false,
CACertFile: "",
OtelExporterGRPCEndpoint: "",
PackageName: "",
ResourceAttributes: tracingConfig.Attributes(), //TODO merge with beholder config

Check failure on line 97 in pkg/loop/server.go

View workflow job for this annotation

GitHub Actions / build-test

cannot use tracingConfig.Attributes() (value of type []"go.opentelemetry.io/otel/attribute".KeyValue) as map[string]string value in struct literal
EmitterExportTimeout: 0,
TraceSampleRate: 0,
TraceBatchTimeout: 0,
MetricReaderInterval: 0,
LogExportTimeout: 0,
}

if tracingConfig.Enabled {
exporter, err := tracingConfig.NewSpanExporter()
if err != nil {
// non blocking to server start
s.Logger.Errorf("Failed to setup tracing: %s", err)
}
beholderCfg.TracerProviderOpts = append(beholderCfg.TracerProviderOpts, sdktrace.WithBatcher(exporter))
}

var err error
s.Beholder, err = beholder.NewOtelClient(beholderCfg, func(err error) {
s.Logger.Errorw("Otel error", "err", err)
})
if err != nil {
//TODO should this be non-blocking too?
return fmt.Errorf("error creating beholder client: %w", err)
}
}

s.promServer = NewPromServer(envCfg.PrometheusPort, s.Logger)
Expand Down
100 changes: 56 additions & 44 deletions pkg/loop/telem.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,69 @@ func NewGRPCOpts(registerer prometheus.Registerer) GRPCOpts {

// SetupTracing initializes open telemetry with the provided config.
// It sets the global trace provider and opens a connection to the configured collector.
func SetupTracing(config TracingConfig) (err error) {
func SetupTracing(config TracingConfig) error {
if !config.Enabled {
return nil
}

traceExporter, err := config.NewSpanExporter()
if err != nil {
return err
}

resource := resource.NewWithAttributes(
semconv.SchemaURL,
config.Attributes()...,
)

tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(traceExporter),
sdktrace.WithResource(resource),
sdktrace.WithSampler(
sdktrace.ParentBased(
sdktrace.TraceIDRatioBased(config.SamplingRatio),
),
),
)

otel.SetTracerProvider(tracerProvider)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
return nil
}

func (config TracingConfig) Attributes() []attribute.KeyValue {
var version string
var service string
buildInfo, ok := debug.ReadBuildInfo()
if !ok {
version = "unknown"
service = "cl-node"
} else {
version = buildInfo.Main.Version
service = buildInfo.Main.Path
}

attributes := []attribute.KeyValue{
semconv.ServiceNameKey.String(service),
semconv.ProcessPIDKey.Int(os.Getpid()),
semconv.ServiceVersionKey.String(version),
}

for k, v := range config.NodeAttributes {
attributes = append(attributes, attribute.String(k, v))
}
return attributes
}

func (config TracingConfig) NewSpanExporter() (sdktrace.SpanExporter, error) {
ctx := context.Background()

var creds credentials.TransportCredentials
if config.TLSCertPath != "" {
var err error
creds, err = credentials.NewClientTLSFromFile(config.TLSCertPath, "")
if err != nil {
return err
return nil, err
}
} else {
creds = insecure.NewCredentials()
Expand All @@ -87,53 +138,14 @@ func SetupTracing(config TracingConfig) (err error) {
return conn, err2
}))
if err != nil {
return err
return nil, err
}

traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn))
if err != nil {
return err
}

var version string
var service string
buildInfo, ok := debug.ReadBuildInfo()
if !ok {
version = "unknown"
service = "cl-node"
} else {
version = buildInfo.Main.Version
service = buildInfo.Main.Path
return nil, err
}

attributes := []attribute.KeyValue{
semconv.ServiceNameKey.String(service),
semconv.ProcessPIDKey.Int(os.Getpid()),
semconv.ServiceVersionKey.String(version),
}

for k, v := range config.NodeAttributes {
attributes = append(attributes, attribute.String(k, v))
}

resource := resource.NewWithAttributes(
semconv.SchemaURL,
attributes...,
)

tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(traceExporter),
sdktrace.WithResource(resource),
sdktrace.WithSampler(
sdktrace.ParentBased(
sdktrace.TraceIDRatioBased(config.SamplingRatio),
),
),
)

otel.SetTracerProvider(tracerProvider)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
return nil
return traceExporter, nil
}

var grpcpromBuckets = []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}
Expand Down

0 comments on commit 34df236

Please sign in to comment.