Skip to content

Commit

Permalink
pkg/loop: use beholder
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Aug 16, 2024
1 parent 1392fd2 commit b73ec6b
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 76 deletions.
13 changes: 8 additions & 5 deletions pkg/beholder/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,17 +277,20 @@ func newTracerProvider(config Config, resource *sdkresource.Resource, creds cred
if err != nil {
return nil, err
}
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter,
trace.WithBatchTimeout(config.TraceBatchTimeout)), // Default is 5s

opts := []sdktrace.TracerProviderOption{
sdktrace.WithBatcher(exporter, trace.WithBatchTimeout(config.TraceBatchTimeout)), // Default is 5s
sdktrace.WithResource(resource),
sdktrace.WithSampler(
sdktrace.ParentBased(
sdktrace.TraceIDRatioBased(config.TraceSampleRate),
),
),
)
return tp, nil
}
if config.TraceSpanExporter != nil {
opts = append(opts, sdktrace.WithBatcher(config.TraceSpanExporter))
}
return sdktrace.NewTracerProvider(opts...), nil
}

func newMeterProvider(config Config, resource *sdkresource.Resource, creds credentials.TransportCredentials) (*sdkmetric.MeterProvider, error) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/beholder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

otelattr "go.opentelemetry.io/otel/attribute"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

type Config struct {
Expand All @@ -18,6 +19,7 @@ type Config struct {
// OTel Trace
TraceSampleRate float64
TraceBatchTimeout time.Duration
TraceSpanExporter sdktrace.SpanExporter
// OTel Metric
MetricReaderInterval time.Duration
// OTel Log
Expand Down
4 changes: 2 additions & 2 deletions pkg/beholder/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

otelattr "go.opentelemetry.io/otel/attribute"

beholder "github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
)

const (
Expand Down Expand Up @@ -35,5 +35,5 @@ func ExampleConfig() {
}
fmt.Printf("%+v", config)
// Output:
// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholdeclient slice:<nil>}}] EmitterExportTimeout:1s TraceSampleRate:1 TraceBatchTimeout:1s MetricReaderInterval:1s LogExportTimeout:1s}
// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholdeclient slice:<nil>}}] EmitterExportTimeout:1s TraceSampleRate:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> MetricReaderInterval:1s LogExportTimeout:1s}
}
82 changes: 66 additions & 16 deletions pkg/loop/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,19 @@ import (
)

const (
envDatabaseURL = "CL_DATABASE_URL"
envPromPort = "CL_PROMETHEUS_PORT"
envTracingEnabled = "CL_TRACING_ENABLED"
envTracingCollectorTarget = "CL_TRACING_COLLECTOR_TARGET"
envTracingSamplingRatio = "CL_TRACING_SAMPLING_RATIO"
envTracingAttribute = "CL_TRACING_ATTRIBUTE_"
envTracingTLSCertPath = "CL_TRACING_TLS_CERT_PATH"
envDatabaseURL = "CL_DATABASE_URL"
envPromPort = "CL_PROMETHEUS_PORT"
envTracingEnabled = "CL_TRACING_ENABLED"
envTracingCollectorTarget = "CL_TRACING_COLLECTOR_TARGET"
envTracingSamplingRatio = "CL_TRACING_SAMPLING_RATIO"
envTracingAttribute = "CL_TRACING_ATTRIBUTE_"
envTracingTLSCertPath = "CL_TRACING_TLS_CERT_PATH"
envBeholderEndpoint = "CL_BEHOLDER_ENDPOINT"
envInsecureConnection = "CL_BEHOLDER_INSECURE_CONNECTION"
envBeholderCACertFile = "CL_BEHOLDER_CA_CERT_FILE"
envBeholderAttributes = "CL_BEHOLDER_ATTRIBUTES"
envBeholderTraceSampleRatio = "CL_BEHOLDER_TRACE_SAMPLE_RATIO"
//TODO env vars
)

// EnvConfig is the configuration between the application and the LOOP executable. The values
Expand All @@ -32,6 +38,13 @@ type EnvConfig struct {
TracingSamplingRatio float64
TracingTLSCertPath string
TracingAttributes map[string]string

BeholderEndpoint string
BeholderInsecureConnection bool
BeholderCACertFile string
BeholderAttributes OtelAttributes
BeholderTraceSampleRatio float64
//TODO more beholder?
}

// AsCmdEnv returns a slice of environment variable key/value pairs for an exec.Cmd.
Expand Down Expand Up @@ -87,6 +100,17 @@ func (e *EnvConfig) parse() error {
e.TracingSamplingRatio = getTracingSamplingRatio()
e.TracingTLSCertPath = getTLSCertPath()
}

if e.BeholderEndpoint = os.Getenv(envBeholderEndpoint); e.BeholderEndpoint != "" {
e.BeholderInsecureConnection, err = getBeholderInsecureConnection()
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envBeholderEndpoint, err)
}
e.BeholderCACertFile = os.Getenv(envBeholderCACertFile)
e.BeholderAttributes = getAttributes(envBeholderAttributes)
e.BeholderTraceSampleRatio = getBeholderTracingSampleRatio()
//TODO other fields
}
return nil
}

Expand All @@ -104,11 +128,15 @@ func ManagedGRPCClientConfig(clientConfig *plugin.ClientConfig, c BrokerConfig)

// isTracingEnabled parses and validates the TRACING_ENABLED environment variable.
func getTracingEnabled() (bool, error) {
tracingEnabledString := os.Getenv(envTracingEnabled)
if tracingEnabledString == "" {
return getBool(envTracingEnabled)
}

func getBool(envKey string) (bool, error) {
s := os.Getenv(envKey)
if s == "" {
return false, nil
}
return strconv.ParseBool(tracingEnabledString)
return strconv.ParseBool(s)
}

// getValidCollectorTarget validates TRACING_COLLECTOR_TARGET as a URL.
Expand All @@ -123,10 +151,14 @@ func getValidCollectorTarget() (string, error) {

// getTracingAttributes collects attributes prefixed with TRACING_ATTRIBUTE_.
func getTracingAttributes() map[string]string {
return getAttributes(envTracingAttribute)
}

func getAttributes(envKey string) map[string]string {
tracingAttributes := make(map[string]string)
for _, env := range os.Environ() {
if strings.HasPrefix(env, envTracingAttribute) {
tracingAttributes[strings.TrimPrefix(env, envTracingAttribute)] = os.Getenv(env)
if strings.HasPrefix(env, envKey) {
tracingAttributes[strings.TrimPrefix(env, envKey)] = os.Getenv(env)
}
}
return tracingAttributes
Expand All @@ -135,15 +167,19 @@ func getTracingAttributes() map[string]string {
// getTracingSamplingRatio parses the TRACING_SAMPLING_RATIO environment variable.
// Any errors in parsing result in a sampling ratio of 0.0.
func getTracingSamplingRatio() float64 {
tracingSamplingRatio := os.Getenv(envTracingSamplingRatio)
if tracingSamplingRatio == "" {
return getFloat64(envTracingSamplingRatio)
}

func getFloat64(envKey string) float64 {
s := os.Getenv(envKey)
if s == "" {
return 0.0
}
samplingRatio, err := strconv.ParseFloat(tracingSamplingRatio, 64)
f, err := strconv.ParseFloat(s, 64)
if err != nil {
return 0.0
}
return samplingRatio
return f
}

// getTLSCertPath parses the CL_TRACING_TLS_CERT_PATH environment variable.
Expand All @@ -164,3 +200,17 @@ func getDatabaseURL() (*url.URL, error) {
}
return u, nil
}

// getBeholderInsecureConnection parses and validates the TRACING_ENABLED environment variable.
func getBeholderInsecureConnection() (bool, error) {
return getBool(envInsecureConnection)
}

// getBeholderAttributes collects attributes prefixed with TRACING_ATTRIBUTE_.
func getBeholderAttributes() map[string]string {
return getAttributes(envBeholderAttributes)
}

func getBeholderTracingSampleRatio() float64 {
return getFloat64(envBeholderTraceSampleRatio)
}
43 changes: 39 additions & 4 deletions pkg/loop/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
)
Expand Down Expand Up @@ -43,6 +44,7 @@ func MustNewStartedServer(loggerName string) *Server {
type Server struct {
GRPCOpts GRPCOpts
Logger logger.SugaredLogger
Beholder beholder.OtelClient
promServer *PromServer
checker *services.HealthChecker
}
Expand All @@ -68,16 +70,49 @@ func (s *Server) start() error {
return fmt.Errorf("error getting environment configuration: %w", err)
}

if err := SetupTracing(TracingConfig{
tracingConfig := TracingConfig{
Enabled: envCfg.TracingEnabled,
CollectorTarget: envCfg.TracingCollectorTarget,
SamplingRatio: envCfg.TracingSamplingRatio,
TLSCertPath: envCfg.TracingTLSCertPath,
NodeAttributes: envCfg.TracingAttributes,
OnDialError: func(err error) { s.Logger.Errorw("Failed to dial", "err", err) },
}); err != nil {
// non blocking to server start
s.Logger.Errorf("Failed to setup tracing: %s", err)
}

if envCfg.BeholderEndpoint == "" {
s.Beholder = beholder.NewNoopClient()
err := SetupTracing(tracingConfig)
if err != nil {
// non blocking to server start
s.Logger.Errorf("Failed to setup tracing: %s", err)
}
} else {
beholderCfg := beholder.Config{
InsecureConnection: envCfg.BeholderInsecureConnection,
CACertFile: envCfg.BeholderCACertFile,
OtelExporterGRPCEndpoint: envCfg.BeholderEndpoint,
ResourceAttributes: append(tracingConfig.Attributes(), envCfg.BeholderAttributes.AsStringAttributes()...),
TraceSampleRate: envCfg.BeholderTraceSampleRatio,
}

if tracingConfig.Enabled {
exporter, err := tracingConfig.NewSpanExporter()
if err != nil {
// non blocking to server start
s.Logger.Errorf("Failed to setup tracing exporter: %s", err)
} else {
beholderCfg.TraceSpanExporter = exporter
}
}

var err error
s.Beholder, err = beholder.NewOtelClient(beholderCfg, func(err error) {
s.Logger.Errorw("Otel error", "err", err)
})
if err != nil {
//TODO should this be non-blocking too?
return fmt.Errorf("error creating beholder client: %w", err)
}
}

s.promServer = NewPromServer(envCfg.PrometheusPort, s.Logger)
Expand Down
Loading

0 comments on commit b73ec6b

Please sign in to comment.