From 79f9a92b9efab15531181e3454719baab562f498 Mon Sep 17 00:00:00 2001 From: Albin Severinson Date: Wed, 31 Jan 2024 12:39:02 +0000 Subject: [PATCH] Cache error regex matching for metrics (#3351) * Cache error regex matching * Fix nil reference * Rename scheduled to leased * Rename scheduled to leased * Cleanup --- config/scheduler/config.yaml | 1 + .../scheduler/configuration/configuration.go | 3 + internal/scheduler/metrics/metrics.go | 71 ++++++++++++++++--- internal/scheduler/scheduler.go | 2 +- 4 files changed, 68 insertions(+), 9 deletions(-) diff --git a/config/scheduler/config.yaml b/config/scheduler/config.yaml index 068bf583d01..0c500cfa52d 100644 --- a/config/scheduler/config.yaml +++ b/config/scheduler/config.yaml @@ -24,6 +24,7 @@ schedulerMetrics: - "memory" - "ephemeral-storage" - "nvidia.com/gpu" + matchedRegexIndexByErrorMessageCacheSize: 100 resetInterval: "1h" pulsar: URL: "pulsar://pulsar:6650" diff --git a/internal/scheduler/configuration/configuration.go b/internal/scheduler/configuration/configuration.go index 4f0f10e84c3..c57bb399673 100644 --- a/internal/scheduler/configuration/configuration.go +++ b/internal/scheduler/configuration/configuration.go @@ -76,6 +76,9 @@ type MetricsConfig struct { // Controls the cycle time metrics. // TODO(albin): Not used yet. CycleTimeConfig PrometheusSummaryConfig + // The first matching regex of each error message is cached in an LRU cache. + // This setting controls the cache size. + MatchedRegexIndexByErrorMessageCacheSize uint64 // Reset metrics this often. Resetting periodically ensures inactive time series are garbage-collected. ResetInterval time.Duration } diff --git a/internal/scheduler/metrics/metrics.go b/internal/scheduler/metrics/metrics.go index dcb55538e20..2ebf1752a1f 100644 --- a/internal/scheduler/metrics/metrics.go +++ b/internal/scheduler/metrics/metrics.go @@ -5,6 +5,7 @@ import ( "time" "github.com/google/uuid" + lru "github.com/hashicorp/golang-lru" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -29,7 +30,7 @@ const ( queued = "queued" cancelled = "cancelled" - scheduled = "scheduled" + leased = "leased" preempted = "preempted" failed = "failed" succeeded = "succeeded" @@ -51,6 +52,10 @@ type Metrics struct { // Pre-compiled regexes for error categorisation. errorRegexes []*regexp.Regexp + // Map from error message to the index of the first matching regex. + // Messages that match no regex map to -1. + matchedRegexIndexByErrorMessage *lru.Cache + // Job metrics. transitions *prometheus.CounterVec } @@ -64,6 +69,16 @@ func New(config configuration.MetricsConfig) (*Metrics, error) { errorRegexes[i] = r } } + + var matchedRegexIndexByError *lru.Cache + if config.MatchedRegexIndexByErrorMessageCacheSize > 0 { + var err error + matchedRegexIndexByError, err = lru.New(int(config.MatchedRegexIndexByErrorMessageCacheSize)) + if err != nil { + return nil, errors.WithStack(err) + } + } + return &Metrics{ config: config, @@ -72,6 +87,8 @@ func New(config configuration.MetricsConfig) (*Metrics, error) { buffer: make([]string, 0, 8), + matchedRegexIndexByErrorMessage: matchedRegexIndexByError, + transitions: prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: namespace, @@ -219,12 +236,12 @@ func (m *Metrics) UpdateSucceeded(job *jobdb.Job) error { return nil } -func (m *Metrics) UpdateScheduled(jctx *schedulercontext.JobSchedulingContext) error { +func (m *Metrics) UpdateLeased(jctx *schedulercontext.JobSchedulingContext) error { labels := m.buffer[0:0] job := jctx.Job.(*jobdb.Job) - labels = append(labels, scheduled) - labels = append(labels, "") // No category for scheduled. - labels = append(labels, "") // No subCategory for scheduled. + labels = append(labels, leased) + labels = append(labels, "") // No category for leased. + labels = append(labels, "") // No subCategory for leased. labels = appendLabelsFromJobSchedulingContext(labels, jctx) if err := m.updateCounterVecFromJob(m.transitions, labels, job); err != nil { return err @@ -250,14 +267,52 @@ func (m *Metrics) failedCategoryAndSubCategoryFromJob(ctx *armadacontext.Context if run == nil { return } + category, message := errorTypeAndMessageFromError(ctx, jobRunErrorsByRunId[run.Id()]) + i, ok := m.regexIndexFromErrorMessage(message) + if ok { + subCategory = m.config.TrackedErrorRegexes[i] + } + + return +} + +func (m *Metrics) regexIndexFromErrorMessage(message string) (int, bool) { + i, ok := m.cachedRegexIndexFromErrorMessage(message) + if !ok { + i, ok = m.indexOfFirstMatchingRegexFromErrorMessage(message) + if !ok { + // Use -1 to indicate that no regex matches. + i = -1 + } + if m.matchedRegexIndexByErrorMessage != nil { + m.matchedRegexIndexByErrorMessage.Add(message, i) + } + } + if i == -1 { + ok = false + } + return i, ok +} + +func (m *Metrics) cachedRegexIndexFromErrorMessage(message string) (int, bool) { + if m.matchedRegexIndexByErrorMessage == nil { + return 0, false + } + i, ok := m.matchedRegexIndexByErrorMessage.Get(message) + if !ok { + return 0, false + } + return i.(int), true +} + +func (m *Metrics) indexOfFirstMatchingRegexFromErrorMessage(message string) (int, bool) { for i, r := range m.errorRegexes { if r.MatchString(message) { - subCategory = m.config.TrackedErrorRegexes[i] - return + return i, true } } - return + return 0, false } func appendLabelsFromJob(labels []string, job *jobdb.Job) []string { diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index f0db5c08c6e..0c1ace6a98d 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -344,7 +344,7 @@ func (s *Scheduler) updateMetricsFromSchedulerResult(ctx *armadacontext.Context, return nil } for _, jctx := range overallSchedulerResult.ScheduledJobs { - if err := s.schedulerMetrics.UpdateScheduled(jctx); err != nil { + if err := s.schedulerMetrics.UpdateLeased(jctx); err != nil { return err } }