Skip to content

Commit

Permalink
Add armada_queue_labels metric (#3874)
Browse files Browse the repository at this point in the history
* Add armada_queue_labels metric

This metric will include all the labels the queue has a metric labels

It will be useful to allow filtering queries to just queues that have a certain attribute (label)

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

* Add test

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

---------

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>
  • Loading branch information
JamesMurkin committed Aug 14, 2024
1 parent 99f8b26 commit 6923240
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 15 deletions.
3 changes: 2 additions & 1 deletion internal/common/metrics/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"time"

armadaresource "github.com/armadaproject/armada/internal/common/resource"
"github.com/armadaproject/armada/pkg/api"
)

type QueueMetricProvider interface {
GetQueuedJobMetrics(queueName string) []*QueueMetrics
GetRunningJobMetrics(queueName string) []*QueueMetrics
GetQueuePriorites() map[string]float64
GetAllQueues() []*api.Queue
}

type QueueMetrics struct {
Expand Down
45 changes: 43 additions & 2 deletions internal/common/metrics/scheduler_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,22 @@ var QueuePriorityDesc = prometheus.NewDesc(
nil,
)

var (
queueLabelMetricName = MetricPrefix + "queue_labels"
queueLabelMetricDescription = "Queue labels"
queueLabelDefaultLabels = []string{"queueName", "queue"}
)

// QueueLabelDesc so it can be added to AllDescs which makes Describe() work properly
//
// actual describe for this metric is generated dynamically as the labels are dynamic
var QueueLabelDesc = prometheus.NewDesc(
queueLabelMetricName,
queueLabelMetricDescription,
queueLabelDefaultLabels,
nil,
)

var AllDescs = []*prometheus.Desc{
QueueSizeDesc,
QueuePriorityDesc,
Expand All @@ -202,6 +218,7 @@ var AllDescs = []*prometheus.Desc{
ClusterCapacityDesc,
ClusterAvailableCapacityDesc,
QueuePriorityDesc,
QueueLabelDesc,
}

func Describe(out chan<- *prometheus.Desc) {
Expand Down Expand Up @@ -265,8 +282,9 @@ func CollectQueueMetrics(queueCounts map[string]int, queueDistinctSchedulingKeyC
}
}
}
for q, priority := range metricsProvider.GetQueuePriorites() {
metrics = append(metrics, NewQueuePriorityMetric(priority, q))
for _, queue := range metricsProvider.GetAllQueues() {
metrics = append(metrics, NewQueuePriorityMetric(queue.PriorityFactor, queue.Name))
metrics = append(metrics, NewQueueLabelsMetric(queue.Name, queue.Labels))
}
return metrics
}
Expand Down Expand Up @@ -366,3 +384,26 @@ func NewQueueUsed(value float64, queue string, cluster string, pool string, reso
func NewQueuePriorityMetric(value float64, queue string) prometheus.Metric {
return prometheus.MustNewConstMetric(QueuePriorityDesc, prometheus.GaugeValue, value, queue, queue)
}

func NewQueueLabelsMetric(queue string, labels map[string]string) prometheus.Metric {
metricLabels := make([]string, 0, len(labels)+len(queueLabelDefaultLabels))
values := make([]string, 0, len(labels)+len(queueLabelDefaultLabels))

metricLabels = append(metricLabels, queueLabelDefaultLabels...)
values = append(values, queue)
values = append(values, queue)

for key, value := range labels {
metricLabels = append(metricLabels, key)
values = append(values, value)
}

queueLabelsDesc := prometheus.NewDesc(
queueLabelMetricName,
queueLabelMetricDescription,
metricLabels,
nil,
)

return prometheus.MustNewConstMetric(queueLabelsDesc, prometheus.GaugeValue, 1, values...)
}
13 changes: 8 additions & 5 deletions internal/scheduler/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,38 @@ import (

"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/exp/maps"
"k8s.io/utils/clock"

"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/logging"
armadamaps "github.com/armadaproject/armada/internal/common/maps"
commonmetrics "github.com/armadaproject/armada/internal/common/metrics"
"github.com/armadaproject/armada/internal/common/resource"
"github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/scheduler/database"
"github.com/armadaproject/armada/internal/scheduler/floatingresources"
"github.com/armadaproject/armada/internal/scheduler/jobdb"
"github.com/armadaproject/armada/internal/scheduler/queue"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/pkg/api"
)

// Metrics Recorders associated with a queue
type queueState struct {
queuedJobRecorder *commonmetrics.JobMetricsRecorder
runningJobRecorder *commonmetrics.JobMetricsRecorder
priority float64
queue *api.Queue
}

// metricProvider is a simple implementation of QueueMetricProvider
type metricProvider struct {
queueStates map[string]*queueState
}

func (m metricProvider) GetQueuePriorites() map[string]float64 {
return armadamaps.MapValues(m.queueStates, func(v *queueState) float64 {
return v.priority
func (m metricProvider) GetAllQueues() []*api.Queue {
return slices.Map(maps.Values(m.queueStates), func(state *queueState) *api.Queue {
return state.queue
})
}

Expand Down Expand Up @@ -154,7 +157,7 @@ func (c *MetricsCollector) updateQueueMetrics(ctx *armadacontext.Context) ([]pro
provider.queueStates[queue.Name] = &queueState{
queuedJobRecorder: commonmetrics.NewJobMetricsRecorder(),
runningJobRecorder: commonmetrics.NewJobMetricsRecorder(),
priority: queue.PriorityFactor,
queue: queue,
}
queuedJobsCount[queue.Name] = 0
schedulingKeysByQueue[queue.Name] = map[schedulerobjects.SchedulingKey]bool{}
Expand Down
15 changes: 8 additions & 7 deletions internal/scheduler/metrics_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package scheduler

import (
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -44,6 +43,9 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) {
jobCreationTime := testfixtures.BaseTime.Add(-time.Duration(500) * time.Second).UnixNano()
jobWithTerminatedRun := testfixtures.TestQueuedJobDbJob().WithCreated(jobCreationTime).WithUpdatedRun(run)

queue := testfixtures.MakeTestQueue()
queue.Labels = map[string]string{"foo": "bar"}

tests := map[string]struct {
initialJobs []*jobdb.Job
defaultPool string
Expand All @@ -53,7 +55,7 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) {
}{
"queued metrics": {
initialJobs: queuedJobs,
queues: []*api.Queue{testfixtures.MakeTestQueue()},
queues: []*api.Queue{queue},
defaultPool: testfixtures.TestPool,
expected: []prometheus.Metric{
commonmetrics.NewQueueSizeMetric(3.0, testfixtures.TestQueue),
Expand All @@ -74,13 +76,15 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) {
commonmetrics.NewMaxQueueResources(gb, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "memory"),
commonmetrics.NewMedianQueueResources(gb, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "memory"),
commonmetrics.NewCountQueueResources(3, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "memory"),
commonmetrics.NewQueuePriorityMetric(100, testfixtures.TestQueue),
commonmetrics.NewQueueLabelsMetric(testfixtures.TestQueue, map[string]string{"foo": "bar"}),
},
},
"queued metrics for requeued job": {
// This job was been requeued and has a terminated run
// The queue duration stats should count from the time the last run finished instead of job creation time
initialJobs: []*jobdb.Job{jobWithTerminatedRun},
queues: []*api.Queue{testfixtures.MakeTestQueue()},
queues: []*api.Queue{queue},
defaultPool: testfixtures.TestPool,
expected: []prometheus.Metric{
commonmetrics.NewQueueSizeMetric(1.0, testfixtures.TestQueue),
Expand All @@ -105,7 +109,7 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) {
},
"running metrics": {
initialJobs: runningJobs,
queues: []*api.Queue{testfixtures.MakeTestQueue()},
queues: []*api.Queue{queue},
defaultPool: testfixtures.TestPool,
expected: []prometheus.Metric{
commonmetrics.NewQueueSizeMetric(0.0, testfixtures.TestQueue),
Expand Down Expand Up @@ -168,9 +172,6 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) {
for i := 0; i < len(tc.expected); i++ {
a1 := actual[i]
e1 := tc.expected[i]
if !assert.Equal(t, e1, a1) {
fmt.Println("here")
}
require.Equal(t, e1, a1)
}
})
Expand Down

0 comments on commit 6923240

Please sign in to comment.