From c3d5faf5b3a9e09e214c41d51cf342c809353f23 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Mon, 23 Sep 2024 12:23:24 +0100 Subject: [PATCH] Remove assertions from preempting_queue_scheduler.go (#3959) * Remove assertions from preempting_queue_scheduler.go Signed-off-by: Chris Martin * Remove NodeJobDiff + iterators that are now unused Signed-off-by: JamesMurkin --------- Signed-off-by: Chris Martin Signed-off-by: JamesMurkin Co-authored-by: JamesMurkin --- internal/scheduler/nodedb/nodedb.go | 41 -------- internal/scheduler/nodedb/nodeiteration.go | 78 --------------- .../scheduler/nodedb/nodeiteration_test.go | 49 ---------- .../scheduler/preempting_queue_scheduler.go | 95 ------------------- .../preempting_queue_scheduler_test.go | 1 - internal/scheduler/scheduling_algo.go | 3 - 6 files changed, 267 deletions(-) diff --git a/internal/scheduler/nodedb/nodedb.go b/internal/scheduler/nodedb/nodedb.go index d0d8a9b90d3..ff7d12fcfa7 100644 --- a/internal/scheduler/nodedb/nodedb.go +++ b/internal/scheduler/nodedb/nodedb.go @@ -401,47 +401,6 @@ func (nodeDb *NodeDb) GetNodeWithTxn(txn *memdb.Txn, id string) (*internaltypes. return obj.(*internaltypes.Node), nil } -// NodeJobDiff compares two snapshots of the NodeDb memdb and returns -// - a map from job ids of all preempted jobs to the node they used to be on -// - a map from job ids of all scheduled jobs to the node they were scheduled on -// that happened between the two snapshots. -func NodeJobDiff(txnA, txnB *memdb.Txn) (map[string]*internaltypes.Node, map[string]*internaltypes.Node, error) { - preempted := make(map[string]*internaltypes.Node) - scheduled := make(map[string]*internaltypes.Node) - nodePairIterator, err := NewNodePairIterator(txnA, txnB) - if err != nil { - return nil, nil, err - } - for item := nodePairIterator.NextItem(); item != nil; item = nodePairIterator.NextItem() { - if item.NodeA != nil && item.NodeB == nil { - // NodeA was removed. All jobs on NodeA are preempted. - for jobId := range item.NodeA.AllocatedByJobId { - preempted[jobId] = item.NodeA - } - } else if item.NodeA == nil && item.NodeB != nil { - // NodeB was added. All jobs on NodeB are scheduled. - for jobId := range item.NodeB.AllocatedByJobId { - scheduled[jobId] = item.NodeB - } - } else if item.NodeA != nil && item.NodeB != nil { - // NodeA is the same as NodeB. - // Jobs on NodeA that are not on NodeB are preempted. - // Jobs on NodeB that are not on NodeA are scheduled. - for jobId := range item.NodeA.AllocatedByJobId { - if _, ok := item.NodeB.AllocatedByJobId[jobId]; !ok { - preempted[jobId] = item.NodeA - } - } - for jobId := range item.NodeB.AllocatedByJobId { - if _, ok := item.NodeA.AllocatedByJobId[jobId]; !ok { - scheduled[jobId] = item.NodeB - } - } - } - } - return preempted, scheduled, nil -} - func (nodeDb *NodeDb) ScheduleManyWithTxn(txn *memdb.Txn, gctx *schedulercontext.GangSchedulingContext) (bool, error) { // Attempt to schedule pods one by one in a transaction. for _, jctx := range gctx.JobSchedulingContexts { diff --git a/internal/scheduler/nodedb/nodeiteration.go b/internal/scheduler/nodedb/nodeiteration.go index 77a0cae80dc..3e171007139 100644 --- a/internal/scheduler/nodedb/nodeiteration.go +++ b/internal/scheduler/nodedb/nodeiteration.go @@ -47,84 +47,6 @@ func (it *NodesIterator) Next() interface{} { return it.NextNode() } -type NodePairIterator struct { - itA *NodesIterator - itB *NodesIterator - nodeA *internaltypes.Node - nodeB *internaltypes.Node -} - -type NodePairIteratorItem struct { - NodeA *internaltypes.Node - NodeB *internaltypes.Node -} - -func NewNodePairIterator(txnA, txnB *memdb.Txn) (*NodePairIterator, error) { - itA, err := NewNodesIterator(txnA) - if err != nil { - return nil, errors.WithStack(err) - } - itB, err := NewNodesIterator(txnB) - if err != nil { - return nil, errors.WithStack(err) - } - return &NodePairIterator{ - itA: itA, - itB: itB, - }, nil -} - -func (it *NodePairIterator) WatchCh() <-chan struct{} { - panic("not implemented") -} - -func (it *NodePairIterator) NextItem() (rv *NodePairIteratorItem) { - defer func() { - if rv == nil { - return - } - if rv.NodeA != nil { - it.nodeA = nil - } - if rv.NodeB != nil { - it.nodeB = nil - } - }() - if it.nodeA == nil { - it.nodeA = it.itA.NextNode() - } - if it.nodeB == nil { - it.nodeB = it.itB.NextNode() - } - if it.nodeA == nil && it.nodeB == nil { - return nil - } else if it.nodeA == nil || it.nodeB == nil { - return &NodePairIteratorItem{ - NodeA: it.nodeA, - NodeB: it.nodeB, - } - } - cmp := bytes.Compare([]byte(it.nodeA.GetId()), []byte(it.nodeB.GetId())) - if cmp == 0 { - return &NodePairIteratorItem{ - NodeA: it.nodeA, - NodeB: it.nodeB, - } - } else if cmp == -1 { - return &NodePairIteratorItem{ - NodeA: it.nodeA, - } - } else { - return &NodePairIteratorItem{ - NodeB: it.nodeB, - } - } -} - -func (it *NodePairIterator) Next() interface{} { - return it.NextItem() -} - // NodeIndex is an index for internaltypes.Node that returns node.NodeDbKeys[KeyIndex]. type NodeIndex struct { KeyIndex int diff --git a/internal/scheduler/nodedb/nodeiteration_test.go b/internal/scheduler/nodedb/nodeiteration_test.go index 2355a81b563..53580e8489f 100644 --- a/internal/scheduler/nodedb/nodeiteration_test.go +++ b/internal/scheduler/nodedb/nodeiteration_test.go @@ -71,55 +71,6 @@ func TestNodesIterator(t *testing.T) { } } -func TestNodePairIterator(t *testing.T) { - nodes := testfixtures.TestCluster() - for i, nodeId := range []string{"A", "B", "C"} { - nodes[i].Id = nodeId - } - nodeDb, err := newNodeDbWithNodes(nodes) - require.NoError(t, err) - entries := make([]*internaltypes.Node, len(nodes)) - for i, node := range nodes { - entry, err := nodeDb.GetNode(node.Id) - require.NoError(t, err) - entries[i] = entry - } - - txn := nodeDb.Txn(true) - require.NoError(t, txn.Delete("nodes", entries[2])) - txn.Commit() - txnA := nodeDb.Txn(false) - - txn = nodeDb.Txn(true) - require.NoError(t, txn.Delete("nodes", entries[0])) - require.NoError(t, txn.Insert("nodes", entries[2])) - txn.Commit() - txnB := nodeDb.Txn(false) - - it, err := NewNodePairIterator(txnA, txnB) - require.NoError(t, err) - - actual := make([]*NodePairIteratorItem, 0) - for item := it.NextItem(); item != nil; item = it.NextItem() { - actual = append(actual, item) - } - expected := []*NodePairIteratorItem{ - { - NodeA: entries[0], - NodeB: nil, - }, - { - NodeA: entries[1], - NodeB: entries[1], - }, - { - NodeA: nil, - NodeB: entries[2], - }, - } - assert.Equal(t, expected, actual) -} - func TestNodeTypeIterator(t *testing.T) { const nodeTypeALabel = "a" const nodeTypeBLabel = "b" diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go index 695e728957b..fc234397fa4 100644 --- a/internal/scheduler/preempting_queue_scheduler.go +++ b/internal/scheduler/preempting_queue_scheduler.go @@ -44,8 +44,6 @@ type PreemptingQueueScheduler struct { gangIdByJobId map[string]string // If true, the unsuccessfulSchedulingKeys check of gangScheduler is omitted. skipUnsuccessfulSchedulingKeyCheck bool - // If true, asserts that the nodeDb state is consistent with expected changes. - enableAssertions bool } func NewPreemptingQueueScheduler( @@ -85,10 +83,6 @@ func NewPreemptingQueueScheduler( } } -func (sch *PreemptingQueueScheduler) EnableAssertions() { - sch.enableAssertions = true -} - func (sch *PreemptingQueueScheduler) SkipUnsuccessfulSchedulingKeyCheck() { sch.skipUnsuccessfulSchedulingKeyCheck = true } @@ -104,10 +98,6 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*sche preemptedJobsById := make(map[string]*schedulercontext.JobSchedulingContext) scheduledJobsById := make(map[string]*schedulercontext.JobSchedulingContext) - // NodeDb snapshot prior to making any changes. - // We compare against this snapshot after scheduling to detect changes. - snapshot := sch.nodeDb.Txn(false) - // Evict preemptible jobs. ctx.WithField("stage", "scheduling-algo").Infof("Evicting preemptible jobs") evictorResult, inMemoryJobRepo, err := sch.evict( @@ -240,19 +230,6 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*sche schedulercontext.PrintJobSummary(ctx, "Scheduling new jobs;", scheduledJobs) // TODO: Show failed jobs. - if sch.enableAssertions { - ctx.WithField("stage", "scheduling-algo").Infof("Performing assertions after scheduling round") - err := sch.assertions( - snapshot, - preemptedJobsById, - scheduledJobsById, - sch.nodeIdByJobId, - ) - if err != nil { - return nil, err - } - ctx.WithField("stage", "scheduling-algo").Infof("Finished running assertions after scheduling round") - } return &schedulerresult.SchedulerResult{ PreemptedJobs: preemptedJobs, ScheduledJobs: scheduledJobs, @@ -612,78 +589,6 @@ func (sch *PreemptingQueueScheduler) updateGangAccounting(preempted []*scheduler return nil } -// For each node in the NodeDb, compare assigned jobs relative to the initial snapshot. -// Jobs no longer assigned to a node are preemtped. -// Jobs assigned to a node that weren't earlier are scheduled. -// -// Compare the nodedb.NodeJobDiff with expected preempted/scheduled jobs to ensure NodeDb is consistent. -// This is only to validate that nothing unexpected happened during scheduling. -func (sch *PreemptingQueueScheduler) assertions( - snapshot *memdb.Txn, - preemptedJobsById map[string]*schedulercontext.JobSchedulingContext, - scheduledJobsById map[string]*schedulercontext.JobSchedulingContext, - nodeIdByJobId map[string]string, -) error { - // Compare two snapshots of the nodeDb to find jobs that - // were preempted/scheduled between creating the snapshots. - preempted, scheduled, err := nodedb.NodeJobDiff(snapshot, sch.nodeDb.Txn(false)) - if err != nil { - return err - } - - // Assert that jobs we expect to be preempted/scheduled are marked as such in the nodeDb. - for jobId := range preemptedJobsById { - if _, ok := preempted[jobId]; !ok { - return errors.Errorf("inconsistent NodeDb: expected job %s to be preempted in nodeDb", jobId) - } - } - for jobId := range scheduledJobsById { - if _, ok := scheduled[jobId]; !ok { - return errors.Errorf("inconsistent NodeDb: expected job %s to be scheduled in nodeDb", jobId) - } - } - - // Assert that jobs marked as preempted (scheduled) in the nodeDb are expected to be preempted (scheduled), - // and that jobs are preempted/scheduled on the nodes we expect them to. - for jobId, node := range preempted { - if expectedNodeId, ok := nodeIdByJobId[jobId]; ok { - if expectedNodeId != node.GetId() { - return errors.Errorf( - "inconsistent NodeDb: expected job %s to be preempted from node %s, but got %s", - jobId, expectedNodeId, node.GetId(), - ) - } - } else { - return errors.Errorf( - "inconsistent NodeDb: expected job %s to be mapped to node %s, but found none", - jobId, node.GetId(), - ) - } - if _, ok := preemptedJobsById[jobId]; !ok { - return errors.Errorf("inconsistent NodeDb: didn't expect job %s to be preempted (job marked as preempted in NodeDb)", jobId) - } - } - for jobId, node := range scheduled { - if expectedNodeId, ok := nodeIdByJobId[jobId]; ok { - if expectedNodeId != node.GetId() { - return errors.Errorf( - "inconsistent NodeDb: expected job %s to be on node %s, but got %s", - jobId, expectedNodeId, node.GetId(), - ) - } - } else { - return errors.Errorf( - "inconsistent NodeDb: expected job %s to be mapped to node %s, but found none", - jobId, node.GetId(), - ) - } - if _, ok := scheduledJobsById[jobId]; !ok { - return errors.Errorf("inconsistent NodeDb: didn't expect job %s to be scheduled (job marked as scheduled in NodeDb)", jobId) - } - } - return nil -} - type Evictor struct { jobRepo JobRepository nodeDb *nodedb.NodeDb diff --git a/internal/scheduler/preempting_queue_scheduler_test.go b/internal/scheduler/preempting_queue_scheduler_test.go index c738b0c1512..f7acf329ca3 100644 --- a/internal/scheduler/preempting_queue_scheduler_test.go +++ b/internal/scheduler/preempting_queue_scheduler_test.go @@ -1868,7 +1868,6 @@ func TestPreemptingQueueScheduler(t *testing.T) { jobIdsByGangId, gangIdByJobId, ) - sch.EnableAssertions() result, err := sch.Schedule(ctx) require.NoError(t, err) diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go index 3457af313f9..4b9941a7e18 100644 --- a/internal/scheduler/scheduling_algo.go +++ b/internal/scheduler/scheduling_algo.go @@ -487,9 +487,6 @@ func (l *FairSchedulingAlgo) schedulePool( if l.schedulingConfig.AlwaysAttemptScheduling { scheduler.SkipUnsuccessfulSchedulingKeyCheck() } - if l.schedulingConfig.EnableAssertions { - scheduler.EnableAssertions() - } result, err := scheduler.Schedule(ctx) if err != nil {