diff --git a/internal/scheduler/context/context.go b/internal/scheduler/context/context.go index b4bc129a8ce..9cf10b02fdf 100644 --- a/internal/scheduler/context/context.go +++ b/internal/scheduler/context/context.go @@ -164,7 +164,7 @@ func (sctx *SchedulingContext) GetQueue(queue string) (fairness.Queue, bool) { func (sctx *SchedulingContext) TotalCost() float64 { var rv float64 for _, qctx := range sctx.QueueSchedulingContexts { - rv += sctx.FairnessCostProvider.CostFromQueue(qctx) + rv += sctx.FairnessCostProvider.UnweightedCostFromQueue(qctx) } return rv } @@ -187,7 +187,7 @@ func (sctx *SchedulingContext) UpdateFairShares() { for queueName, qctx := range sctx.QueueSchedulingContexts { cappedShare := 1.0 if !sctx.TotalResources.IsZero() { - cappedShare = sctx.FairnessCostProvider.CostFromAllocationAndWeight(qctx.Demand, qctx.Weight) * qctx.Weight + cappedShare = sctx.FairnessCostProvider.UnweightedCostFromAllocation(qctx.Demand) } queueInfos = append(queueInfos, &queueInfo{ queueName: queueName, diff --git a/internal/scheduler/fairness/fairness.go b/internal/scheduler/fairness/fairness.go index 991a14fbe09..07b861bdb94 100644 --- a/internal/scheduler/fairness/fairness.go +++ b/internal/scheduler/fairness/fairness.go @@ -21,8 +21,10 @@ type Queue interface { // FairnessCostProvider captures algorithms to compute the cost of an allocation. 
type FairnessCostProvider interface { - CostFromQueue(queue Queue) float64 - CostFromAllocationAndWeight(allocation schedulerobjects.ResourceList, weight float64) float64 + UnweightedCostFromQueue(queue Queue) float64 + UnweightedCostFromAllocation(allocation schedulerobjects.ResourceList) float64 + WeightedCostFromQueue(queue Queue) float64 + WeightedCostFromAllocation(allocation schedulerobjects.ResourceList, weight float64) float64 } type DominantResourceFairness struct { @@ -42,11 +44,19 @@ func NewDominantResourceFairness(totalResources schedulerobjects.ResourceList, r }, nil } -func (f *DominantResourceFairness) CostFromQueue(queue Queue) float64 { - return f.CostFromAllocationAndWeight(queue.GetAllocation(), queue.GetWeight()) +func (f *DominantResourceFairness) WeightedCostFromQueue(queue Queue) float64 { + return f.UnweightedCostFromQueue(queue) / queue.GetWeight() } -func (f *DominantResourceFairness) CostFromAllocationAndWeight(allocation schedulerobjects.ResourceList, weight float64) float64 { +func (f *DominantResourceFairness) UnweightedCostFromQueue(queue Queue) float64 { + return f.UnweightedCostFromAllocation(queue.GetAllocation()) +} + +func (f *DominantResourceFairness) WeightedCostFromAllocation(allocation schedulerobjects.ResourceList, weight float64) float64 { + return f.UnweightedCostFromAllocation(allocation) / weight +} + +func (f *DominantResourceFairness) UnweightedCostFromAllocation(allocation schedulerobjects.ResourceList) float64 { var cost float64 for _, t := range f.resourcesToConsider { capacity := f.totalResources.Get(t) @@ -60,5 +70,5 @@ func (f *DominantResourceFairness) CostFromAllocationAndWeight(allocation schedu cost = tcost } } - return cost / weight + return cost } diff --git a/internal/scheduler/fairness/fairness_test.go b/internal/scheduler/fairness/fairness_test.go index 1f16b8db42c..00a96788f06 100644 --- a/internal/scheduler/fairness/fairness_test.go +++ b/internal/scheduler/fairness/fairness_test.go @@ -156,12 +156,12 
@@ func TestDominantResourceFairness(t *testing.T) { assert.Equal( t, tc.expectedCost, - f.CostFromAllocationAndWeight(tc.allocation, tc.weight), + f.WeightedCostFromAllocation(tc.allocation, tc.weight), ) assert.Equal( t, - f.CostFromAllocationAndWeight(tc.allocation, tc.weight), - f.CostFromQueue(MinimalQueue{allocation: tc.allocation, weight: tc.weight}), + f.WeightedCostFromAllocation(tc.allocation, tc.weight), + f.WeightedCostFromQueue(MinimalQueue{allocation: tc.allocation, weight: tc.weight}), ) }) } diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go index dc6c8c069df..8732836cbc0 100644 --- a/internal/scheduler/preempting_queue_scheduler.go +++ b/internal/scheduler/preempting_queue_scheduler.go @@ -131,7 +131,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche return false } if qctx, ok := sch.schedulingContext.QueueSchedulingContexts[job.Queue()]; ok { - actualShare := sch.schedulingContext.FairnessCostProvider.CostFromQueue(qctx) / totalCost + actualShare := sch.schedulingContext.FairnessCostProvider.UnweightedCostFromQueue(qctx) / totalCost fairShare := qctx.FairShare if sch.useAdjustedFairShareProtection { fairShare = math.Max(qctx.AdjustedFairShare, fairShare) diff --git a/internal/scheduler/preempting_queue_scheduler_test.go b/internal/scheduler/preempting_queue_scheduler_test.go index e726c4b4a88..5e31e10553a 100644 --- a/internal/scheduler/preempting_queue_scheduler_test.go +++ b/internal/scheduler/preempting_queue_scheduler_test.go @@ -1312,6 +1312,37 @@ func TestPreemptingQueueScheduler(t *testing.T) { "D": 1, }, }, + "ProtectedFractionOfFairShare non equal weights": { + SchedulingConfig: testfixtures.WithProtectedFractionOfFairShareConfig( + 1.0, + testfixtures.TestSchedulingConfig(), + ), + Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Rounds: []SchedulingRound{ + { + JobsByQueue: map[string][]*jobdb.Job{ + "A": 
testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass2NonPreemptible, 24), + "B": testfixtures.N1Cpu4GiJobs("B", testfixtures.PriorityClass0, 8), + }, + ExpectedScheduledIndices: map[string][]int{ + "A": testfixtures.IntRange(0, 23), + "B": testfixtures.IntRange(0, 7), + }, + }, + { + // C submits one more job. No preemption occurs because B is below adjusted fair share + JobsByQueue: map[string][]*jobdb.Job{ + "C": testfixtures.N1Cpu4GiJobs("C", testfixtures.PriorityClass0, 1), + }, + }, + {}, // Empty round to make sure nothing changes. + }, + PriorityFactorByQueue: map[string]float64{ + "A": 1, + "B": 2, + "C": 1, + }, + }, "DominantResourceFairness": { SchedulingConfig: testfixtures.TestSchedulingConfig(), Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), diff --git a/internal/scheduler/queue_scheduler.go b/internal/scheduler/queue_scheduler.go index a3d5b33d8a9..de7cb68dd5c 100644 --- a/internal/scheduler/queue_scheduler.go +++ b/internal/scheduler/queue_scheduler.go @@ -465,7 +465,7 @@ func (it *CandidateGangIterator) queueCostWithGctx(gctx *schedulercontext.GangSc it.buffer.Zero() it.buffer.Add(queue.GetAllocation()) it.buffer.Add(gctx.TotalResourceRequests) - return it.fairnessCostProvider.CostFromAllocationAndWeight(it.buffer, queue.GetWeight()), nil + return it.fairnessCostProvider.WeightedCostFromAllocation(it.buffer, queue.GetWeight()), nil } // Priority queue used by CandidateGangIterator to determine from which queue to schedule the next job. 
diff --git a/internal/scheduler/scheduler_metrics.go b/internal/scheduler/scheduler_metrics.go index 04464bb7ac4..6a528f77c09 100644 --- a/internal/scheduler/scheduler_metrics.go +++ b/internal/scheduler/scheduler_metrics.go @@ -199,7 +199,7 @@ func (metrics *SchedulerMetrics) calculateQueuePoolMetrics(schedulingContexts [] for queue, queueContext := range schedContext.QueueSchedulingContexts { key := queuePoolKey{queue: queue, pool: pool} - actualShare := schedContext.FairnessCostProvider.CostFromQueue(queueContext) / totalCost + actualShare := schedContext.FairnessCostProvider.UnweightedCostFromQueue(queueContext) / totalCost result[key] = queuePoolData{ numberOfJobsConsidered: len(queueContext.UnsuccessfulJobSchedulingContexts) + len(queueContext.SuccessfulJobSchedulingContexts), fairShare: queueContext.FairShare,