Skip to content

Commit

Permalink
Make queued time metric reset when a job gets requeued (#3875)
Browse files Browse the repository at this point in the history
* Make queued time metric reset when a job gets requeued

Currently the queue duration metric always uses the job's creation time to determine how long the job has been in the queue

When a job gets requeued, the time in state should start from when it was requeued rather than when the job was created
 - As it has only been in the queue since its last run terminated

This was producing large "jumps" in queued time: jobs were tried for 15 minutes, requeued, and then immediately reported as having spent 15 minutes in the queue - despite having been requeued only a few seconds earlier

It is reasonable that we may want to know when these retries are occurring and how much time they are costing, but I think those should be separate metrics, e.g.:
 - Retry count
 - Total time queued? Total time attempting to start? Something along this line

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

* Include actual logic change

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

---------

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>
  • Loading branch information
JamesMurkin authored Aug 14, 2024
1 parent a2d3a7f commit 99f8b26
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 1 deletion.
9 changes: 8 additions & 1 deletion internal/scheduler/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,14 @@ func (c *MetricsCollector) updateQueueMetrics(ctx *armadacontext.Context) ([]pro
continue
}
recorder = qs.queuedJobRecorder
timeInState = currentTime.Sub(time.Unix(0, job.Created()))
queuedTime := time.Unix(0, job.Created())
if job.HasRuns() {
terminationTimeOfLatestRun := job.LatestRun().TerminatedTime()
if terminationTimeOfLatestRun != nil && terminationTimeOfLatestRun.After(queuedTime) {
queuedTime = *terminationTimeOfLatestRun
}
}
timeInState = currentTime.Sub(queuedTime)
queuedJobsCount[job.Queue()]++
schedulingKeysByQueue[job.Queue()][job.SchedulingKey()] = true
} else {
Expand Down
43 changes: 43 additions & 0 deletions internal/scheduler/metrics_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package scheduler

import (
"fmt"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -31,6 +33,17 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) {
runningJobs[i] = testfixtures.TestRunningJobDbJob(startTime)
}

// Run that has been returned
runStartTime := testfixtures.BaseTime.Add(-time.Duration(400) * time.Second).UnixNano()
runTerminatedTime := testfixtures.BaseTime.Add(-time.Duration(200) * time.Second)
run := jobdb.MinimalRun(uuid.New(), runStartTime)
run = run.WithFailed(true)
run = run.WithReturned(true)
run = run.WithTerminatedTime(&runTerminatedTime)

jobCreationTime := testfixtures.BaseTime.Add(-time.Duration(500) * time.Second).UnixNano()
jobWithTerminatedRun := testfixtures.TestQueuedJobDbJob().WithCreated(jobCreationTime).WithUpdatedRun(run)

tests := map[string]struct {
initialJobs []*jobdb.Job
defaultPool string
Expand Down Expand Up @@ -63,6 +76,33 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) {
commonmetrics.NewCountQueueResources(3, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "memory"),
},
},
"queued metrics for requeued job": {
// This job has been requeued and has a terminated run
// The queue duration stats should count from the time the last run finished instead of job creation time
initialJobs: []*jobdb.Job{jobWithTerminatedRun},
queues: []*api.Queue{testfixtures.MakeTestQueue()},
defaultPool: testfixtures.TestPool,
expected: []prometheus.Metric{
commonmetrics.NewQueueSizeMetric(1.0, testfixtures.TestQueue),
commonmetrics.NewQueueDistinctSchedulingKeyMetric(1.0, testfixtures.TestQueue),
commonmetrics.NewQueueDuration(1, 200,
map[float64]uint64{60: 0, 600: 1, 1800: 1, 3600: 1, 10800: 1, 43200: 1, 86400: 1, 172800: 1, 604800: 1},
testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
commonmetrics.NewMinQueueDuration(200, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
commonmetrics.NewMaxQueueDuration(200, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
commonmetrics.NewMedianQueueDuration(200, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue),
commonmetrics.NewQueueResources(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"),
commonmetrics.NewMinQueueResources(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"),
commonmetrics.NewMaxQueueResources(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"),
commonmetrics.NewMedianQueueResources(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"),
commonmetrics.NewCountQueueResources(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"),
commonmetrics.NewQueueResources(gb, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "memory"),
commonmetrics.NewMinQueueResources(gb, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "memory"),
commonmetrics.NewMaxQueueResources(gb, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "memory"),
commonmetrics.NewMedianQueueResources(gb, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "memory"),
commonmetrics.NewCountQueueResources(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "memory"),
},
},
"running metrics": {
initialJobs: runningJobs,
queues: []*api.Queue{testfixtures.MakeTestQueue()},
Expand Down Expand Up @@ -128,6 +168,9 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) {
for i := 0; i < len(tc.expected); i++ {
a1 := actual[i]
e1 := tc.expected[i]
if !assert.Equal(t, e1, a1) {
fmt.Println("here")
}
require.Equal(t, e1, a1)
}
})
Expand Down

0 comments on commit 99f8b26

Please sign in to comment.