From 69232402e78d1a420d56374415436fe15d2ebf0e Mon Sep 17 00:00:00 2001 From: JamesMurkin Date: Wed, 14 Aug 2024 13:20:02 +0100 Subject: [PATCH] Add armada_queue_labels metric (#3874) * Add armada_queue_labels metric This metric will include all the labels the queue has a metric labels It will be useful to allow filtering queries to just queues that have a certain attribute (label) Signed-off-by: JamesMurkin * Add test Signed-off-by: JamesMurkin --------- Signed-off-by: JamesMurkin --- internal/common/metrics/domain.go | 3 +- internal/common/metrics/scheduler_metrics.go | 45 +++++++++++++++++++- internal/scheduler/metrics.go | 13 +++--- internal/scheduler/metrics_test.go | 15 ++++--- 4 files changed, 61 insertions(+), 15 deletions(-) diff --git a/internal/common/metrics/domain.go b/internal/common/metrics/domain.go index 599f1fdc8b6..86b7bd80c83 100644 --- a/internal/common/metrics/domain.go +++ b/internal/common/metrics/domain.go @@ -5,12 +5,13 @@ import ( "time" armadaresource "github.com/armadaproject/armada/internal/common/resource" + "github.com/armadaproject/armada/pkg/api" ) type QueueMetricProvider interface { GetQueuedJobMetrics(queueName string) []*QueueMetrics GetRunningJobMetrics(queueName string) []*QueueMetrics - GetQueuePriorites() map[string]float64 + GetAllQueues() []*api.Queue } type QueueMetrics struct { diff --git a/internal/common/metrics/scheduler_metrics.go b/internal/common/metrics/scheduler_metrics.go index a99af7e7509..b6ce130f500 100644 --- a/internal/common/metrics/scheduler_metrics.go +++ b/internal/common/metrics/scheduler_metrics.go @@ -176,6 +176,22 @@ var QueuePriorityDesc = prometheus.NewDesc( nil, ) +var ( + queueLabelMetricName = MetricPrefix + "queue_labels" + queueLabelMetricDescription = "Queue labels" + queueLabelDefaultLabels = []string{"queueName", "queue"} +) + +// QueueLabelDesc so it can be added to AllDescs which makes Describe() work properly +// +// actual describe for this metric is generated dynamically as the labels are dynamic +var QueueLabelDesc = prometheus.NewDesc( + queueLabelMetricName, + queueLabelMetricDescription, + queueLabelDefaultLabels, + nil, +) + var AllDescs = []*prometheus.Desc{ QueueSizeDesc, QueuePriorityDesc, @@ -202,6 +218,7 @@ var AllDescs = []*prometheus.Desc{ ClusterCapacityDesc, ClusterAvailableCapacityDesc, QueuePriorityDesc, + QueueLabelDesc, } func Describe(out chan<- *prometheus.Desc) { @@ -265,8 +282,9 @@ func CollectQueueMetrics(queueCounts map[string]int, queueDistinctSchedulingKeyC } } } - for q, priority := range metricsProvider.GetQueuePriorites() { - metrics = append(metrics, NewQueuePriorityMetric(priority, q)) + for _, queue := range metricsProvider.GetAllQueues() { + metrics = append(metrics, NewQueuePriorityMetric(queue.PriorityFactor, queue.Name)) + metrics = append(metrics, NewQueueLabelsMetric(queue.Name, queue.Labels)) } return metrics } @@ -366,3 +384,26 @@ func NewQueueUsed(value float64, queue string, cluster string, pool string, reso func NewQueuePriorityMetric(value float64, queue string) prometheus.Metric { return prometheus.MustNewConstMetric(QueuePriorityDesc, prometheus.GaugeValue, value, queue, queue) } + +func NewQueueLabelsMetric(queue string, labels map[string]string) prometheus.Metric { + metricLabels := make([]string, 0, len(labels)+len(queueLabelDefaultLabels)) + values := make([]string, 0, len(labels)+len(queueLabelDefaultLabels)) + + metricLabels = append(metricLabels, queueLabelDefaultLabels...) + values = append(values, queue) + values = append(values, queue) + + for key, value := range labels { + metricLabels = append(metricLabels, key) + values = append(values, value) + } + + queueLabelsDesc := prometheus.NewDesc( + queueLabelMetricName, + queueLabelMetricDescription, + metricLabels, + nil, + ) + + return prometheus.MustNewConstMetric(queueLabelsDesc, prometheus.GaugeValue, 1, values...) +} diff --git a/internal/scheduler/metrics.go b/internal/scheduler/metrics.go index 159caeee0c9..67a8c5cd838 100644 --- a/internal/scheduler/metrics.go +++ b/internal/scheduler/metrics.go @@ -7,6 +7,7 @@ import ( "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" + "golang.org/x/exp/maps" "k8s.io/utils/clock" "github.com/armadaproject/armada/internal/common/armadacontext" @@ -14,18 +15,20 @@ import ( armadamaps "github.com/armadaproject/armada/internal/common/maps" commonmetrics "github.com/armadaproject/armada/internal/common/metrics" "github.com/armadaproject/armada/internal/common/resource" + "github.com/armadaproject/armada/internal/common/slices" "github.com/armadaproject/armada/internal/scheduler/database" "github.com/armadaproject/armada/internal/scheduler/floatingresources" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/queue" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" + "github.com/armadaproject/armada/pkg/api" ) // Metrics Recorders associated with a queue type queueState struct { queuedJobRecorder *commonmetrics.JobMetricsRecorder runningJobRecorder *commonmetrics.JobMetricsRecorder - priority float64 + queue *api.Queue } // metricProvider is a simple implementation of QueueMetricProvider @@ -33,9 +36,9 @@ type metricProvider struct { queueStates map[string]*queueState } -func (m metricProvider) GetQueuePriorites() map[string]float64 { - return armadamaps.MapValues(m.queueStates, func(v *queueState) float64 { - return v.priority +func (m metricProvider) GetAllQueues() []*api.Queue { + return slices.Map(maps.Values(m.queueStates), func(state *queueState) *api.Queue { + return state.queue }) } @@ -154,7 +157,7 @@ func (c *MetricsCollector) updateQueueMetrics(ctx *armadacontext.Context) ([]pro provider.queueStates[queue.Name] = &queueState{ queuedJobRecorder: commonmetrics.NewJobMetricsRecorder(), runningJobRecorder: commonmetrics.NewJobMetricsRecorder(), - priority: queue.PriorityFactor, + queue: queue, } queuedJobsCount[queue.Name] = 0 schedulingKeysByQueue[queue.Name] = map[schedulerobjects.SchedulingKey]bool{} diff --git a/internal/scheduler/metrics_test.go b/internal/scheduler/metrics_test.go index d0f2bdbd753..11ae37ea65b 100644 --- a/internal/scheduler/metrics_test.go +++ b/internal/scheduler/metrics_test.go @@ -1,7 +1,6 @@ package scheduler import ( - "fmt" "testing" "time" @@ -44,6 +43,9 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) { jobCreationTime := testfixtures.BaseTime.Add(-time.Duration(500) * time.Second).UnixNano() jobWithTerminatedRun := testfixtures.TestQueuedJobDbJob().WithCreated(jobCreationTime).WithUpdatedRun(run) + queue := testfixtures.MakeTestQueue() + queue.Labels = map[string]string{"foo": "bar"} + tests := map[string]struct { initialJobs []*jobdb.Job defaultPool string @@ -53,7 +55,7 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) { }{ "queued metrics": { initialJobs: queuedJobs, - queues: []*api.Queue{testfixtures.MakeTestQueue()}, + queues: []*api.Queue{queue}, defaultPool: testfixtures.TestPool, expected: []prometheus.Metric{ commonmetrics.NewQueueSizeMetric(3.0, testfixtures.TestQueue), @@ -74,13 +76,15 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) { commonmetrics.NewMaxQueueResources(gb, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "memory"), commonmetrics.NewMedianQueueResources(gb, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "memory"), commonmetrics.NewCountQueueResources(3, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "memory"), + commonmetrics.NewQueuePriorityMetric(100, testfixtures.TestQueue), + commonmetrics.NewQueueLabelsMetric(testfixtures.TestQueue, map[string]string{"foo": "bar"}), }, }, "queued metrics for requeued job": { // This job was been requeued and has a terminated run // The queue duration stats should count from the time the last run finished instead of job creation time initialJobs: []*jobdb.Job{jobWithTerminatedRun}, - queues: []*api.Queue{testfixtures.MakeTestQueue()}, + queues: []*api.Queue{queue}, defaultPool: testfixtures.TestPool, expected: []prometheus.Metric{ commonmetrics.NewQueueSizeMetric(1.0, testfixtures.TestQueue), @@ -105,7 +109,7 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) { }, "running metrics": { initialJobs: runningJobs, - queues: []*api.Queue{testfixtures.MakeTestQueue()}, + queues: []*api.Queue{queue}, defaultPool: testfixtures.TestPool, expected: []prometheus.Metric{ commonmetrics.NewQueueSizeMetric(0.0, testfixtures.TestQueue), @@ -168,9 +172,6 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) { for i := 0; i < len(tc.expected); i++ { a1 := actual[i] e1 := tc.expected[i] - if !assert.Equal(t, e1, a1) { - fmt.Println("here") - } require.Equal(t, e1, a1) } })