Reduce allocations in Evict
zuqq committed Jun 20, 2023
1 parent af8c741 commit d309461
Showing 4 changed files with 80 additions and 108 deletions.
13 changes: 0 additions & 13 deletions internal/scheduler/interfaces/interfaces.go
@@ -20,16 +20,3 @@ type LegacySchedulerJob interface {
     GetTolerations() []v1.Toleration
     GetResourceRequirements() v1.ResourceRequirements
 }
-
-func PodRequirementFromLegacySchedulerJob(job LegacySchedulerJob, priorityClasses map[string]configuration.PriorityClass) *schedulerobjects.PodRequirements {
-    schedulingInfo := job.GetRequirements(priorityClasses)
-    if schedulingInfo == nil {
-        return nil
-    }
-    for _, objectReq := range schedulingInfo.ObjectRequirements {
-        if req := objectReq.GetPodRequirements(); req != nil {
-            return req
-        }
-    }
-    return nil
-}
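
The deleted helper has no direct replacement; where pod requirements are still needed (as in the test changes below), callers now read them off the job's scheduling info. A minimal inline equivalent, reconstructed from the deleted body and assuming `job` and `priorityClasses` in scope:

```go
// Inline equivalent of the removed PodRequirementFromLegacySchedulerJob:
// take the first object requirement that carries pod requirements, or nil.
var req *schedulerobjects.PodRequirements
if schedulingInfo := job.GetRequirements(priorityClasses); schedulingInfo != nil {
	for _, objectReq := range schedulingInfo.ObjectRequirements {
		if r := objectReq.GetPodRequirements(); r != nil {
			req = r
			break
		}
	}
}
```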
80 changes: 37 additions & 43 deletions internal/scheduler/nodedb/nodedb.go
@@ -20,6 +20,7 @@ import (
"github.com/armadaproject/armada/internal/common/util"
schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
"github.com/armadaproject/armada/internal/scheduler/interfaces"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

@@ -556,59 +557,52 @@ func BindPodToNode(req *schedulerobjects.PodRequirements, node *schedulerobjects
     return node, nil
 }

-// EvictPodFromNode returns a copy of node with req evicted from it. Specifically:
-//   - The job is marked as evicted on the node.
-//   - AllocatedByJobId and AllocatedByQueue are not updated.
-//   - Resources requested by the evicted pod are marked as allocated at priority evictedPriority.
-func EvictPodFromNode(req *schedulerobjects.PodRequirements, node *schedulerobjects.Node) (*schedulerobjects.Node, error) {
-    jobId, err := JobIdFromPodRequirements(req)
-    if err != nil {
-        return nil, err
-    }
-    queue, err := QueueFromPodRequirements(req)
-    if err != nil {
-        return nil, err
-    }
-    node = node.DeepCopy()
-
-    // Ensure we track allocated resources at evictedPriority.
-    if _, ok := node.AllocatableByPriorityAndResource[evictedPriority]; !ok {
-        pMin := int32(math.MaxInt32)
-        ok := false
-        for p := range node.AllocatableByPriorityAndResource {
-            if p < pMin {
-                pMin = p
-                ok = true
-            }
-        }
-        if ok {
-            node.AllocatableByPriorityAndResource[evictedPriority] = node.AllocatableByPriorityAndResource[pMin].DeepCopy()
-        }
-    }
-
+// EvictJobFromNodeInPlace marks the given job as evicted and moves its resources from the job's
+// priority to evictedPriority; it does not update AllocatedByJobId and AllocatedByQueue.
+func EvictJobFromNodeInPlace(priorityClasses map[string]configuration.PriorityClass, job interfaces.LegacySchedulerJob, node *schedulerobjects.Node) error {
+    jobId := job.GetId()
     if _, ok := node.AllocatedByJobId[jobId]; !ok {
-        return nil, errors.Errorf("job %s has no resources allocated on node %s", jobId, node.Id)
+        return errors.Errorf("job %s has no resources allocated on node %s", jobId, node.Id)
     }

+    queue := job.GetQueue()
     if _, ok := node.AllocatedByQueue[queue]; !ok {
-        return nil, errors.Errorf("queue %s has no resources allocated on node %s", queue, node.Id)
+        return errors.Errorf("queue %s has no resources allocated on node %s", queue, node.Id)
     }

     if node.EvictedJobRunIds == nil {
         node.EvictedJobRunIds = make(map[string]bool)
     }
     if _, ok := node.EvictedJobRunIds[jobId]; ok {
         // TODO: We're using run ids instead of job ids for now.
-        return nil, errors.Errorf("job %s is already evicted from node %s", jobId, node.Id)
-    } else {
-        node.EvictedJobRunIds[jobId] = true
+        return errors.Errorf("job %s is already evicted from node %s", jobId, node.Id)
     }
+    node.EvictedJobRunIds[jobId] = true

-    schedulerobjects.AllocatableByPriorityAndResourceType(
-        node.AllocatableByPriorityAndResource,
-    ).MarkAllocatableV1ResourceList(req.Priority, req.ResourceRequirements.Requests)
-    schedulerobjects.AllocatableByPriorityAndResourceType(
-        node.AllocatableByPriorityAndResource,
-    ).MarkAllocatedV1ResourceList(evictedPriority, req.ResourceRequirements.Requests)
-    return node, nil
+    if _, ok := node.AllocatableByPriorityAndResource[evictedPriority]; !ok {
+        minimumPriority := int32(math.MaxInt32)
+        foundPriority := false
+        for p := range node.AllocatableByPriorityAndResource {
+            if p < minimumPriority {
+                minimumPriority = p
+                foundPriority = true
+            }
+        }
+        if foundPriority {
+            node.AllocatableByPriorityAndResource[evictedPriority] = node.AllocatableByPriorityAndResource[minimumPriority].DeepCopy()
+        } else {
+            // We do not expect to hit this branch; however, if we do, then we need
+            // to make sure that evictedPriority is in this map so that the call to
+            // MarkAllocatedV1ResourceList() below knows about it.
+            node.AllocatableByPriorityAndResource[evictedPriority] = schedulerobjects.ResourceList{}
+        }
+    }
+    allocatable := schedulerobjects.AllocatableByPriorityAndResourceType(node.AllocatableByPriorityAndResource)
+    priority := priorityClasses[job.GetPriorityClassName()].Priority
+    requests := job.GetResourceRequirements().Requests
+    allocatable.MarkAllocatableV1ResourceList(priority, requests)
+    allocatable.MarkAllocatedV1ResourceList(evictedPriority, requests)

+    return nil
 }

 // UnbindPodsFromNode returns a node with all reqs unbound from it.
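
The in-place variant moves the copy to the caller: instead of EvictPodFromNode returning a fresh copy of the node for every evicted pod, the caller deep-copies the node once and evicts any number of jobs into that copy. A minimal usage sketch under that assumption; `evictJobsFromNode` is hypothetical, not part of this commit:

```go
// evictJobsFromNode evicts a batch of jobs from node with a single copy,
// mirroring how Evict uses the new API below (hypothetical helper).
func evictJobsFromNode(
	priorityClasses map[string]configuration.PriorityClass,
	jobs []interfaces.LegacySchedulerJob,
	node *schedulerobjects.Node,
) (*schedulerobjects.Node, error) {
	evicted := node.DeepCopy() // the only node copy, regardless of len(jobs)
	for _, job := range jobs {
		if err := EvictJobFromNodeInPlace(priorityClasses, job, evicted); err != nil {
			return nil, err
		}
	}
	return evicted, nil
}
```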
61 changes: 26 additions & 35 deletions internal/scheduler/nodedb/nodedb_test.go
@@ -103,40 +103,45 @@ func TestSelectNodeForPod_NodeIdLabel_Failure(t *testing.T) {
 }

 func TestNodeBindingEvictionUnbinding(t *testing.T) {
+    // We add evictedPriority here to simplify the resource accounting checks
+    // below; it is always present after the first eviction.
     node := testfixtures.Test8GpuNode(append(testfixtures.TestPriorities, evictedPriority))
-    req := testfixtures.N1GpuPodReqs("A", 0, 1)[0]
-    request := schedulerobjects.ResourceListFromV1ResourceList(req.ResourceRequirements.Requests)
-    jobId, err := JobIdFromPodRequirements(req)
+    job := testfixtures.Test1GpuJob("A", testfixtures.PriorityClass0)
+    jobId := job.GetId()
+    priority := testfixtures.TestPriorityClasses[job.GetPriorityClassName()].Priority
+    schedulingInfo := job.GetRequirements(testfixtures.TestPriorityClasses)
+    podRequirements := schedulingInfo.ObjectRequirements[0].GetPodRequirements()
+    requests := schedulerobjects.ResourceListFromV1ResourceList(job.GetResourceRequirements().Requests)

-    boundNode, err := BindPodToNode(req, node)
+    boundNode, err := BindPodToNode(podRequirements, node)
     require.NoError(t, err)

-    unboundNode, err := UnbindPodFromNode(req, boundNode)
+    unboundNode, err := UnbindPodFromNode(podRequirements, boundNode)
     require.NoError(t, err)

-    unboundMultipleNode, err := UnbindPodsFromNode([]*schedulerobjects.PodRequirements{req}, boundNode)
+    unboundMultipleNode, err := UnbindPodsFromNode([]*schedulerobjects.PodRequirements{podRequirements}, boundNode)
     require.NoError(t, err)

-    evictedNode, err := EvictPodFromNode(req, boundNode)
+    evictedNode := boundNode.DeepCopy()
+    err = EvictJobFromNodeInPlace(testfixtures.TestPriorityClasses, job, evictedNode)
     require.NoError(t, err)

-    evictedUnboundNode, err := UnbindPodFromNode(req, evictedNode)
+    evictedUnboundNode, err := UnbindPodFromNode(podRequirements, evictedNode)
     require.NoError(t, err)

-    evictedBoundNode, err := BindPodToNode(req, evictedNode)
+    evictedBoundNode, err := BindPodToNode(podRequirements, evictedNode)
     require.NoError(t, err)

-    _, err = EvictPodFromNode(req, node)
+    err = EvictJobFromNodeInPlace(testfixtures.TestPriorityClasses, job, node)
     require.Error(t, err)

-    _, err = UnbindPodFromNode(req, node)
+    _, err = UnbindPodFromNode(podRequirements, node)
     require.Error(t, err)

-    _, err = BindPodToNode(req, boundNode)
+    _, err = BindPodToNode(podRequirements, boundNode)
     require.Error(t, err)

-    _, err = EvictPodFromNode(req, evictedNode)
+    err = EvictJobFromNodeInPlace(testfixtures.TestPriorityClasses, job, evictedNode)
     require.Error(t, err)

     assertNodeAccountingEqual(t, node, unboundNode)
@@ -148,36 +153,36 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) {
     assert.True(
         t,
         armadamaps.DeepEqual(
-            map[string]schedulerobjects.ResourceList{jobId: request},
+            map[string]schedulerobjects.ResourceList{jobId: requests},
             boundNode.AllocatedByJobId,
         ),
     )
     assert.True(
         t,
         armadamaps.DeepEqual(
-            map[string]schedulerobjects.ResourceList{jobId: request},
+            map[string]schedulerobjects.ResourceList{jobId: requests},
             evictedNode.AllocatedByJobId,
         ),
     )

     assert.True(
         t,
         armadamaps.DeepEqual(
-            map[string]schedulerobjects.ResourceList{"A": request},
+            map[string]schedulerobjects.ResourceList{"A": requests},
             boundNode.AllocatedByQueue,
         ),
     )
     assert.True(
         t,
         armadamaps.DeepEqual(
-            map[string]schedulerobjects.ResourceList{"A": request},
+            map[string]schedulerobjects.ResourceList{"A": requests},
             evictedNode.AllocatedByQueue,
         ),
     )

     expectedAllocatable := boundNode.TotalResources.DeepCopy()
-    expectedAllocatable.Sub(request)
-    assert.True(t, expectedAllocatable.Equal(boundNode.AllocatableByPriorityAndResource[req.Priority]))
+    expectedAllocatable.Sub(requests)
+    assert.True(t, expectedAllocatable.Equal(boundNode.AllocatableByPriorityAndResource[priority]))

     assert.Empty(t, unboundNode.AllocatedByJobId)
     assert.Empty(t, unboundNode.AllocatedByQueue)
@@ -637,17 +642,3 @@ func randomString(n int) string {
     }
     return s
 }
-
-func GetTestNodeDb() *NodeDb {
-    nodeDb, err := NewNodeDb(
-        testfixtures.TestPriorityClasses,
-        0,
-        testfixtures.TestResources,
-        testfixtures.TestIndexedTaints,
-        testfixtures.TestIndexedNodeLabels,
-    )
-    if err != nil {
-        panic(err)
-    }
-    return nodeDb
-}
34 changes: 17 additions & 17 deletions internal/scheduler/preempting_queue_scheduler.go
@@ -808,41 +808,41 @@ func (evi *Evictor) Evict(ctx context.Context, it nodedb.NodeIterator) (*Evictor
     evictedJobsById := make(map[string]interfaces.LegacySchedulerJob)
     affectedNodesById := make(map[string]*schedulerobjects.Node)
     nodeIdByJobId := make(map[string]string)
-    for node := it.NextNode(); node != nil; node = it.NextNode() {
-        if evi.nodeFilter != nil && !evi.nodeFilter(ctx, node) {
+    for originalNode := it.NextNode(); originalNode != nil; originalNode = it.NextNode() {
+        if evi.nodeFilter != nil && !evi.nodeFilter(ctx, originalNode) {
             continue
         }
-        jobIds := util.Filter(
-            maps.Keys(node.AllocatedByJobId),
-            func(jobId string) bool {
-                _, ok := node.EvictedJobRunIds[jobId]
-                return !ok
-            },
-        )
+        jobIds := make([]string, 0)
+        for jobId := range originalNode.AllocatedByJobId {
+            if _, ok := originalNode.EvictedJobRunIds[jobId]; !ok {
+                jobIds = append(jobIds, jobId)
+            }
+        }
         jobs, err := evi.jobRepo.GetExistingJobsByIds(jobIds)
         if err != nil {
             return nil, err
         }
+        var affectedNode *schedulerobjects.Node
         for _, job := range jobs {
            if evi.jobFilter != nil && !evi.jobFilter(ctx, job) {
                 continue
             }
-            req := PodRequirementFromLegacySchedulerJob(job, evi.priorityClasses)
-            if req == nil {
-                continue
+            if affectedNode == nil {
+                affectedNode = originalNode.DeepCopy()
             }
-            node, err = nodedb.EvictPodFromNode(req, node)
+            err = nodedb.EvictJobFromNodeInPlace(evi.priorityClasses, job, affectedNode)
             if err != nil {
                 return nil, err
             }
             if evi.postEvictFunc != nil {
-                evi.postEvictFunc(ctx, job, node)
+                evi.postEvictFunc(ctx, job, affectedNode)
             }

             evictedJobsById[job.GetId()] = job
-            nodeIdByJobId[job.GetId()] = node.Id
+            nodeIdByJobId[job.GetId()] = affectedNode.Id
         }
-        affectedNodesById[node.Id] = node
+        if affectedNode != nil {
+            affectedNodesById[affectedNode.Id] = affectedNode
+        }
     }
     return &EvictorResult{
         EvictedJobsById: evictedJobsById,
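
The jobIds loop above replaces a util.Filter over maps.Keys with a single pass, so no intermediate key slice is allocated. The same pattern with a capacity hint (the hint is a suggestion, not part of the commit):

```go
// Collect the ids of jobs that are allocated but not yet evicted, in one
// pass; len(AllocatedByJobId) is an upper bound on the result size.
jobIds := make([]string, 0, len(originalNode.AllocatedByJobId))
for jobId := range originalNode.AllocatedByJobId {
	if _, evicted := originalNode.EvictedJobRunIds[jobId]; !evicted {
		jobIds = append(jobIds, jobId)
	}
}
```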
