diff --git a/internal/scheduler/metrics/cycle_metrics.go b/internal/scheduler/metrics/cycle_metrics.go
index 296c491826b..2970359f3e2 100644
--- a/internal/scheduler/metrics/cycle_metrics.go
+++ b/internal/scheduler/metrics/cycle_metrics.go
@@ -14,6 +14,8 @@ var (
 )
 
 type cycleMetrics struct {
+	leaderMetricsEnabled bool
+
 	scheduledJobs           *prometheus.CounterVec
 	premptedJobs            *prometheus.CounterVec
 	consideredJobs          *prometheus.GaugeVec
@@ -25,97 +27,137 @@ type cycleMetrics struct {
 	cappedDemand            *prometheus.GaugeVec
 	scheduleCycleTime       prometheus.Histogram
 	reconciliationCycleTime prometheus.Histogram
+	allResettableMetrics    []resettableMetric
 }
 
 func newCycleMetrics() *cycleMetrics {
+	scheduledJobs := prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: prefix + "scheduled_jobs",
+			Help: "Number of events scheduled",
+		},
+		queueAndPriorityClassLabels,
+	)
+
+	premptedJobs := prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: prefix + "preempted_jobs",
+			Help: "Number of jobs preempted",
+		},
+		queueAndPriorityClassLabels,
+	)
+
+	consideredJobs := prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: prefix + "considered_jobs",
+			Help: "Number of jobs considered",
+		},
+		poolAndQueueLabels,
+	)
+
+	fairShare := prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: prefix + "fair_share",
+			Help: "Fair share of each queue",
+		},
+		poolAndQueueLabels,
+	)
+
+	adjustedFairShare := prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: prefix + "adjusted_fair_share",
+			Help: "Adjusted Fair share of each queue",
+		},
+		poolAndQueueLabels,
+	)
+
+	actualShare := prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: prefix + "actual_share",
+			Help: "Actual Fair share of each queue",
+		},
+		poolAndQueueLabels,
+	)
+
+	demand := prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: prefix + "demand",
+			Help: "Demand of each queue",
+		},
+		poolAndQueueLabels,
+	)
+
+	cappedDemand := prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: prefix + "capped_demand",
+			Help: "Capped Demand of each queue and pool. This differs from demand in that it limits demand by scheduling constraints",
+		},
+		poolAndQueueLabels,
+	)
+
+	fairnessError := prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: prefix + "fairness_error",
+			Help: "Cumulative delta between adjusted fair share and actual share for all users who are below their fair share",
+		},
+		[]string{poolLabel},
+	)
+
+	scheduleCycleTime := prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Name:    prefix + "schedule_cycle_times",
+			Help:    "Cycle time when in a scheduling round.",
+			Buckets: prometheus.ExponentialBuckets(10.0, 1.1, 110),
+		},
+	)
+
+	reconciliationCycleTime := prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Name:    prefix + "reconciliation_cycle_times",
+			Help:    "Cycle time when in a scheduling round.",
+			Buckets: prometheus.ExponentialBuckets(10.0, 1.1, 110),
+		},
+	)
+
 	return &cycleMetrics{
-		scheduledJobs: prometheus.NewCounterVec(
-			prometheus.CounterOpts{
-				Name: prefix + "scheduled_jobs",
-				Help: "Number of events scheduled",
-			},
-			queueAndPriorityClassLabels,
-		),
-
-		premptedJobs: prometheus.NewCounterVec(
-			prometheus.CounterOpts{
-				Name: prefix + "preempted_jobs",
-				Help: "Number of jobs preempted",
-			},
-			queueAndPriorityClassLabels,
-		),
-
-		consideredJobs: prometheus.NewGaugeVec(
-			prometheus.GaugeOpts{
-				Name: prefix + "considered_jobs",
-				Help: "Number of jobs considered",
-			},
-			poolAndQueueLabels,
-		),
-
-		fairShare: prometheus.NewGaugeVec(
-			prometheus.GaugeOpts{
-				Name: prefix + "fair_share",
-				Help: "Fair share of each queue",
-			},
-			poolAndQueueLabels,
-		),
-
-		adjustedFairShare: prometheus.NewGaugeVec(
-			prometheus.GaugeOpts{
-				Name: prefix + "adjusted_fair_share",
-				Help: "Adjusted Fair share of each queue",
-			},
-			poolAndQueueLabels,
-		),
-
-		actualShare: prometheus.NewGaugeVec(
-			prometheus.GaugeOpts{
-				Name: prefix + "actual_share",
-				Help: "Actual Fair share of each queue",
-			},
-			poolAndQueueLabels,
-		),
-
-		demand: prometheus.NewGaugeVec(
-			prometheus.GaugeOpts{
-				Name: prefix + "demand",
-				Help: "Demand of each queue",
-			},
-			poolAndQueueLabels,
-		),
-
-		cappedDemand: prometheus.NewGaugeVec(
-			prometheus.GaugeOpts{
-				Name: prefix + "capped_demand",
-				Help: "Capped Demand of each queue and pool. This differs from demand in that it limits demand by scheduling constraints",
-			},
-			poolAndQueueLabels,
-		),
-
-		fairnessError: prometheus.NewGaugeVec(
-			prometheus.GaugeOpts{
-				Name: prefix + "fairness_error",
-				Help: "Cumulative delta between adjusted fair share and actual share for all users who are below their fair share",
-			},
-			[]string{poolLabel},
-		),
-
-		scheduleCycleTime: prometheus.NewHistogram(
-			prometheus.HistogramOpts{
-				Name:    prefix + "schedule_cycle_times",
-				Help:    "Cycle time when in a scheduling round.",
-				Buckets: prometheus.ExponentialBuckets(10.0, 1.1, 110),
-			},
-		),
-
-		reconciliationCycleTime: prometheus.NewHistogram(
-			prometheus.HistogramOpts{
-				Name:    prefix + "reconciliation_cycle_times",
-				Help:    "Cycle time when in a scheduling round.",
-				Buckets: prometheus.ExponentialBuckets(10.0, 1.1, 110),
-			},
-		),
+		leaderMetricsEnabled: true,
+		scheduledJobs:        scheduledJobs,
+		premptedJobs:         premptedJobs,
+		consideredJobs:       consideredJobs,
+		fairShare:            fairShare,
+		adjustedFairShare:    adjustedFairShare,
+		actualShare:          actualShare,
+		demand:               demand,
+		cappedDemand:         cappedDemand,
+		fairnessError:        fairnessError,
+		scheduleCycleTime:    scheduleCycleTime,
+		allResettableMetrics: []resettableMetric{
+			scheduledJobs,
+			premptedJobs,
+			consideredJobs,
+			fairShare,
+			adjustedFairShare,
+			actualShare,
+			demand,
+			cappedDemand,
+			fairnessError,
+		},
+		reconciliationCycleTime: reconciliationCycleTime,
+	}
+}
+
+func (m *cycleMetrics) enableLeaderMetrics() {
+	m.leaderMetricsEnabled = true
+}
+
+func (m *cycleMetrics) disableLeaderMetrics() {
+	m.resetLeaderMetrics()
+	m.leaderMetricsEnabled = false
+}
+
+func (m *cycleMetrics) resetLeaderMetrics() {
+	for _, metric := range m.allResettableMetrics {
+		metric.Reset()
 	}
 }
 
@@ -157,29 +199,35 @@ func (m *cycleMetrics) ReportSchedulerResult(result schedulerresult.SchedulerRes
 }
 
 func (m *cycleMetrics) describe(ch chan<- *prometheus.Desc) {
-	m.scheduledJobs.Describe(ch)
-	m.premptedJobs.Describe(ch)
-	m.consideredJobs.Describe(ch)
-	m.fairShare.Describe(ch)
-	m.adjustedFairShare.Describe(ch)
-	m.actualShare.Describe(ch)
-	m.fairnessError.Describe(ch)
-	m.demand.Describe(ch)
-	m.cappedDemand.Describe(ch)
-	m.scheduleCycleTime.Describe(ch)
+	if m.leaderMetricsEnabled {
+		m.scheduledJobs.Describe(ch)
+		m.premptedJobs.Describe(ch)
+		m.consideredJobs.Describe(ch)
+		m.fairShare.Describe(ch)
+		m.adjustedFairShare.Describe(ch)
+		m.actualShare.Describe(ch)
+		m.fairnessError.Describe(ch)
+		m.demand.Describe(ch)
+		m.cappedDemand.Describe(ch)
+		m.scheduleCycleTime.Describe(ch)
+	}
+
 	m.reconciliationCycleTime.Describe(ch)
 }
 
 func (m *cycleMetrics) collect(ch chan<- prometheus.Metric) {
-	m.scheduledJobs.Collect(ch)
-	m.premptedJobs.Collect(ch)
-	m.consideredJobs.Collect(ch)
-	m.fairShare.Collect(ch)
-	m.adjustedFairShare.Collect(ch)
-	m.actualShare.Collect(ch)
-	m.fairnessError.Collect(ch)
-	m.demand.Collect(ch)
-	m.cappedDemand.Collect(ch)
-	m.scheduleCycleTime.Collect(ch)
+	if m.leaderMetricsEnabled {
+		m.scheduledJobs.Collect(ch)
+		m.premptedJobs.Collect(ch)
+		m.consideredJobs.Collect(ch)
+		m.fairShare.Collect(ch)
+		m.adjustedFairShare.Collect(ch)
+		m.actualShare.Collect(ch)
+		m.fairnessError.Collect(ch)
+		m.demand.Collect(ch)
+		m.cappedDemand.Collect(ch)
+		m.scheduleCycleTime.Collect(ch)
+	}
+
 	m.reconciliationCycleTime.Collect(ch)
 }
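Context for the restructuring above: naming each vector before the return lets both *prometheus.CounterVec and *prometheus.GaugeVec sit behind one small interface (the resettableMetric interface that this change moves into metrics.go below) and be cleared in a single loop. A minimal, self-contained sketch of that pattern; the metric names and the main function here are illustrative, not part of this diff:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// resettable mirrors the resettableMetric interface this diff adds in
// metrics.go: anything that is a Collector and can drop all its children.
type resettable interface {
	prometheus.Collector
	Reset()
}

func main() {
	scheduled := prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "demo_scheduled_jobs", Help: "demo counter"},
		[]string{"queue"},
	)
	share := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{Name: "demo_actual_share", Help: "demo gauge"},
		[]string{"pool", "queue"},
	)

	// Both vector types implement Reset, so one slice covers them all,
	// which is what allResettableMetrics does for the cycle metrics.
	all := []resettable{scheduled, share}

	scheduled.WithLabelValues("queue1").Inc()
	share.WithLabelValues("pool1", "queue1").Set(0.5)

	for _, r := range all {
		r.Reset() // drops every labelled child, as resetLeaderMetrics does
	}
	fmt.Println("reset", len(all), "metric vectors")
}
```

The histograms are deliberately absent from the slice: plain prometheus.Histogram has no Reset method, which is consistent with scheduleCycleTime and reconciliationCycleTime being left out of allResettableMetrics above.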
diff --git a/internal/scheduler/metrics/cycle_metrics_test.go b/internal/scheduler/metrics/cycle_metrics_test.go
index 2f86dcb9c91..c0bc686a8d6 100644
--- a/internal/scheduler/metrics/cycle_metrics_test.go
+++ b/internal/scheduler/metrics/cycle_metrics_test.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"testing"
 
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -78,6 +79,80 @@ func TestReportStateTransitions(t *testing.T) {
 	assert.InDelta(t, 0.05, fairnessError, epsilon, "fairnessError")
 }
 
+func TestResetLeaderMetrics(t *testing.T) {
+	m := newCycleMetrics()
+
+	poolQueueLabelValues := []string{"pool1", "queue1"}
+	queuePriorityClassLabelValues := []string{"pool1", "priorityClass1"}
+
+	testResetCounter := func(vec *prometheus.CounterVec, labelValues []string) {
+		vec.WithLabelValues(labelValues...).Inc()
+		counterVal := testutil.ToFloat64(vec.WithLabelValues(labelValues...))
+		assert.Equal(t, 1.0, counterVal)
+		m.resetLeaderMetrics()
+		counterVal = testutil.ToFloat64(vec.WithLabelValues(labelValues...))
+		assert.Equal(t, 0.0, counterVal)
+	}
+	testResetGauge := func(vec *prometheus.GaugeVec, labelValues []string) {
+		vec.WithLabelValues(labelValues...).Inc()
+		counterVal := testutil.ToFloat64(vec.WithLabelValues(labelValues...))
+		assert.Equal(t, 1.0, counterVal)
+		m.resetLeaderMetrics()
+		counterVal = testutil.ToFloat64(vec.WithLabelValues(labelValues...))
+		assert.Equal(t, 0.0, counterVal)
+	}
+
+	testResetCounter(m.scheduledJobs, queuePriorityClassLabelValues)
+	testResetCounter(m.premptedJobs, queuePriorityClassLabelValues)
+	testResetGauge(m.consideredJobs, poolQueueLabelValues)
+	testResetGauge(m.fairShare, poolQueueLabelValues)
+	testResetGauge(m.adjustedFairShare, poolQueueLabelValues)
+	testResetGauge(m.actualShare, poolQueueLabelValues)
+	testResetGauge(m.fairnessError, []string{"pool1"})
+	testResetGauge(m.demand, poolQueueLabelValues)
+	testResetGauge(m.cappedDemand, poolQueueLabelValues)
+}
+
+func TestDisableLeaderMetrics(t *testing.T) {
+	m := newCycleMetrics()
+
+	poolQueueLabelValues := []string{"pool1", "queue1"}
+	queuePriorityClassLabelValues := []string{"pool1", "priorityClass1"}
+
+	collect := func(m *cycleMetrics) []prometheus.Metric {
+		m.scheduledJobs.WithLabelValues(queuePriorityClassLabelValues...).Inc()
+		m.premptedJobs.WithLabelValues(queuePriorityClassLabelValues...).Inc()
+		m.consideredJobs.WithLabelValues(poolQueueLabelValues...).Inc()
+		m.fairShare.WithLabelValues(poolQueueLabelValues...).Inc()
+		m.adjustedFairShare.WithLabelValues(poolQueueLabelValues...).Inc()
+		m.actualShare.WithLabelValues(poolQueueLabelValues...).Inc()
+		m.fairnessError.WithLabelValues("pool1").Inc()
+		m.demand.WithLabelValues(poolQueueLabelValues...).Inc()
+		m.cappedDemand.WithLabelValues(poolQueueLabelValues...).Inc()
+		m.scheduleCycleTime.Observe(float64(1000))
+		m.reconciliationCycleTime.Observe(float64(1000))
+
+		ch := make(chan prometheus.Metric, 1000)
+		m.collect(ch)
+		collected := make([]prometheus.Metric, 0, len(ch))
+		for len(ch) > 0 {
+			collected = append(collected, <-ch)
+		}
+		return collected
+	}
+
+	// Enabled
+	assert.NotZero(t, len(collect(m)))
+
+	// Disabled
+	m.disableLeaderMetrics()
+	assert.Equal(t, 1, len(collect(m)))
+
+	// Enabled
+	m.enableLeaderMetrics()
+	assert.NotZero(t, len(collect(m)))
+}
+
 func cpu(n int) schedulerobjects.ResourceList {
 	return schedulerobjects.ResourceList{
 		Resources: map[string]resource.Quantity{"cpu": resource.MustParse(fmt.Sprintf("%d", n))},
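The tests above exercise the unexported collect method by draining a hand-made buffered channel. If that grows awkward, one alternative is a small adapter that exposes cycleMetrics as a prometheus.Collector so prometheus/testutil can do the counting. The adapter and helper below are a hypothetical sketch, not something this diff adds:

```go
package metrics

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// cycleMetricsCollector is a hypothetical test helper that exposes the
// unexported describe/collect methods as a prometheus.Collector.
type cycleMetricsCollector struct{ m *cycleMetrics }

func (c cycleMetricsCollector) Describe(ch chan<- *prometheus.Desc) { c.m.describe(ch) }
func (c cycleMetricsCollector) Collect(ch chan<- prometheus.Metric) { c.m.collect(ch) }

// countCycleMetrics reports how many series the cycle metrics currently emit.
func countCycleMetrics(m *cycleMetrics) int {
	return testutil.CollectAndCount(cycleMetricsCollector{m})
}
```

With that in place, the enabled/disabled assertions reduce to comparing counts returned by testutil.CollectAndCount before and after disableLeaderMetrics.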
diff --git a/internal/scheduler/metrics/metrics.go b/internal/scheduler/metrics/metrics.go
index 655c3449d9d..9614e365fd9 100644
--- a/internal/scheduler/metrics/metrics.go
+++ b/internal/scheduler/metrics/metrics.go
@@ -13,6 +13,12 @@ import (
 type Metrics struct {
 	*cycleMetrics
 	*jobStateMetrics
+	leaderMetricsEnabled bool
+}
+
+type resettableMetric interface {
+	prometheus.Collector
+	Reset()
 }
 
 func New(errorRegexes []string, trackedResourceNames []v1.ResourceName, jobStateMetricsResetInterval time.Duration) (*Metrics, error) {
@@ -30,20 +36,24 @@ func New(errorRegexes []string, trackedResourceNames []v1.ResourceName, jobState
 	}, nil
 }
 
-// DisableJobStateMetrics stops the jobStateMetrics from being produced. This is necessary because we only produce
-// these metrics when we are leader in order to avoid double counting
-func (m *Metrics) DisableJobStateMetrics() {
+// DisableLeaderMetrics stops leader metrics from being produced. This is necessary because we only produce
+// some metrics when we are the leader in order to avoid double counting
+func (m *Metrics) DisableLeaderMetrics() {
 	m.jobStateMetrics.disable()
+	m.cycleMetrics.disableLeaderMetrics()
+	m.leaderMetricsEnabled = false
 }
 
-// EnableJobStateMetrics starts the jobStateMetrics
-func (m *Metrics) EnableJobStateMetrics() {
+// EnableLeaderMetrics starts the job state and cycle metrics produced when the scheduler is the leader
+func (m *Metrics) EnableLeaderMetrics() {
 	m.jobStateMetrics.enable()
+	m.cycleMetrics.enableLeaderMetrics()
+	m.leaderMetricsEnabled = true
 }
 
-// JobStateMetricsEnabled returns true if job state metrics are enabled
-func (m *Metrics) JobStateMetricsEnabled() bool {
-	return m.jobStateMetrics.isEnabled()
+// LeaderMetricsEnabled returns true if leader metrics are enabled
+func (m *Metrics) LeaderMetricsEnabled() bool {
+	return m.leaderMetricsEnabled
 }
 
 // Describe is necessary to implement the prometheus.Collector interface
diff --git a/internal/scheduler/metrics/state_metrics.go b/internal/scheduler/metrics/state_metrics.go
index d4e3fa6e1c2..8d4b9ae0817 100644
--- a/internal/scheduler/metrics/state_metrics.go
+++ b/internal/scheduler/metrics/state_metrics.go
@@ -11,11 +11,6 @@ import (
 	"github.com/armadaproject/armada/pkg/armadaevents"
 )
 
-type resettableMetric interface {
-	prometheus.Collector
-	Reset()
-}
-
 type jobStateMetrics struct {
 	errorRegexes  []*regexp.Regexp
 	resetInterval time.Duration
@@ -269,11 +264,6 @@ func (m *jobStateMetrics) enable() {
 	m.enabled = true
 }
 
-// isEnabled returns true if job state metrics are enabled
-func (m *jobStateMetrics) isEnabled() bool {
-	return m.enabled
-}
-
 // stateDuration returns:
 // - the duration of the current state (stateTime - priorTime)
 // - the prior state name
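The scheduler.go hunks below are the only call sites for the renamed toggles. A condensed, illustrative sketch of the per-cycle flow they implement (the helper function and its package are made up for illustration; the Metrics methods are the ones introduced above):

```go
package example // illustrative only, not part of this diff

import "github.com/armadaproject/armada/internal/scheduler/metrics"

// syncLeaderMetrics shows how the per-cycle leader check is meant to drive
// the new toggles on the Metrics facade.
func syncLeaderMetrics(m *metrics.Metrics, isLeader bool) {
	if !isLeader {
		// Followers must not export leader-only series, otherwise every
		// replica would double count the same jobs and cycles.
		// DisableLeaderMetrics also resets the leader-only cycle series.
		m.DisableLeaderMetrics()
		return
	}
	m.EnableLeaderMetrics()
}
```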
diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go
index 1a8301864e9..a552594c29e 100644
--- a/internal/scheduler/scheduler.go
+++ b/internal/scheduler/scheduler.go
@@ -240,10 +240,10 @@ func (s *Scheduler) cycle(ctx *armadacontext.Context, updateAll bool, leaderToke
 	// Only export metrics if leader.
 	if !s.leaderController.ValidateToken(leaderToken) {
 		ctx.Info("Not the leader so will not attempt to schedule")
-		s.metrics.DisableJobStateMetrics()
+		s.metrics.DisableLeaderMetrics()
 		return overallSchedulerResult, err
 	} else {
-		s.metrics.EnableJobStateMetrics()
+		s.metrics.EnableLeaderMetrics()
 	}
 
 	// If we've been asked to generate messages for all jobs, do so.
@@ -275,7 +275,7 @@ func (s *Scheduler) cycle(ctx *armadacontext.Context, updateAll bool, leaderToke
 	ctx.Infof("Fetched %d job run errors", len(jobRepoRunErrorsByRunId))
 
 	// Update metrics.
-	if s.metrics.JobStateMetricsEnabled() {
+	if s.metrics.LeaderMetricsEnabled() {
 		s.metrics.ReportStateTransitions(jsts, jobRepoRunErrorsByRunId)
 	}
 
@@ -344,7 +344,7 @@ func (s *Scheduler) cycle(ctx *armadacontext.Context, updateAll bool, leaderToke
 	txn.Commit()
 	ctx.Info("Completed committing cycle transaction")
 
-	if s.metrics.JobStateMetricsEnabled() {
+	if s.metrics.LeaderMetricsEnabled() {
 		for _, jctx := range overallSchedulerResult.ScheduledJobs {
 			s.metrics.ReportJobLeased(jctx.Job)
 		}
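Taken together, the follower behaviour can be spot-checked end to end. A hedged sketch, reusing the hypothetical cycleMetricsCollector adapter from the test note above; it asserts the same thing TestDisableLeaderMetrics checks with its channel drain, namely that a non-leader exports only the reconciliation histogram:

```go
package metrics

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/stretchr/testify/assert"
)

// Hypothetical follow-up test, not part of this diff.
func TestFollowerExportsOnlyReconciliation(t *testing.T) {
	m := newCycleMetrics()
	m.disableLeaderMetrics()

	// With leader metrics off, collect falls through to the reconciliation
	// histogram only: one series, even with no observations recorded.
	assert.Equal(t, 1, testutil.CollectAndCount(cycleMetricsCollector{m}))

	// Re-enabling brings back the leader-only series; with no label values
	// touched yet that means the schedule histogram plus the reconciliation one.
	m.enableLeaderMetrics()
	assert.Equal(t, 2, testutil.CollectAndCount(cycleMetricsCollector{m}))
}
```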