Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/loop: use beholder #696

Merged
merged 2 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions pkg/beholder/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,18 @@ type Client struct {

// NewClient creates a new Client with initialized OpenTelemetry components
// To handle OpenTelemetry errors use [otel.SetErrorHandler](https://pkg.go.dev/go.opentelemetry.io/otel#SetErrorHandler)
func NewClient(ctx context.Context, cfg Config) (*Client, error) {
factory := func(ctx context.Context, options ...otlploggrpc.Option) (sdklog.Exporter, error) {
return otlploggrpc.New(ctx, options...)
func NewClient(cfg Config) (*Client, error) {
factory := func(options ...otlploggrpc.Option) (sdklog.Exporter, error) {
// note: context is unused internally
return otlploggrpc.New(context.Background(), options...) //nolint
}
return newClient(ctx, cfg, factory)
return newClient(cfg, factory)
}

// Used for testing to override the default exporter
type otlploggrpcFactory func(ctx context.Context, options ...otlploggrpc.Option) (sdklog.Exporter, error)
type otlploggrpcFactory func(options ...otlploggrpc.Option) (sdklog.Exporter, error)

func newClient(ctx context.Context, cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, error) {
func newClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, error) {
baseResource, err := newOtelResource(cfg)
noop := NewNoopClient()
if err != nil {
Expand All @@ -76,7 +77,6 @@ func newClient(ctx context.Context, cfg Config, otlploggrpcNew otlploggrpcFactor
}
}
sharedLogExporter, err := otlploggrpcNew(
ctx,
otlploggrpc.WithTLSCredentials(creds),
otlploggrpc.WithEndpoint(cfg.OtelExporterGRPCEndpoint),
)
Expand Down Expand Up @@ -162,9 +162,7 @@ func newClient(ctx context.Context, cfg Config, otlploggrpcNew otlploggrpcFactor
}
return
}
client := Client{cfg, logger, tracer, meter, emitter, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, onClose}

return &client, nil
return &Client{cfg, logger, tracer, meter, emitter, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, onClose}, nil
}

// Closes all providers, flushes all data and stops all background processes
Expand Down Expand Up @@ -256,17 +254,20 @@ func newTracerProvider(config Config, resource *sdkresource.Resource, creds cred
if err != nil {
return nil, err
}
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter,
trace.WithBatchTimeout(config.TraceBatchTimeout)), // Default is 5s

opts := []sdktrace.TracerProviderOption{
sdktrace.WithBatcher(exporter, trace.WithBatchTimeout(config.TraceBatchTimeout)), // Default is 5s
sdktrace.WithResource(resource),
sdktrace.WithSampler(
sdktrace.ParentBased(
sdktrace.TraceIDRatioBased(config.TraceSampleRatio),
),
),
)
return tp, nil
}
if config.TraceSpanExporter != nil {
opts = append(opts, sdktrace.WithBatcher(config.TraceSpanExporter))
Comment on lines +267 to +268
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pkcll @patrickhuie19 Does this optional config for an additional exporter make sense? Or should we consider other alternatives?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing the configured exporter is more of a dependency injection rather than part of the config. Passing an exporter as part of the config seems like mixing different types of code aspects.

Copy link
Contributor

@pkcll pkcll Aug 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see 4 options:

  • loop.SetupTracing calling beholder.newTraceProvider
  • beholder.newTraceProvider calling loop.SetupTracing
  • loop.SetupTracing and beholder.newTraceProvider calling some "common denominator" function e.g telemetry.NewTraceProvider
  • both loop.SetupTracing and beholder.newTraceProvider using shared configuration

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does the TraceSpanExporter do on top of everything else configured in opts?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one approach to bridging the two tracing systems.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It adds an additional exporter. It does not replace the normal one. They are accumulated: https://github.com/open-telemetry/opentelemetry-go/blob/main/sdk/trace/provider.go#L338

Copy link
Contributor

@pkcll pkcll Aug 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right.
Do you see use case for having two separate trace processors/exporters for loop/server otel instrumentation and beholder Tracer (which will be used for producing custom traces from code)?

This kind of setup makes sense when you have two separate targets for otel-collector which is not the case with chainlink nodes.
This will result in duplicated traces being sent to the same collector if the target is the same.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could add a check for duplicate endpoints. But otherwise using both at the same time is not inherently problematic, and having the flexibility will make it easier to transition as well as support non-production cases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

having the flexibility will make it easier to transition

Makes sense

}
return sdktrace.NewTracerProvider(opts...), nil
}

func newMeterProvider(config Config, resource *sdkresource.Resource, creds credentials.TransportCredentials) (*sdkmetric.MeterProvider, error) {
Expand Down
18 changes: 9 additions & 9 deletions pkg/beholder/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,15 @@ func TestClient(t *testing.T) {
exporterMock := mocks.NewOTLPExporter(t)
defer exporterMock.AssertExpectations(t)

otelErrorHandler := func(err error) {
t.Fatalf("otel error: %v", err)
}
// Override exporter factory which is used by Client
exporterFactory := func(context.Context, ...otlploggrpc.Option) (sdklog.Exporter, error) {
exporterFactory := func(...otlploggrpc.Option) (sdklog.Exporter, error) {
return exporterMock, nil
}
client, err := newClient(tests.Context(t), TestDefaultConfig(), exporterFactory)
client, err := newClient(TestDefaultConfig(), exporterFactory)
if err != nil {
t.Fatalf("Error creating beholder client: %v", err)
}
otel.SetErrorHandler(otel.ErrorHandlerFunc(otelErrorHandler))
otel.SetErrorHandler(otelMustNotErr(t))
// Number of exported messages
exportedMessageCount := 0

Expand Down Expand Up @@ -142,14 +139,13 @@ func TestClient(t *testing.T) {
func TestEmitterMessageValidation(t *testing.T) {
getEmitter := func(exporterMock *mocks.OTLPExporter) Emitter {
client, err := newClient(
tests.Context(t),
TestDefaultConfig(),
// Override exporter factory which is used by Client
func(context.Context, ...otlploggrpc.Option) (sdklog.Exporter, error) {
func(...otlploggrpc.Option) (sdklog.Exporter, error) {
return exporterMock, nil
},
)
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { t.Fatalf("otel error: %v", err) }))
otel.SetErrorHandler(otelMustNotErr(t))
assert.NoError(t, err)
return client.Emitter
}
Expand Down Expand Up @@ -252,3 +248,7 @@ func TestClient_ForPackage(t *testing.T) {
assert.Contains(t, b.String(), `"Name":"TestClient_ForPackage"`)
assert.Contains(t, b.String(), "testMetric")
}

func otelMustNotErr(t *testing.T) otel.ErrorHandlerFunc {
return func(err error) { t.Fatalf("otel error: %v", err) }
}
2 changes: 2 additions & 0 deletions pkg/beholder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

otelattr "go.opentelemetry.io/otel/attribute"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

type Config struct {
Expand All @@ -21,6 +22,7 @@ type Config struct {
// OTel Trace
TraceSampleRatio float64
TraceBatchTimeout time.Duration
TraceSpanExporter sdktrace.SpanExporter // optional additional exporter
// OTel Metric
MetricReaderInterval time.Duration
// OTel Log
Expand Down
4 changes: 2 additions & 2 deletions pkg/beholder/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

otelattr "go.opentelemetry.io/otel/attribute"

beholder "github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
)

const (
Expand Down Expand Up @@ -37,5 +37,5 @@ func ExampleConfig() {
}
fmt.Printf("%+v", config)
// Output:
// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s MetricReaderInterval:1s LogExportTimeout:1s LogBatchProcessor:true}
// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> MetricReaderInterval:1s LogExportTimeout:1s LogBatchProcessor:true}
}
19 changes: 7 additions & 12 deletions pkg/beholder/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,15 @@ import (
)

func ExampleNewClient() {
ctx := context.Background()

config := beholder.DefaultConfig()

// Initialize beholder otel client which sets up OTel components
client, err := beholder.NewClient(ctx, config)
client, err := beholder.NewClient(config)
if err != nil {
log.Fatalf("Error creating Beholder client: %v", err)
}
// Handle OTel errors
otel.SetErrorHandler(otel.ErrorHandlerFunc(errorHandler))
otel.SetErrorHandler(otelErrPrinter)
// Set global client so it will be accessible from anywhere through beholder functions
beholder.SetClient(client)

Expand Down Expand Up @@ -64,13 +62,12 @@ func ExampleTracer() {
config := beholder.DefaultConfig()

// Initialize beholder otel client which sets up OTel components
client, err := beholder.NewClient(ctx, config)
client, err := beholder.NewClient(config)
if err != nil {
log.Fatalf("Error creating Beholder client: %v", err)
}
// Handle OTel errors
otel.SetErrorHandler(otel.ErrorHandlerFunc(errorHandler))

otel.SetErrorHandler(otelErrPrinter)
// Set global client so it will be accessible from anywhere through beholder functions
beholder.SetClient(client)

Expand Down Expand Up @@ -117,8 +114,6 @@ func ExampleNewNoopClient() {
// Emitting custom message via noop otel client
}

func errorHandler(e error) {
if e != nil {
log.Printf("otel error: %v", e)
}
}
var otelErrPrinter = otel.ErrorHandlerFunc(func(err error) {
log.Printf("otel error: %v", err)
})
11 changes: 3 additions & 8 deletions pkg/beholder/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
)

// Pointer to the global Beholder Client
var globalClient = defaultClient()
var globalClient atomic.Pointer[Client]
Copy link
Contributor

@patrickhuie19 patrickhuie19 Sep 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to sync access within the beholder pkg? What consumers interact with globalClient? I do see the diff with defaultClient below.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an exported SetClient function.


func init() { globalClient.Store(NewNoopClient()) }

// SetClient sets the global Beholder Client
func SetClient(client *Client) {
Expand Down Expand Up @@ -41,13 +43,6 @@ func GetEmitter() Emitter {
return GetClient().Emitter
}

func defaultClient() *atomic.Pointer[Client] {
ptr := &atomic.Pointer[Client]{}
client := NewNoopClient()
ptr.Store(client)
return ptr
}

// Sets global OTel logger, tracer, meter providers from Client.
// Makes them accessible from anywhere in the code via global otel getters.
// Any package that relies on go.opentelemetry.io will be able to pick up configured global providers
Expand Down
13 changes: 2 additions & 11 deletions pkg/beholder/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ package beholder
import (
"context"
"errors"
"fmt"
"io"
"os"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/stdout/stdoutlog"
"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
Expand Down Expand Up @@ -37,9 +35,7 @@ func NewNoopClient() *Client {
// MessageEmitter
messageEmitter := noopMessageEmitter{}

client := Client{cfg, logger, tracer, meter, messageEmitter, loggerProvider, tracerProvider, meterProvider, loggerProvider, noopOnClose}

return &client
return &Client{cfg, logger, tracer, meter, messageEmitter, loggerProvider, tracerProvider, meterProvider, loggerProvider, noopOnClose}
}

// NewStdoutClient creates a new Client with exporters which send telemetry data to standard output
Expand All @@ -62,9 +58,6 @@ func NewWriterClient(w io.Writer) (*Client, error) {
}
loggerProvider := sdklog.NewLoggerProvider(sdklog.WithProcessor(sdklog.NewSimpleProcessor(loggerExporter)))
logger := loggerProvider.Logger(defaultPackageName)
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
fmt.Printf("OTel error %s", err)
}))

// Tracer
traceExporter, err := stdouttrace.New(cfg.TraceOptions...)
Expand Down Expand Up @@ -101,9 +94,7 @@ func NewWriterClient(w io.Writer) (*Client, error) {
return
}

client := Client{cfg.Config, logger, tracer, meter, emitter, loggerProvider, tracerProvider, meterProvider, loggerProvider, onClose}

return &client, nil
return &Client{Config: cfg.Config, Logger: logger, Tracer: tracer, Meter: meter, Emitter: emitter, LoggerProvider: loggerProvider, TracerProvider: tracerProvider, MeterProvider: meterProvider, MessageLoggerProvider: loggerProvider, OnClose: onClose}, nil
}

type noopMessageEmitter struct{}
Expand Down
Loading
Loading