diff --git a/internal/scheduler/interfaces/interfaces.go b/internal/scheduler/interfaces/interfaces.go index 7786fb995a0..98ca4ee0392 100644 --- a/internal/scheduler/interfaces/interfaces.go +++ b/internal/scheduler/interfaces/interfaces.go @@ -20,16 +20,3 @@ type LegacySchedulerJob interface { GetTolerations() []v1.Toleration GetResourceRequirements() v1.ResourceRequirements } - -func PodRequirementFromLegacySchedulerJob(job LegacySchedulerJob, priorityClasses map[string]configuration.PriorityClass) *schedulerobjects.PodRequirements { - schedulingInfo := job.GetRequirements(priorityClasses) - if schedulingInfo == nil { - return nil - } - for _, objectReq := range schedulingInfo.ObjectRequirements { - if req := objectReq.GetPodRequirements(); req != nil { - return req - } - } - return nil -} diff --git a/internal/scheduler/nodedb/nodedb.go b/internal/scheduler/nodedb/nodedb.go index 4ddd839a9d6..7f4c9c89a82 100644 --- a/internal/scheduler/nodedb/nodedb.go +++ b/internal/scheduler/nodedb/nodedb.go @@ -20,6 +20,7 @@ import ( "github.com/armadaproject/armada/internal/common/util" schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" + "github.com/armadaproject/armada/internal/scheduler/interfaces" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) @@ -556,59 +557,52 @@ func BindPodToNode(req *schedulerobjects.PodRequirements, node *schedulerobjects return node, nil } -// EvictPodFromNode returns a copy of node with req evicted from it. Specifically: -// - The job is marked as evicted on the node. -// - AllocatedByJobId and AllocatedByQueue are not updated. -// - Resources requested by the evicted pod are marked as allocated at priority evictedPriority. 
-func EvictPodFromNode(req *schedulerobjects.PodRequirements, node *schedulerobjects.Node) (*schedulerobjects.Node, error) { - jobId, err := JobIdFromPodRequirements(req) - if err != nil { - return nil, err - } - queue, err := QueueFromPodRequirements(req) - if err != nil { - return nil, err - } - node = node.DeepCopy() - - // Ensure we track allocated resources at evictedPriority. - if _, ok := node.AllocatableByPriorityAndResource[evictedPriority]; !ok { - pMin := int32(math.MaxInt32) - ok := false - for p := range node.AllocatableByPriorityAndResource { - if p < pMin { - pMin = p - ok = true - } - } - if ok { - node.AllocatableByPriorityAndResource[evictedPriority] = node.AllocatableByPriorityAndResource[pMin].DeepCopy() - } - } - +// EvictJobFromNodeInPlace marks the given job as evicted on node, mutating node in place, and moves its resources from the job's +// priority to evictedPriority; it does not update AllocatedByJobId and AllocatedByQueue. It returns an error if the job or its queue has nothing allocated on node, or if the job is already evicted. +func EvictJobFromNodeInPlace(priorityClasses map[string]configuration.PriorityClass, job interfaces.LegacySchedulerJob, node *schedulerobjects.Node) error { + jobId := job.GetId() if _, ok := node.AllocatedByJobId[jobId]; !ok { - return nil, errors.Errorf("job %s has no resources allocated on node %s", jobId, node.Id) + return errors.Errorf("job %s has no resources allocated on node %s", jobId, node.Id) } + + queue := job.GetQueue() if _, ok := node.AllocatedByQueue[queue]; !ok { - return nil, errors.Errorf("queue %s has no resources allocated on node %s", queue, node.Id) + return errors.Errorf("queue %s has no resources allocated on node %s", queue, node.Id) } + if node.EvictedJobRunIds == nil { node.EvictedJobRunIds = make(map[string]bool) } if _, ok := node.EvictedJobRunIds[jobId]; ok { - // TODO: We're using run ids instead of job ids for now. 
- return nil, errors.Errorf("job %s is already evicted from node %s", jobId, node.Id) - } else { - node.EvictedJobRunIds[jobId] = true + return errors.Errorf("job %s is already evicted from node %s", jobId, node.Id) } + node.EvictedJobRunIds[jobId] = true - schedulerobjects.AllocatableByPriorityAndResourceType( - node.AllocatableByPriorityAndResource, - ).MarkAllocatableV1ResourceList(req.Priority, req.ResourceRequirements.Requests) - schedulerobjects.AllocatableByPriorityAndResourceType( - node.AllocatableByPriorityAndResource, - ).MarkAllocatedV1ResourceList(evictedPriority, req.ResourceRequirements.Requests) - return node, nil + if _, ok := node.AllocatableByPriorityAndResource[evictedPriority]; !ok { + minimumPriority := int32(math.MaxInt32) + foundPriority := false + for p := range node.AllocatableByPriorityAndResource { + if p < minimumPriority { + minimumPriority = p + foundPriority = true + } + } + if foundPriority { + node.AllocatableByPriorityAndResource[evictedPriority] = node.AllocatableByPriorityAndResource[minimumPriority].DeepCopy() + } else { + // We do not expect to hit this branch; however, if we do, then we need + // to make sure that evictedPriority is in this map so that the call to + // MarkAllocatedV1ResourceList() below knows about it. + node.AllocatableByPriorityAndResource[evictedPriority] = schedulerobjects.ResourceList{} + } + } + allocatable := schedulerobjects.AllocatableByPriorityAndResourceType(node.AllocatableByPriorityAndResource) + priority := priorityClasses[job.GetPriorityClassName()].Priority + requests := job.GetResourceRequirements().Requests + allocatable.MarkAllocatableV1ResourceList(priority, requests) + allocatable.MarkAllocatedV1ResourceList(evictedPriority, requests) + + return nil } // UnbindPodsFromNode returns a node with all reqs unbound from it. 
diff --git a/internal/scheduler/nodedb/nodedb_test.go b/internal/scheduler/nodedb/nodedb_test.go index c62e0abfe6a..29e9b7620eb 100644 --- a/internal/scheduler/nodedb/nodedb_test.go +++ b/internal/scheduler/nodedb/nodedb_test.go @@ -103,40 +103,45 @@ func TestSelectNodeForPod_NodeIdLabel_Failure(t *testing.T) { } func TestNodeBindingEvictionUnbinding(t *testing.T) { + // We add evictedPriority here to simplify the resource accounting checks + // below; it is always present after the first eviction. node := testfixtures.Test8GpuNode(append(testfixtures.TestPriorities, evictedPriority)) - req := testfixtures.N1GpuPodReqs("A", 0, 1)[0] - request := schedulerobjects.ResourceListFromV1ResourceList(req.ResourceRequirements.Requests) - jobId, err := JobIdFromPodRequirements(req) + job := testfixtures.Test1GpuJob("A", testfixtures.PriorityClass0) + jobId := job.GetId() + priority := testfixtures.TestPriorityClasses[job.GetPriorityClassName()].Priority + schedulingInfo := job.GetRequirements(testfixtures.TestPriorityClasses) + podRequirements := schedulingInfo.ObjectRequirements[0].GetPodRequirements() + requests := schedulerobjects.ResourceListFromV1ResourceList(job.GetResourceRequirements().Requests) + + boundNode, err := BindPodToNode(podRequirements, node) require.NoError(t, err) - boundNode, err := BindPodToNode(req, node) + unboundNode, err := UnbindPodFromNode(podRequirements, boundNode) require.NoError(t, err) - unboundNode, err := UnbindPodFromNode(req, boundNode) + unboundMultipleNode, err := UnbindPodsFromNode([]*schedulerobjects.PodRequirements{podRequirements}, boundNode) require.NoError(t, err) - unboundMultipleNode, err := UnbindPodsFromNode([]*schedulerobjects.PodRequirements{req}, boundNode) + evictedNode := boundNode.DeepCopy() + err = EvictJobFromNodeInPlace(testfixtures.TestPriorityClasses, job, evictedNode) require.NoError(t, err) - evictedNode, err := EvictPodFromNode(req, boundNode) + evictedUnboundNode, err := UnbindPodFromNode(podRequirements, 
evictedNode) require.NoError(t, err) - evictedUnboundNode, err := UnbindPodFromNode(req, evictedNode) + evictedBoundNode, err := BindPodToNode(podRequirements, evictedNode) require.NoError(t, err) - evictedBoundNode, err := BindPodToNode(req, evictedNode) - require.NoError(t, err) - - _, err = EvictPodFromNode(req, node) + err = EvictJobFromNodeInPlace(testfixtures.TestPriorityClasses, job, node) require.Error(t, err) - _, err = UnbindPodFromNode(req, node) + _, err = UnbindPodFromNode(podRequirements, node) require.Error(t, err) - _, err = BindPodToNode(req, boundNode) + _, err = BindPodToNode(podRequirements, boundNode) require.Error(t, err) - _, err = EvictPodFromNode(req, evictedNode) + err = EvictJobFromNodeInPlace(testfixtures.TestPriorityClasses, job, evictedNode) require.Error(t, err) assertNodeAccountingEqual(t, node, unboundNode) @@ -148,14 +153,14 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) { assert.True( t, armadamaps.DeepEqual( - map[string]schedulerobjects.ResourceList{jobId: request}, + map[string]schedulerobjects.ResourceList{jobId: requests}, boundNode.AllocatedByJobId, ), ) assert.True( t, armadamaps.DeepEqual( - map[string]schedulerobjects.ResourceList{jobId: request}, + map[string]schedulerobjects.ResourceList{jobId: requests}, evictedNode.AllocatedByJobId, ), ) @@ -163,21 +168,21 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) { assert.True( t, armadamaps.DeepEqual( - map[string]schedulerobjects.ResourceList{"A": request}, + map[string]schedulerobjects.ResourceList{"A": requests}, boundNode.AllocatedByQueue, ), ) assert.True( t, armadamaps.DeepEqual( - map[string]schedulerobjects.ResourceList{"A": request}, + map[string]schedulerobjects.ResourceList{"A": requests}, evictedNode.AllocatedByQueue, ), ) expectedAllocatable := boundNode.TotalResources.DeepCopy() - expectedAllocatable.Sub(request) - assert.True(t, expectedAllocatable.Equal(boundNode.AllocatableByPriorityAndResource[req.Priority])) + expectedAllocatable.Sub(requests) 
+ assert.True(t, expectedAllocatable.Equal(boundNode.AllocatableByPriorityAndResource[priority])) assert.Empty(t, unboundNode.AllocatedByJobId) assert.Empty(t, unboundNode.AllocatedByQueue) @@ -637,17 +642,3 @@ func randomString(n int) string { } return s } - -func GetTestNodeDb() *NodeDb { - nodeDb, err := NewNodeDb( - testfixtures.TestPriorityClasses, - 0, - testfixtures.TestResources, - testfixtures.TestIndexedTaints, - testfixtures.TestIndexedNodeLabels, - ) - if err != nil { - panic(err) - } - return nodeDb -} diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go index 78931edd37c..f6b929e63ff 100644 --- a/internal/scheduler/preempting_queue_scheduler.go +++ b/internal/scheduler/preempting_queue_scheduler.go @@ -808,41 +808,41 @@ func (evi *Evictor) Evict(ctx context.Context, it nodedb.NodeIterator) (*Evictor evictedJobsById := make(map[string]interfaces.LegacySchedulerJob) affectedNodesById := make(map[string]*schedulerobjects.Node) nodeIdByJobId := make(map[string]string) - for node := it.NextNode(); node != nil; node = it.NextNode() { - if evi.nodeFilter != nil && !evi.nodeFilter(ctx, node) { + for originalNode := it.NextNode(); originalNode != nil; originalNode = it.NextNode() { + if evi.nodeFilter != nil && !evi.nodeFilter(ctx, originalNode) { continue } - jobIds := util.Filter( - maps.Keys(node.AllocatedByJobId), - func(jobId string) bool { - _, ok := node.EvictedJobRunIds[jobId] - return !ok - }, - ) + jobIds := make([]string, 0) + for jobId := range originalNode.AllocatedByJobId { + if _, ok := originalNode.EvictedJobRunIds[jobId]; !ok { + jobIds = append(jobIds, jobId) + } + } jobs, err := evi.jobRepo.GetExistingJobsByIds(jobIds) if err != nil { return nil, err } + var affectedNode *schedulerobjects.Node for _, job := range jobs { if evi.jobFilter != nil && !evi.jobFilter(ctx, job) { continue } - req := PodRequirementFromLegacySchedulerJob(job, evi.priorityClasses) - if req == nil { - continue 
+ if affectedNode == nil { + affectedNode = originalNode.DeepCopy() } - node, err = nodedb.EvictPodFromNode(req, node) + err = nodedb.EvictJobFromNodeInPlace(evi.priorityClasses, job, affectedNode) if err != nil { return nil, err } if evi.postEvictFunc != nil { - evi.postEvictFunc(ctx, job, node) + evi.postEvictFunc(ctx, job, affectedNode) } - evictedJobsById[job.GetId()] = job - nodeIdByJobId[job.GetId()] = node.Id + nodeIdByJobId[job.GetId()] = affectedNode.Id + } + if affectedNode != nil { + affectedNodesById[affectedNode.Id] = affectedNode } - affectedNodesById[node.Id] = node } return &EvictorResult{ EvictedJobsById: evictedJobsById,