Skip to content

Commit

Permalink
Cache error regex matching for metrics (#3351)
Browse files Browse the repository at this point in the history
* Cache error regex matching

* Fix nil reference

* Rename scheduled to leased

* Rename scheduled to leased

* Cleanup
  • Loading branch information
severinson authored Jan 31, 2024
1 parent dde8a57 commit 79f9a92
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 9 deletions.
1 change: 1 addition & 0 deletions config/scheduler/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ schedulerMetrics:
- "memory"
- "ephemeral-storage"
- "nvidia.com/gpu"
matchedRegexIndexByErrorMessageCacheSize: 100
resetInterval: "1h"
pulsar:
URL: "pulsar://pulsar:6650"
Expand Down
3 changes: 3 additions & 0 deletions internal/scheduler/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type MetricsConfig struct {
// Controls the cycle time metrics.
// TODO(albin): Not used yet.
CycleTimeConfig PrometheusSummaryConfig
// The index of the first regex matching each error message is cached in an LRU cache.
// This setting controls the cache size; if 0, no cache is created.
MatchedRegexIndexByErrorMessageCacheSize uint64
// Reset metrics this often. Resetting periodically ensures inactive time series are garbage-collected.
ResetInterval time.Duration
}
Expand Down
71 changes: 63 additions & 8 deletions internal/scheduler/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/google/uuid"
lru "github.com/hashicorp/golang-lru"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

Expand All @@ -29,7 +30,7 @@ const (

queued = "queued"
cancelled = "cancelled"
scheduled = "scheduled"
leased = "leased"
preempted = "preempted"
failed = "failed"
succeeded = "succeeded"
Expand All @@ -51,6 +52,10 @@ type Metrics struct {
// Pre-compiled regexes for error categorisation.
errorRegexes []*regexp.Regexp

// Map from error message to the index of the first matching regex.
// Messages that match no regex map to -1.
matchedRegexIndexByErrorMessage *lru.Cache

// Job metrics.
transitions *prometheus.CounterVec
}
Expand All @@ -64,6 +69,16 @@ func New(config configuration.MetricsConfig) (*Metrics, error) {
errorRegexes[i] = r
}
}

var matchedRegexIndexByError *lru.Cache
if config.MatchedRegexIndexByErrorMessageCacheSize > 0 {
var err error
matchedRegexIndexByError, err = lru.New(int(config.MatchedRegexIndexByErrorMessageCacheSize))
if err != nil {
return nil, errors.WithStack(err)
}
}

return &Metrics{
config: config,

Expand All @@ -72,6 +87,8 @@ func New(config configuration.MetricsConfig) (*Metrics, error) {

buffer: make([]string, 0, 8),

matchedRegexIndexByErrorMessage: matchedRegexIndexByError,

transitions: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Expand Down Expand Up @@ -219,12 +236,12 @@ func (m *Metrics) UpdateSucceeded(job *jobdb.Job) error {
return nil
}

func (m *Metrics) UpdateScheduled(jctx *schedulercontext.JobSchedulingContext) error {
func (m *Metrics) UpdateLeased(jctx *schedulercontext.JobSchedulingContext) error {
labels := m.buffer[0:0]
job := jctx.Job.(*jobdb.Job)
labels = append(labels, scheduled)
labels = append(labels, "") // No category for scheduled.
labels = append(labels, "") // No subCategory for scheduled.
labels = append(labels, leased)
labels = append(labels, "") // No category for leased.
labels = append(labels, "") // No subCategory for leased.
labels = appendLabelsFromJobSchedulingContext(labels, jctx)
if err := m.updateCounterVecFromJob(m.transitions, labels, job); err != nil {
return err
Expand All @@ -250,14 +267,52 @@ func (m *Metrics) failedCategoryAndSubCategoryFromJob(ctx *armadacontext.Context
if run == nil {
return
}

category, message := errorTypeAndMessageFromError(ctx, jobRunErrorsByRunId[run.Id()])
i, ok := m.regexIndexFromErrorMessage(message)
if ok {
subCategory = m.config.TrackedErrorRegexes[i]
}

return
}

// regexIndexFromErrorMessage returns the index of the first error regex that
// matches message, together with a flag reporting whether any regex matched.
//
// The cache is consulted first. On a cache miss the regexes are evaluated and
// the outcome is stored in the cache (when one is configured), using -1 to
// record that no regex matches. When nothing matches, the result is (-1, false).
func (m *Metrics) regexIndexFromErrorMessage(message string) (int, bool) {
	index, found := m.cachedRegexIndexFromErrorMessage(message)
	if found {
		// A cached -1 records that no regex matches this message.
		return index, index != -1
	}

	index, found = m.indexOfFirstMatchingRegexFromErrorMessage(message)
	if !found {
		// Use -1 to indicate that no regex matches.
		index = -1
	}
	if cache := m.matchedRegexIndexByErrorMessage; cache != nil {
		cache.Add(message, index)
	}
	return index, found
}

// cachedRegexIndexFromErrorMessage looks up message in the regex-index cache.
// It returns the stored index and true on a cache hit, and (0, false) when
// there is no cache or the message is not in it. The stored index is returned
// as-is, so it may be the -1 sentinel written by regexIndexFromErrorMessage.
func (m *Metrics) cachedRegexIndexFromErrorMessage(message string) (int, bool) {
	cache := m.matchedRegexIndexByErrorMessage
	if cache == nil {
		return 0, false
	}
	if value, hit := cache.Get(message); hit {
		return value.(int), true
	}
	return 0, false
}

// indexOfFirstMatchingRegexFromErrorMessage walks m.errorRegexes in order and
// returns the index of the first regex whose MatchString(message) is true,
// along with true. If no regex matches, it returns (0, false).
//
// NOTE(review): the `subCategory = ...` assignment and the bare `return`
// statements below look like removed lines left over from the diff view
// rather than part of the new function body — confirm against the real file.
func (m *Metrics) indexOfFirstMatchingRegexFromErrorMessage(message string) (int, bool) {
for i, r := range m.errorRegexes {
if r.MatchString(message) {
subCategory = m.config.TrackedErrorRegexes[i]
return
return i, true
}
}
return
return 0, false
}

func appendLabelsFromJob(labels []string, job *jobdb.Job) []string {
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (s *Scheduler) updateMetricsFromSchedulerResult(ctx *armadacontext.Context,
return nil
}
for _, jctx := range overallSchedulerResult.ScheduledJobs {
if err := s.schedulerMetrics.UpdateScheduled(jctx); err != nil {
if err := s.schedulerMetrics.UpdateLeased(jctx); err != nil {
return err
}
}
Expand Down

0 comments on commit 79f9a92

Please sign in to comment.