diff --git a/cmd/server/main.go b/cmd/server/main.go index 4608ba032..645e91be7 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -28,6 +28,7 @@ import ( "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/sdk/resource" semconv "go.opentelemetry.io/otel/semconv/v1.20.0" @@ -58,6 +59,11 @@ import ( "github.com/openmeterio/openmeter/pkg/slicesx" ) +const ( + defaultShutdownTimeout = 5 * time.Second + otelName = "openmeter.io/backend" +) + func main() { v, flags := viper.New(), pflag.NewFlagSet("OpenMeter", pflag.ExitOnError) ctx := context.Background() @@ -121,35 +127,47 @@ func main() { telemetryRouter := chi.NewRouter() telemetryRouter.Mount("/debug", middleware.Profiler()) - meterProvider, err := conf.Telemetry.Metrics.NewMeterProvider(context.Background(), res) + // Initialize OTel Metrics + otelMeterProvider, err := conf.Telemetry.Metrics.NewMeterProvider(ctx, res) if err != nil { - logger.Error(err.Error()) + logger.Error("failed to initialize OpenTelemetry Metrics provider", slog.String("error", err.Error())) os.Exit(1) } defer func() { - if err := meterProvider.Shutdown(context.Background()); err != nil { + // Use dedicated context with timeout for shutdown as parent context might be canceled + // by the time the execution reaches this stage. + ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout) + defer cancel() + + if err := otelMeterProvider.Shutdown(ctx); err != nil { logger.Error("shutting down meter provider: %v", err) } }() - - otel.SetMeterProvider(meterProvider) + otel.SetMeterProvider(otelMeterProvider) + metricMeter := otelMeterProvider.Meter(otelName) if conf.Telemetry.Metrics.Exporters.Prometheus.Enabled { telemetryRouter.Handle("/metrics", promhttp.Handler()) } - tracerProvider, err := conf.Telemetry.Trace.NewTracerProvider(context.Background(), res) + // Initialize OTel Tracer + otelTracerProvider, err := conf.Telemetry.Trace.NewTracerProvider(ctx, res) if err != nil { - logger.Error(err.Error()) + logger.Error("failed to initialize OpenTelemetry Trace provider", slog.String("error", err.Error())) os.Exit(1) } defer func() { - if err := tracerProvider.Shutdown(context.Background()); err != nil { + // Use dedicated context with timeout for shutdown as parent context might be canceled + // by the time the execution reaches this stage. + ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout) + defer cancel() + + if err := otelTracerProvider.Shutdown(ctx); err != nil { logger.Error("shutting down tracer provider", "error", err) } }() - otel.SetTracerProvider(tracerProvider) + otel.SetTracerProvider(otelTracerProvider) otel.SetTextMapPropagator(propagation.TraceContext{}) // Configure health checker @@ -184,7 +202,14 @@ func main() { } // Initialize Kafka Ingest - ingestCollector, kafkaIngestNamespaceHandler, err := initKafkaIngest(ctx, conf, logger, serializer.NewJSONSerializer(), &group) + ingestCollector, kafkaIngestNamespaceHandler, err := initKafkaIngest( + ctx, + conf, + logger, + metricMeter, + serializer.NewJSONSerializer(), + &group, + ) if err != nil { logger.Error("failed to initialize kafka ingest", "error", err) os.Exit(1) @@ -325,8 +350,8 @@ func main() { } }), "", - otelhttp.WithMeterProvider(meterProvider), - otelhttp.WithTracerProvider(tracerProvider), + otelhttp.WithMeterProvider(otelMeterProvider), + otelhttp.WithTracerProvider(otelTracerProvider), ) }) }, @@ -392,7 +417,7 @@ func main() { } } -func initKafkaIngest(ctx context.Context, config config.Configuration, logger *slog.Logger, serializer serializer.Serializer, group *run.Group) (*kafkaingest.Collector, *kafkaingest.NamespaceHandler, error) { +func initKafkaIngest(ctx context.Context, config config.Configuration, logger *slog.Logger, metricMeter metric.Meter, serializer serializer.Serializer, group *run.Group) (*kafkaingest.Collector, *kafkaingest.NamespaceHandler, error) { // Initialize Kafka Admin Client kafkaConfig := config.Ingest.Kafka.CreateKafkaConfig() @@ -409,10 +434,14 @@ func initKafkaIngest(ctx context.Context, config config.Configuration, logger *s slog.Debug("connected to Kafka") - collector := &kafkaingest.Collector{ - Producer: producer, - NamespacedTopicTemplate: config.Ingest.Kafka.EventsTopicTemplate, - Serializer: serializer, + collector, err := kafkaingest.NewCollector( + producer, + serializer, + config.Ingest.Kafka.EventsTopicTemplate, + metricMeter, + ) + if err != nil { + return nil, nil, fmt.Errorf("init kafka ingest: %w", err) } kafkaAdminClient, err := kafka.NewAdminClientFromProducer(producer) diff --git a/collector/benthos/internal/message/transaction.go b/collector/benthos/internal/message/transaction.go index 86196eeb1..8e69b96c3 100644 --- a/collector/benthos/internal/message/transaction.go +++ b/collector/benthos/internal/message/transaction.go @@ -31,7 +31,7 @@ type Transaction struct { // indicates whether the message has been propagated successfully. responseFunc func(context.Context, error) error - // Used for cancelling transactions. When cancelled it is up to the receiver + // Used for canceling transactions. When canceled it is up to the receiver // of this transaction to abort any attempt to deliver the transaction // message. ctx context.Context @@ -60,7 +60,7 @@ func NewTransactionFunc(payload Batch, fn func(context.Context, error) error) Tr } // Context returns a context that indicates the cancellation of a transaction. -// It is optional for receivers of a transaction to honour this context, and is +// It is optional for receivers of a transaction to honor this context, and is // worth doing in cases where the transaction is blocked (on reconnect loops, // etc) as it is often used as a fail-fast mechanism. // @@ -71,7 +71,7 @@ func (t *Transaction) Context() context.Context { } // WithContext returns a copy of the transaction associated with a context used -// for cancellation. When cancelled it is up to the receiver of this transaction +// for cancellation. When canceled it is up to the receiver of this transaction // to abort any attempt to deliver the transaction message. func (t *Transaction) WithContext(ctx context.Context) *Transaction { newT := *t diff --git a/collector/benthos/internal/shutdown/signaler.go b/collector/benthos/internal/shutdown/signaler.go index 9a49d1dff..c95510b38 100644 --- a/collector/benthos/internal/shutdown/signaler.go +++ b/collector/benthos/internal/shutdown/signaler.go @@ -86,7 +86,7 @@ func (s *Signaller) CloseAtLeisureChan() <-chan struct{} { } // CloseAtLeisureCtx returns a context.Context that will be terminated when -// either the provided context is cancelled or the signal to shut down +// either the provided context is canceled or the signal to shut down // either at leisure or immediately has been made. func (s *Signaller) CloseAtLeisureCtx(ctx context.Context) (context.Context, context.CancelFunc) { var cancel context.CancelFunc @@ -119,7 +119,7 @@ func (s *Signaller) CloseNowChan() <-chan struct{} { } // CloseNowCtx returns a context.Context that will be terminated when either the -// provided context is cancelled or the signal to shut down immediately has been +// provided context is canceled or the signal to shut down immediately has been // made. func (s *Signaller) CloseNowCtx(ctx context.Context) (context.Context, context.CancelFunc) { var cancel context.CancelFunc @@ -151,8 +151,8 @@ func (s *Signaller) HasClosedChan() <-chan struct{} { return s.hasClosedChan } -// HasClosedCtx returns a context.Context that will be cancelled when either the -// provided context is cancelled or the signal that the component has shut down +// HasClosedCtx returns a context.Context that will be canceled when either the +// provided context is canceled or the signal that the component has shut down // has been made. func (s *Signaller) HasClosedCtx(ctx context.Context) (context.Context, context.CancelFunc) { var cancel context.CancelFunc diff --git a/internal/ingest/kafkaingest/collector.go b/internal/ingest/kafkaingest/collector.go index 25713cdb2..0cec88399 100644 --- a/internal/ingest/kafkaingest/collector.go +++ b/internal/ingest/kafkaingest/collector.go @@ -7,6 +7,8 @@ import ( "github.com/cloudevents/sdk-go/v2/event" "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "github.com/openmeterio/openmeter/internal/ingest/kafkaingest/serializer" ) @@ -19,10 +21,49 @@ type Collector struct { // NamespacedTopicTemplate needs to contain at least one string parameter passed to fmt.Sprintf. // For example: "om_%s_events" NamespacedTopicTemplate string + + ingestEventCounter metric.Int64Counter +} + +func NewCollector( + producer *kafka.Producer, + serializer serializer.Serializer, + namespacedTopicTemplate string, + metricMeter metric.Meter, +) (*Collector, error) { + if producer == nil { + return nil, fmt.Errorf("producer is required") + } + if serializer == nil { + return nil, fmt.Errorf("serializer is required") + } + if namespacedTopicTemplate == "" { + return nil, fmt.Errorf("namespaced topic template is required") + } + if metricMeter == nil { + return nil, fmt.Errorf("metric meter is required") + } + + // Initialize OTel metrics + ingestEventCounter, err := metricMeter.Int64Counter( + "ingest.events", + metric.WithDescription("The number of events ingested"), + metric.WithUnit("{event}"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create events counter: %w", err) + } + + return &Collector{ + Producer: producer, + Serializer: serializer, + NamespacedTopicTemplate: namespacedTopicTemplate, + ingestEventCounter: ingestEventCounter, + }, nil } // Ingest produces an event to a Kafka topic. -func (s Collector) Ingest(_ context.Context, namespace string, ev event.Event) error { +func (s Collector) Ingest(ctx context.Context, namespace string, ev event.Event) error { topic := fmt.Sprintf(s.NamespacedTopicTemplate, namespace) key, err := s.Serializer.SerializeKey(topic, ev) if err != nil { @@ -50,6 +91,10 @@ func (s Collector) Ingest(_ context.Context, namespace string, ev event.Event) e return fmt.Errorf("producing kafka message: %w", err) } + // Increment the ingest event counter metric + namespaceAttr := attribute.String("namespace", namespace) + s.ingestEventCounter.Add(ctx, 1, metric.WithAttributes(namespaceAttr)) + return nil }