diff --git a/core/cmd/shell.go b/core/cmd/shell.go
index 966fa1a0ff8..829ed9d55ce 100644
--- a/core/cmd/shell.go
+++ b/core/cmd/shell.go
@@ -51,6 +51,7 @@ import (
 	"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/cache"
 	"github.com/smartcontractkit/chainlink/v2/core/services/versioning"
 	"github.com/smartcontractkit/chainlink/v2/core/services/webhook"
+	"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
 	"github.com/smartcontractkit/chainlink/v2/core/sessions"
 	"github.com/smartcontractkit/chainlink/v2/core/static"
 	"github.com/smartcontractkit/chainlink/v2/core/store/migrate"
@@ -109,6 +110,10 @@ func initGlobals(cfgProm config.Prometheus, cfgTracing config.Tracing, cfgTeleme
 		AuthPublicKeyHex: csaPubKeyHex,
 		AuthHeaders:      beholderAuthHeaders,
 	}
+	// note: due to the OTEL specification, all histogram buckets
+	// must be defined when the beholder client is created
+	clientCfg.MetricViews = append(clientCfg.MetricViews, workflows.MetricViews()...)
+
 	if tracingCfg.Enabled {
 		clientCfg.TraceSpanExporter, err = tracingCfg.NewSpanExporter()
 		if err != nil {
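The wiring above works only because OTEL metric views are fixed at the moment the meter provider is constructed; there is no API for attaching views afterwards. A minimal sketch of that constraint (illustrative only — newMeterProvider is a hypothetical helper, not the beholder client's actual internals):

package main

import (
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

// Views passed here decide the bucketing of every matching instrument for the
// provider's lifetime, which is why MetricViews() has to be appended to
// clientCfg.MetricViews before the client is created.
func newMeterProvider(views ...sdkmetric.View) *sdkmetric.MeterProvider {
	return sdkmetric.NewMeterProvider(sdkmetric.WithView(views...))
}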
diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go
index ecb3ce60510..0ac26ad8de4 100644
--- a/core/services/workflows/engine.go
+++ b/core/services/workflows/engine.go
@@ -142,11 +142,7 @@ func (e *Engine) Start(_ context.Context) error {
 		// create a new context, since the one passed in via Start is short-lived.
 		ctx, _ := e.stopCh.NewCtx()
 
-		// spin up monitoring resources
-		err := initMonitoringResources()
-		if err != nil {
-			return fmt.Errorf("could not initialize monitoring resources: %w", err)
-		}
+		e.metrics.incrementWorkflowInitializationCounter(ctx)
 
 		e.wg.Add(e.maxWorkerLimit)
 		for i := 0; i < e.maxWorkerLimit; i++ {
@@ -358,6 +354,7 @@ func (e *Engine) init(ctx context.Context) {
 
 	e.logger.Info("engine initialized")
 	logCustMsg(ctx, e.cma, "workflow registered", e.logger)
+	e.metrics.incrementWorkflowRegisteredCounter(ctx)
 	e.afterInit(true)
 }
 
@@ -439,7 +436,7 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability, trig
 		Metadata: capabilities.RequestMetadata{
 			WorkflowID:               e.workflow.id,
 			WorkflowOwner:            e.workflow.owner,
-			WorkflowName:             e.workflow.name,
+			WorkflowName:             e.workflow.hexName,
 			WorkflowDonID:            e.localNode.WorkflowDON.ID,
 			WorkflowDonConfigVersion: e.localNode.WorkflowDON.ConfigVersion,
 			ReferenceID:              t.Ref,
@@ -678,7 +675,6 @@ func (e *Engine) queueIfReady(state store.WorkflowExecution, step *step) {
 
 func (e *Engine) finishExecution(ctx context.Context, cma custmsg.MessageEmitter, executionID string, status string) error {
 	l := e.logger.With(platform.KeyWorkflowExecutionID, executionID, "status", status)
-	metrics := e.metrics.with("status", status)
 
 	l.Info("finishing execution")
 
@@ -692,18 +688,28 @@ func (e *Engine) finishExecution(ctx context.Context, cma custmsg.MessageEmitter
 		return err
 	}
 
-	executionDuration := execState.FinishedAt.Sub(*execState.CreatedAt).Milliseconds()
 	e.stepUpdatesChMap.remove(executionID)
-	metrics.updateTotalWorkflowsGauge(ctx, e.stepUpdatesChMap.len())
-	metrics.updateWorkflowExecutionLatencyGauge(ctx, executionDuration)
+
+	executionDuration := int64(execState.FinishedAt.Sub(*execState.CreatedAt).Seconds())
+	switch status {
+	case store.StatusCompleted:
+		e.metrics.updateWorkflowCompletedDurationHistogram(ctx, executionDuration)
+	case store.StatusCompletedEarlyExit:
+		e.metrics.updateWorkflowEarlyExitDurationHistogram(ctx, executionDuration)
+	case store.StatusErrored:
+		e.metrics.updateWorkflowErrorDurationHistogram(ctx, executionDuration)
+	case store.StatusTimeout:
+		// durations should cluster at the configured timeout value unless it is adjusted;
+		// a histogram is used because it also gives a count of timed-out executions for free
+		e.metrics.updateWorkflowTimeoutDurationHistogram(ctx, executionDuration)
+	}
 
 	if executionDuration > fifteenMinutesMs {
-		logCustMsg(ctx, cma, fmt.Sprintf("execution duration exceeded 15 minutes: %d", executionDuration), l)
-		l.Warnf("execution duration exceeded 15 minutes: %d", executionDuration)
+		logCustMsg(ctx, cma, fmt.Sprintf("execution duration exceeded 15 minutes: %d (seconds)", executionDuration), l)
+		l.Warnf("execution duration exceeded 15 minutes: %d (seconds)", executionDuration)
 	}
-	logCustMsg(ctx, cma, fmt.Sprintf("execution duration: %d", executionDuration), l)
-	l.Infof("execution duration: %d", executionDuration)
+	logCustMsg(ctx, cma, fmt.Sprintf("execution duration: %d (seconds)", executionDuration), l)
+	l.Infof("execution duration: %d (seconds)", executionDuration)
 	e.onExecutionFinished(executionID)
 	return nil
 }
@@ -747,6 +753,7 @@ func (e *Engine) worker(ctx context.Context) {
 			if err != nil {
 				e.logger.With(platform.KeyWorkflowExecutionID, executionID).Errorf("failed to start execution: %v", err)
 				logCustMsg(ctx, cma, fmt.Sprintf("failed to start execution: %s", err), e.logger)
+				e.metrics.with(platform.KeyTriggerID, te.ID).incrementTriggerWorkflowStarterErrorCounter(ctx)
 			} else {
 				e.logger.With(platform.KeyWorkflowExecutionID, executionID).Debug("execution started")
 				logCustMsg(ctx, cma, "execution started", e.logger)
@@ -770,10 +777,21 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {
 		Ref:         msg.stepRef,
 	}
 
-	// TODO ks-462 inputs
 	logCustMsg(ctx, cma, "executing step", l)
+	stepExecutionStartTime := time.Now()
 	inputs, outputs, err := e.executeStep(ctx, l, msg)
+	stepExecutionDuration := time.Since(stepExecutionStartTime).Seconds()
+
+	curStepID := "UNSET"
+	curStep, verr := e.workflow.Vertex(msg.stepRef)
+	if verr == nil {
+		curStepID = curStep.ID
+	} else {
+		l.Errorf("failed to resolve step in workflow; error %v", verr)
+	}
+	e.metrics.with(platform.KeyCapabilityID, curStepID).updateWorkflowStepDurationHistogram(ctx, int64(stepExecutionDuration))
+
 	var stepStatus string
 	switch {
 	case errors.Is(capabilities.ErrStopExecution, err):
@@ -850,7 +868,7 @@ func (e *Engine) interpolateEnvVars(config map[string]any, env exec.Env) (*value
 // registry (for capability-level configuration). It doesn't perform any caching of the config values, since
 // the two registries perform their own caching.
 func (e *Engine) configForStep(ctx context.Context, lggr logger.Logger, step *step) (*values.Map, error) {
-	secrets, err := e.secretsFetcher.SecretsFor(e.workflow.owner, e.workflow.name)
+	secrets, err := e.secretsFetcher.SecretsFor(e.workflow.owner, e.workflow.hexName)
 	if err != nil {
 		return nil, fmt.Errorf("failed to fetch secrets: %w", err)
 	}
@@ -894,16 +912,16 @@ func (e *Engine) configForStep(ctx context.Context, lggr logger.Logger, step *st
 
 // executeStep executes the referenced capability within a step and returns the result.
 func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRequest) (*values.Map, values.Value, error) {
-	step, err := e.workflow.Vertex(msg.stepRef)
+	curStep, err := e.workflow.Vertex(msg.stepRef)
 	if err != nil {
 		return nil, nil, err
 	}
 
 	var inputs any
-	if step.Inputs.OutputRef != "" {
-		inputs = step.Inputs.OutputRef
+	if curStep.Inputs.OutputRef != "" {
+		inputs = curStep.Inputs.OutputRef
 	} else {
-		inputs = step.Inputs.Mapping
+		inputs = curStep.Inputs.Mapping
 	}
 
 	i, err := exec.FindAndInterpolateAllKeys(inputs, msg.state)
@@ -916,7 +934,7 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe
 		return nil, nil, err
 	}
 
-	config, err := e.configForStep(ctx, lggr, step)
+	config, err := e.configForStep(ctx, lggr, curStep)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -942,7 +960,7 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe
 			WorkflowID:               msg.state.WorkflowID,
 			WorkflowExecutionID:      msg.state.ExecutionID,
 			WorkflowOwner:            e.workflow.owner,
-			WorkflowName:             e.workflow.name,
+			WorkflowName:             e.workflow.hexName,
 			WorkflowDonID:            e.localNode.WorkflowDON.ID,
 			WorkflowDonConfigVersion: e.localNode.WorkflowDON.ConfigVersion,
 			ReferenceID:              msg.stepRef,
@@ -952,9 +970,10 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe
 	stepCtx, cancel := context.WithTimeout(ctx, stepTimeoutDuration)
 	defer cancel()
 
-	e.metrics.incrementCapabilityInvocationCounter(stepCtx)
-	output, err := step.capability.Execute(stepCtx, tr)
+	e.metrics.with(platform.KeyCapabilityID, curStep.ID).incrementCapabilityInvocationCounter(ctx)
+	output, err := curStep.capability.Execute(stepCtx, tr)
 	if err != nil {
+		e.metrics.with(platform.KeyStepRef, msg.stepRef, platform.KeyCapabilityID, curStep.ID).incrementCapabilityFailureCounter(ctx)
 		return inputsMap, nil, err
 	}
@@ -967,7 +986,7 @@ func (e *Engine) deregisterTrigger(ctx context.Context, t *triggerCapability, tr
 			WorkflowID:               e.workflow.id,
 			WorkflowDonID:            e.localNode.WorkflowDON.ID,
 			WorkflowDonConfigVersion: e.localNode.WorkflowDON.ConfigVersion,
-			WorkflowName:             e.workflow.name,
+			WorkflowName:             e.workflow.hexName,
 			WorkflowOwner:            e.workflow.owner,
 			ReferenceID:              t.Ref,
 		},
@@ -1074,6 +1093,7 @@ func (e *Engine) isWorkflowFullyProcessed(ctx context.Context, state store.Workf
 	return workflowProcessed, store.StatusCompleted, nil
 }
 
+// heartbeat runs every defaultHeartbeatCadence by default
 func (e *Engine) heartbeat(ctx context.Context) {
 	defer e.wg.Done()
 
@@ -1087,6 +1107,7 @@ func (e *Engine) heartbeat(ctx context.Context) {
 			return
 		case <-ticker.C:
 			e.metrics.incrementEngineHeartbeatCounter(ctx)
+			e.metrics.updateTotalWorkflowsGauge(ctx, e.stepUpdatesChMap.len())
 			logCustMsg(ctx, e.cma, "engine heartbeat at: "+e.clock.Now().Format(time.RFC3339), e.logger)
 		}
 	}
@@ -1153,6 +1174,7 @@ func (e *Engine) Close() error {
 			return err
 		}
 		logCustMsg(ctx, e.cma, "workflow unregistered", e.logger)
+		e.metrics.incrementWorkflowUnregisteredCounter(ctx)
 		return nil
 	})
 }
@@ -1249,6 +1271,12 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
 	// - that the resulting graph is strongly connected (i.e. no disjointed subgraphs exist)
 	// - etc.
 
+	// spin up monitoring resources
+	em, err := initMonitoringResources()
+	if err != nil {
+		return nil, fmt.Errorf("could not initialize monitoring resources: %w", err)
+	}
+
 	cma := custmsg.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName)
 	workflow, err := Parse(cfg.Workflow)
 	if err != nil {
@@ -1258,12 +1286,12 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
 
 	workflow.id = cfg.WorkflowID
 	workflow.owner = cfg.WorkflowOwner
-	workflow.name = hex.EncodeToString([]byte(cfg.WorkflowName))
+	workflow.hexName = hex.EncodeToString([]byte(cfg.WorkflowName))
 
 	engine = &Engine{
 		cma:            cma,
 		logger:         cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID),
-		metrics:        workflowsMetricLabeler{metrics.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, workflow.name)},
+		metrics:        workflowsMetricLabeler{metrics.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName), *em},
 		registry:       cfg.Registry,
 		workflow:       workflow,
 		secretsFetcher: cfg.SecretsFetcher,
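One subtlety in the step-timing hunk above: int64(stepExecutionDuration) truncates toward zero, so any step faster than one second records as 0 and lands in the lowest histogram bucket. A self-contained illustration of the truncation:

package main

import (
	"fmt"
	"time"
)

func main() {
	fast := 750 * time.Millisecond
	slow := 90 * time.Second
	fmt.Println(int64(fast.Seconds())) // 0 — all sub-second steps are indistinguishable
	fmt.Println(int64(slow.Seconds())) // 90
}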
diff --git a/core/services/workflows/models.go b/core/services/workflows/models.go
index 0faf66d9883..e5d26a474f6 100644
--- a/core/services/workflows/models.go
+++ b/core/services/workflows/models.go
@@ -20,9 +20,9 @@ import (
 // treated differently due to their nature of being the starting
 // point of a workflow.
 type workflow struct {
-	id    string
-	owner string
-	name  string
+	id      string
+	owner   string
+	hexName string
 
 	graph.Graph[string, *step]
 
 	triggers []*triggerCapability
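The name → hexName rename documents an invariant that already existed: NewEngine stores the hex-encoded workflow name (see the hex.EncodeToString call above), and that encoded form is what flows into capability request metadata and SecretsFor. A small sketch with a made-up name:

package main

import (
	"encoding/hex"
	"fmt"
)

func main() {
	name := "my-workflow" // hypothetical workflow name
	fmt.Println(hex.EncodeToString([]byte(name))) // 6d792d776f726b666c6f77
}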
diff --git a/core/services/workflows/monitoring.go b/core/services/workflows/monitoring.go
index d498ff354c9..8457dadeb60 100644
--- a/core/services/workflows/monitoring.go
+++ b/core/services/workflows/monitoring.go
@@ -5,90 +5,255 @@ import (
 	"fmt"
 
 	"go.opentelemetry.io/otel/metric"
+	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
 
 	"github.com/smartcontractkit/chainlink-common/pkg/beholder"
 	"github.com/smartcontractkit/chainlink-common/pkg/metrics"
 
-	localMonitoring "github.com/smartcontractkit/chainlink/v2/core/monitoring"
+	monutils "github.com/smartcontractkit/chainlink/v2/core/monitoring"
 )
 
-var registerTriggerFailureCounter metric.Int64Counter
-var workflowsRunningGauge metric.Int64Gauge
-var capabilityInvocationCounter metric.Int64Counter
-var workflowExecutionLatencyGauge metric.Int64Gauge // ms
-var workflowStepErrorCounter metric.Int64Counter
-var engineHeartbeatCounter metric.Int64UpDownCounter
+// engineMetrics ("em") scopes these instruments to a single engine instance
+// instead of package-level globals, avoiding data races in testing
+type engineMetrics struct {
+	registerTriggerFailureCounter      metric.Int64Counter
+	triggerWorkflowStarterErrorCounter metric.Int64Counter
+	workflowsRunningGauge              metric.Int64Gauge
+	capabilityInvocationCounter        metric.Int64Counter
+	capabilityFailureCounter           metric.Int64Counter
+	workflowRegisteredCounter          metric.Int64Counter
+	workflowUnregisteredCounter        metric.Int64Counter
+	workflowExecutionLatencyGauge      metric.Int64Gauge // ms
+	workflowStepErrorCounter           metric.Int64Counter
+	workflowInitializationCounter      metric.Int64Counter
+	engineHeartbeatCounter             metric.Int64Counter
+	workflowCompletedDurationSeconds   metric.Int64Histogram
+	workflowEarlyExitDurationSeconds   metric.Int64Histogram
+	workflowErrorDurationSeconds       metric.Int64Histogram
+	workflowTimeoutDurationSeconds     metric.Int64Histogram
+	workflowStepDurationSeconds        metric.Int64Histogram
+}
+
+func initMonitoringResources() (em *engineMetrics, err error) {
+	em = &engineMetrics{}
+	em.registerTriggerFailureCounter, err = beholder.GetMeter().Int64Counter("platform_engine_registertrigger_failures")
+	if err != nil {
+		return nil, fmt.Errorf("failed to register trigger failure counter: %w", err)
+	}
+
+	em.triggerWorkflowStarterErrorCounter, err = beholder.GetMeter().Int64Counter("platform_engine_triggerworkflow_starter_errors")
+	if err != nil {
+		return nil, fmt.Errorf("failed to register trigger workflow starter error counter: %w", err)
+	}
+
+	em.workflowsRunningGauge, err = beholder.GetMeter().Int64Gauge("platform_engine_workflow_count")
+	if err != nil {
+		return nil, fmt.Errorf("failed to register workflows running gauge: %w", err)
+	}
+
+	em.capabilityInvocationCounter, err = beholder.GetMeter().Int64Counter("platform_engine_capabilities_count")
+	if err != nil {
+		return nil, fmt.Errorf("failed to register capability invocation counter: %w", err)
+	}
+
+	em.capabilityFailureCounter, err = beholder.GetMeter().Int64Counter("platform_engine_capabilities_failures")
+	if err != nil {
+		return nil, fmt.Errorf("failed to register capability failure counter: %w", err)
+	}
+
+	em.workflowRegisteredCounter, err = beholder.GetMeter().Int64Counter("platform_engine_workflow_registered_count")
+	if err != nil {
+		return nil, fmt.Errorf("failed to register workflow registered counter: %w", err)
+	}
+
+	em.workflowUnregisteredCounter, err = beholder.GetMeter().Int64Counter("platform_engine_workflow_unregistered_count")
+	if err != nil {
+		return nil, fmt.Errorf("failed to register workflow unregistered counter: %w", err)
+	}
+
+	em.workflowExecutionLatencyGauge, err = beholder.GetMeter().Int64Gauge(
+		"platform_engine_workflow_time",
+		metric.WithUnit("ms"))
+	if err != nil {
+		return nil, fmt.Errorf("failed to register workflow execution latency gauge: %w", err)
+	}
+
+	em.workflowInitializationCounter, err = beholder.GetMeter().Int64Counter("platform_engine_workflow_initializations")
+	if err != nil {
+		return nil, fmt.Errorf("failed to register workflow initialization counter: %w", err)
+	}
+
+	em.workflowStepErrorCounter, err = beholder.GetMeter().Int64Counter("platform_engine_workflow_errors")
+	if err != nil {
+		return nil, fmt.Errorf("failed to register workflow step error counter: %w", err)
+	}
 
-func initMonitoringResources() (err error) {
-	registerTriggerFailureCounter, err = beholder.GetMeter().Int64Counter("platform_engine_registertrigger_failures")
+	em.engineHeartbeatCounter, err = beholder.GetMeter().Int64Counter("platform_engine_heartbeat")
 	if err != nil {
-		return fmt.Errorf("failed to register trigger failure counter: %w", err)
+		return nil, fmt.Errorf("failed to register engine heartbeat counter: %w", err)
 	}
 
-	workflowsRunningGauge, err = beholder.GetMeter().Int64Gauge("platform_engine_workflow_count")
+	em.workflowCompletedDurationSeconds, err = beholder.GetMeter().Int64Histogram(
+		"platform_engine_workflow_completed_time_seconds",
+		metric.WithDescription("Distribution of completed execution latencies"),
+		metric.WithUnit("seconds"))
 	if err != nil {
-		return fmt.Errorf("failed to register workflows running gauge: %w", err)
+		return nil, fmt.Errorf("failed to register completed duration histogram: %w", err)
 	}
 
-	capabilityInvocationCounter, err = beholder.GetMeter().Int64Counter("platform_engine_capabilities_count")
+	em.workflowEarlyExitDurationSeconds, err = beholder.GetMeter().Int64Histogram(
+		"platform_engine_workflow_earlyexit_time_seconds",
+		metric.WithDescription("Distribution of earlyexit execution latencies"),
+		metric.WithUnit("seconds"))
 	if err != nil {
-		return fmt.Errorf("failed to register capability invocation counter: %w", err)
+		return nil, fmt.Errorf("failed to register early exit duration histogram: %w", err)
 	}
 
-	workflowExecutionLatencyGauge, err = beholder.GetMeter().Int64Gauge("platform_engine_workflow_time")
+	em.workflowErrorDurationSeconds, err = beholder.GetMeter().Int64Histogram(
+		"platform_engine_workflow_error_time_seconds",
+		metric.WithDescription("Distribution of error execution latencies"),
+		metric.WithUnit("seconds"))
 	if err != nil {
-		return fmt.Errorf("failed to register workflow execution latency gauge: %w", err)
+		return nil, fmt.Errorf("failed to register error duration histogram: %w", err)
 	}
 
-	workflowStepErrorCounter, err = beholder.GetMeter().Int64Counter("platform_engine_workflow_errors")
+	em.workflowTimeoutDurationSeconds, err = beholder.GetMeter().Int64Histogram(
+		"platform_engine_workflow_timeout_time_seconds",
+		metric.WithDescription("Distribution of timeout execution latencies"),
+		metric.WithUnit("seconds"))
 	if err != nil {
-		return fmt.Errorf("failed to register workflow step error counter: %w", err)
+		return nil, fmt.Errorf("failed to register timeout duration histogram: %w", err)
 	}
 
-	engineHeartbeatCounter, err = beholder.GetMeter().Int64UpDownCounter("platform_engine_heartbeat")
+	em.workflowStepDurationSeconds, err = beholder.GetMeter().Int64Histogram(
+		"platform_engine_workflow_step_time_seconds",
+		metric.WithDescription("Distribution of step execution times"),
+		metric.WithUnit("seconds"))
 	if err != nil {
-		return fmt.Errorf("failed to register engine heartbeat counter: %w", err)
+		return nil, fmt.Errorf("failed to register step execution time histogram: %w", err)
 	}
 
-	return nil
+	return em, nil
+}
+
+// Note: due to the OTEL specification, all histogram buckets
+// must be defined when the beholder client is created
+func MetricViews() []sdkmetric.View {
+	return []sdkmetric.View{
+		sdkmetric.NewView(
+			sdkmetric.Instrument{Name: "platform_engine_workflow_earlyexit_time_seconds"},
+			sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{
+				Boundaries: []float64{0, 1, 10, 100},
+			}},
+		),
+		sdkmetric.NewView(
+			sdkmetric.Instrument{Name: "platform_engine_workflow_completed_time_seconds"},
+			sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{
+				Boundaries: []float64{0, 100, 1000, 10_000, 50_000, 100_000, 500_000},
+			}},
+		),
+		sdkmetric.NewView(
+			sdkmetric.Instrument{Name: "platform_engine_workflow_error_time_seconds"},
+			sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{
+				Boundaries: []float64{0, 20, 60, 120, 240},
+			}},
+		),
+		sdkmetric.NewView(
+			sdkmetric.Instrument{Name: "platform_engine_workflow_step_time_seconds"},
+			sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{
+				Boundaries: []float64{0, 20, 60, 120, 240},
+			}},
+		),
+	}
 }
 
 // workflowsMetricLabeler wraps monitoring.MetricsLabeler to provide workflow specific utilities
 // for monitoring resources
 type workflowsMetricLabeler struct {
 	metrics.Labeler
+	em engineMetrics
 }
 
 func (c workflowsMetricLabeler) with(keyValues ...string) workflowsMetricLabeler {
-	return workflowsMetricLabeler{c.With(keyValues...)}
+	return workflowsMetricLabeler{c.With(keyValues...), c.em}
 }
 
 func (c workflowsMetricLabeler) incrementRegisterTriggerFailureCounter(ctx context.Context) {
-	otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
-	registerTriggerFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
+	c.em.registerTriggerFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
+}
+
+func (c workflowsMetricLabeler) incrementTriggerWorkflowStarterErrorCounter(ctx context.Context) {
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
+	c.em.triggerWorkflowStarterErrorCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
 }
 
 func (c workflowsMetricLabeler) incrementCapabilityInvocationCounter(ctx context.Context) {
-	otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
-	capabilityInvocationCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
+	c.em.capabilityInvocationCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
 }
 
 func (c workflowsMetricLabeler) updateWorkflowExecutionLatencyGauge(ctx context.Context, val int64) {
-	otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
-	workflowExecutionLatencyGauge.Record(ctx, val, metric.WithAttributes(otelLabels...))
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
+	c.em.workflowExecutionLatencyGauge.Record(ctx, val, metric.WithAttributes(otelLabels...))
 }
 
 func (c workflowsMetricLabeler) incrementTotalWorkflowStepErrorsCounter(ctx context.Context) {
-	otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
-	workflowStepErrorCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
+	c.em.workflowStepErrorCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
 }
 
 func (c workflowsMetricLabeler) updateTotalWorkflowsGauge(ctx context.Context, val int64) {
-	otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
-	workflowsRunningGauge.Record(ctx, val, metric.WithAttributes(otelLabels...))
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
+	c.em.workflowsRunningGauge.Record(ctx, val, metric.WithAttributes(otelLabels...))
 }
 
 func (c workflowsMetricLabeler) incrementEngineHeartbeatCounter(ctx context.Context) {
-	otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
-	engineHeartbeatCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
+	c.em.engineHeartbeatCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
+}
+
+func (c workflowsMetricLabeler) incrementCapabilityFailureCounter(ctx context.Context) {
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
+	c.em.capabilityFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
+}
+
+func (c workflowsMetricLabeler) incrementWorkflowRegisteredCounter(ctx context.Context) {
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
+	c.em.workflowRegisteredCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
+}
+
+func (c workflowsMetricLabeler) incrementWorkflowUnregisteredCounter(ctx context.Context) {
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
+	c.em.workflowUnregisteredCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
+}
+
+func (c workflowsMetricLabeler) incrementWorkflowInitializationCounter(ctx context.Context) {
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
+	c.em.workflowInitializationCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
+}
+
+func (c workflowsMetricLabeler) updateWorkflowCompletedDurationHistogram(ctx context.Context, duration int64) {
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
+	c.em.workflowCompletedDurationSeconds.Record(ctx, duration, metric.WithAttributes(otelLabels...))
+}
+
+func (c workflowsMetricLabeler) updateWorkflowEarlyExitDurationHistogram(ctx context.Context, duration int64) {
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
+	c.em.workflowEarlyExitDurationSeconds.Record(ctx, duration, metric.WithAttributes(otelLabels...))
+}
+
+func (c workflowsMetricLabeler) updateWorkflowErrorDurationHistogram(ctx context.Context, duration int64) {
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
+	c.em.workflowErrorDurationSeconds.Record(ctx, duration, metric.WithAttributes(otelLabels...))
+}
+
+func (c workflowsMetricLabeler) updateWorkflowTimeoutDurationHistogram(ctx context.Context, duration int64) {
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
+	c.em.workflowTimeoutDurationSeconds.Record(ctx, duration, metric.WithAttributes(otelLabels...))
+}
+
+func (c workflowsMetricLabeler) updateWorkflowStepDurationHistogram(ctx context.Context, duration int64) {
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
+	c.em.workflowStepDurationSeconds.Record(ctx, duration, metric.WithAttributes(otelLabels...))
}
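Because with copies the label set and carries the same engineMetrics value along, call sites can attach per-call labels (trigger ID, step ref, capability ID) without mutating the engine's shared labeler. A usage sketch (package-internal; the label values are made up, and an initialized engineMetrics is assumed):

package workflows

import (
	"context"

	"github.com/smartcontractkit/chainlink-common/pkg/metrics"
)

func exampleFork(ctx context.Context, em engineMetrics) {
	base := workflowsMetricLabeler{metrics.NewLabeler().With("workflowID", "wf-1"), em}
	// with() forks the labeler; base.Labels is left untouched
	scoped := base.with("capabilityID", "write_geth@1.0.0") // hypothetical capability ID
	scoped.incrementCapabilityInvocationCounter(ctx)
}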
diff --git a/core/services/workflows/monitoring_test.go b/core/services/workflows/monitoring_test.go
index 5910e583c95..5b7177e51dc 100644
--- a/core/services/workflows/monitoring_test.go
+++ b/core/services/workflows/monitoring_test.go
@@ -9,11 +9,12 @@ import (
 )
 
 func Test_InitMonitoringResources(t *testing.T) {
-	require.NoError(t, initMonitoringResources())
+	_, err := initMonitoringResources()
+	require.NoError(t, err)
 }
 
 func Test_WorkflowMetricsLabeler(t *testing.T) {
-	testWorkflowsMetricLabeler := workflowsMetricLabeler{metrics.NewLabeler()}
+	testWorkflowsMetricLabeler := workflowsMetricLabeler{metrics.NewLabeler(), engineMetrics{}}
 	testWorkflowsMetricLabeler2 := testWorkflowsMetricLabeler.with("foo", "baz")
 	require.EqualValues(t, testWorkflowsMetricLabeler2.Labels["foo"], "baz")
 }
diff --git a/go.mod b/go.mod
index e80ee7a7ba2..f532f3d9a6f 100644
--- a/go.mod
+++ b/go.mod
@@ -104,6 +104,7 @@ require (
 	go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.49.0
 	go.opentelemetry.io/otel v1.31.0
 	go.opentelemetry.io/otel/metric v1.31.0
+	go.opentelemetry.io/otel/sdk/metric v1.31.0
 	go.opentelemetry.io/otel/trace v1.31.0
 	go.uber.org/multierr v1.11.0
 	go.uber.org/zap v1.27.0
@@ -361,7 +362,6 @@ require (
 	go.opentelemetry.io/otel/log v0.6.0 // indirect
 	go.opentelemetry.io/otel/sdk v1.31.0 // indirect
 	go.opentelemetry.io/otel/sdk/log v0.6.0 // indirect
-	go.opentelemetry.io/otel/sdk/metric v1.31.0 // indirect
 	go.opentelemetry.io/proto/otlp v1.3.1 // indirect
 	go.uber.org/ratelimit v0.3.0 // indirect
 	golang.org/x/arch v0.11.0 // indirect
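Since the bucket boundaries live in MetricViews() rather than on the instruments themselves, a regression test can confirm the views actually take effect. A hedged sketch, not part of this diff, using the SDK's manual reader (assumes the go.opentelemetry.io/otel/sdk/metric/metricdata package):

package workflows

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func Test_MetricViewsApplyBuckets(t *testing.T) {
	reader := sdkmetric.NewManualReader()
	mp := sdkmetric.NewMeterProvider(
		sdkmetric.WithReader(reader),
		sdkmetric.WithView(MetricViews()...),
	)

	// record through a provider configured with the views under test
	hist, err := mp.Meter("test").Int64Histogram("platform_engine_workflow_error_time_seconds")
	require.NoError(t, err)
	hist.Record(context.Background(), 30)

	var rm metricdata.ResourceMetrics
	require.NoError(t, reader.Collect(context.Background(), &rm))

	// the collected histogram should carry the explicit boundaries from the view
	data, ok := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[int64])
	require.True(t, ok)
	require.Equal(t, []float64{0, 20, 60, 120, 240}, data.DataPoints[0].Bounds)
}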