From 99f8b26dc3d0846770028e7494827e01d45831e2 Mon Sep 17 00:00:00 2001 From: JamesMurkin Date: Wed, 14 Aug 2024 12:32:22 +0100 Subject: [PATCH] Make queued time metric reset when a job gets requeued (#3875) * Make queued time metric reset when a job gets requeued Currently the queue duration metric always uses the jobs created time to determine how long it has been in the queue When jobs gets requeued, the time in state should start from when it was requeued rather than when the job was created - As it has only been in the queue since its last run terminated This was producing large "jumps" in queued time as jobs were tried for 15 minutes, requeued and immediately reporting 15 minutes in the queue - despite only being requeued a few seconds ago It is reasonable we may want to know when these retries are occurring / time they are costing, but I think they should be separate metrics, i.e: - Retry count - Total time queued? Total time attempting to start? Something along this line Signed-off-by: JamesMurkin * Include actual logic change Signed-off-by: JamesMurkin --------- Signed-off-by: JamesMurkin --- internal/scheduler/metrics.go | 9 ++++++- internal/scheduler/metrics_test.go | 43 ++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/internal/scheduler/metrics.go b/internal/scheduler/metrics.go index 23e6765abb1..159caeee0c9 100644 --- a/internal/scheduler/metrics.go +++ b/internal/scheduler/metrics.go @@ -200,7 +200,14 @@ func (c *MetricsCollector) updateQueueMetrics(ctx *armadacontext.Context) ([]pro continue } recorder = qs.queuedJobRecorder - timeInState = currentTime.Sub(time.Unix(0, job.Created())) + queuedTime := time.Unix(0, job.Created()) + if job.HasRuns() { + terminationTimeOfLatestRun := job.LatestRun().TerminatedTime() + if terminationTimeOfLatestRun != nil && terminationTimeOfLatestRun.After(queuedTime) { + queuedTime = *terminationTimeOfLatestRun + } + } + timeInState = currentTime.Sub(queuedTime) queuedJobsCount[job.Queue()]++ schedulingKeysByQueue[job.Queue()][job.SchedulingKey()] = true } else { diff --git a/internal/scheduler/metrics_test.go b/internal/scheduler/metrics_test.go index 11a8ee455a3..d0f2bdbd753 100644 --- a/internal/scheduler/metrics_test.go +++ b/internal/scheduler/metrics_test.go @@ -1,10 +1,12 @@ package scheduler import ( + "fmt" "testing" "time" "github.com/golang/mock/gomock" + "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -31,6 +33,17 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) { runningJobs[i] = testfixtures.TestRunningJobDbJob(startTime) } + // Run that has been returned + runStartTime := testfixtures.BaseTime.Add(-time.Duration(400) * time.Second).UnixNano() + runTerminatedTime := testfixtures.BaseTime.Add(-time.Duration(200) * time.Second) + run := jobdb.MinimalRun(uuid.New(), runStartTime) + run = run.WithFailed(true) + run = run.WithReturned(true) + run = run.WithTerminatedTime(&runTerminatedTime) + + jobCreationTime := testfixtures.BaseTime.Add(-time.Duration(500) * time.Second).UnixNano() + jobWithTerminatedRun := testfixtures.TestQueuedJobDbJob().WithCreated(jobCreationTime).WithUpdatedRun(run) + tests := map[string]struct { initialJobs []*jobdb.Job defaultPool string @@ -63,6 +76,33 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) { commonmetrics.NewCountQueueResources(3, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "memory"), }, }, + "queued metrics for requeued job": { + // This job was been requeued and has a terminated run + // The queue duration stats should count from the time the last run finished instead of job creation time + initialJobs: []*jobdb.Job{jobWithTerminatedRun}, + queues: []*api.Queue{testfixtures.MakeTestQueue()}, + defaultPool: testfixtures.TestPool, + expected: []prometheus.Metric{ + commonmetrics.NewQueueSizeMetric(1.0, testfixtures.TestQueue), + commonmetrics.NewQueueDistinctSchedulingKeyMetric(1.0, testfixtures.TestQueue), + commonmetrics.NewQueueDuration(1, 200, + map[float64]uint64{60: 0, 600: 1, 1800: 1, 3600: 1, 10800: 1, 43200: 1, 86400: 1, 172800: 1, 604800: 1}, + testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue), + commonmetrics.NewMinQueueDuration(200, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue), + commonmetrics.NewMaxQueueDuration(200, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue), + commonmetrics.NewMedianQueueDuration(200, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue), + commonmetrics.NewQueueResources(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"), + commonmetrics.NewMinQueueResources(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"), + commonmetrics.NewMaxQueueResources(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"), + commonmetrics.NewMedianQueueResources(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"), + commonmetrics.NewCountQueueResources(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "cpu"), + commonmetrics.NewQueueResources(gb, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "memory"), + commonmetrics.NewMinQueueResources(gb, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "memory"), + commonmetrics.NewMaxQueueResources(gb, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "memory"), + commonmetrics.NewMedianQueueResources(gb, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "memory"), + commonmetrics.NewCountQueueResources(1, testfixtures.TestPool, testfixtures.TestDefaultPriorityClass, testfixtures.TestQueue, "memory"), + }, + }, "running metrics": { initialJobs: runningJobs, queues: []*api.Queue{testfixtures.MakeTestQueue()}, @@ -128,6 +168,9 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) { for i := 0; i < len(tc.expected); i++ { a1 := actual[i] e1 := tc.expected[i] + if !assert.Equal(t, e1, a1) { + fmt.Println("here") + } require.Equal(t, e1, a1) } })