Remove assertions from preempting_queue_scheduler.go (#3959)
* Remove assertions from preempting_queue_scheduler.go

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* Remove NodeJobDiff + iterators that are now unused

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

---------

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>
Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>
Co-authored-by: JamesMurkin <jamesmurkin@hotmail.com>
d80tb7 and JamesMurkin committed Sep 23, 2024
1 parent 977628c commit c3d5faf
Showing 6 changed files with 0 additions and 267 deletions.
41 changes: 0 additions & 41 deletions internal/scheduler/nodedb/nodedb.go
@@ -401,47 +401,6 @@ func (nodeDb *NodeDb) GetNodeWithTxn(txn *memdb.Txn, id string) (*internaltypes.
return obj.(*internaltypes.Node), nil
}

// NodeJobDiff compares two snapshots of the NodeDb memdb and returns
// - a map from the id of each job preempted between the two snapshots to the node it used to be on
// - a map from the id of each job scheduled between the two snapshots to the node it was scheduled on
func NodeJobDiff(txnA, txnB *memdb.Txn) (map[string]*internaltypes.Node, map[string]*internaltypes.Node, error) {
preempted := make(map[string]*internaltypes.Node)
scheduled := make(map[string]*internaltypes.Node)
nodePairIterator, err := NewNodePairIterator(txnA, txnB)
if err != nil {
return nil, nil, err
}
for item := nodePairIterator.NextItem(); item != nil; item = nodePairIterator.NextItem() {
if item.NodeA != nil && item.NodeB == nil {
// NodeA was removed. All jobs on NodeA are preempted.
for jobId := range item.NodeA.AllocatedByJobId {
preempted[jobId] = item.NodeA
}
} else if item.NodeA == nil && item.NodeB != nil {
// NodeB was added. All jobs on NodeB are scheduled.
for jobId := range item.NodeB.AllocatedByJobId {
scheduled[jobId] = item.NodeB
}
} else if item.NodeA != nil && item.NodeB != nil {
// NodeA is the same as NodeB.
// Jobs on NodeA that are not on NodeB are preempted.
// Jobs on NodeB that are not on NodeA are scheduled.
for jobId := range item.NodeA.AllocatedByJobId {
if _, ok := item.NodeB.AllocatedByJobId[jobId]; !ok {
preempted[jobId] = item.NodeA
}
}
for jobId := range item.NodeB.AllocatedByJobId {
if _, ok := item.NodeA.AllocatedByJobId[jobId]; !ok {
scheduled[jobId] = item.NodeB
}
}
}
}
return preempted, scheduled, nil
}

func (nodeDb *NodeDb) ScheduleManyWithTxn(txn *memdb.Txn, gctx *schedulercontext.GangSchedulingContext) (bool, error) {
// Attempt to schedule pods one by one in a transaction.
for _, jctx := range gctx.JobSchedulingContexts {
78 changes: 0 additions & 78 deletions internal/scheduler/nodedb/nodeiteration.go
@@ -47,84 +47,6 @@ func (it *NodesIterator) Next() interface{} {
return it.NextNode()
}

type NodePairIterator struct {
itA *NodesIterator
itB *NodesIterator
nodeA *internaltypes.Node
nodeB *internaltypes.Node
}

type NodePairIteratorItem struct {
NodeA *internaltypes.Node
NodeB *internaltypes.Node
}

func NewNodePairIterator(txnA, txnB *memdb.Txn) (*NodePairIterator, error) {
itA, err := NewNodesIterator(txnA)
if err != nil {
return nil, errors.WithStack(err)
}
itB, err := NewNodesIterator(txnB)
if err != nil {
return nil, errors.WithStack(err)
}
return &NodePairIterator{
itA: itA,
itB: itB,
}, nil
}

func (it *NodePairIterator) WatchCh() <-chan struct{} {
panic("not implemented")
}

func (it *NodePairIterator) NextItem() (rv *NodePairIteratorItem) {
defer func() {
if rv == nil {
return
}
if rv.NodeA != nil {
it.nodeA = nil
}
if rv.NodeB != nil {
it.nodeB = nil
}
}()
if it.nodeA == nil {
it.nodeA = it.itA.NextNode()
}
if it.nodeB == nil {
it.nodeB = it.itB.NextNode()
}
if it.nodeA == nil && it.nodeB == nil {
return nil
} else if it.nodeA == nil || it.nodeB == nil {
return &NodePairIteratorItem{
NodeA: it.nodeA,
NodeB: it.nodeB,
}
}
cmp := bytes.Compare([]byte(it.nodeA.GetId()), []byte(it.nodeB.GetId()))
if cmp == 0 {
return &NodePairIteratorItem{
NodeA: it.nodeA,
NodeB: it.nodeB,
}
} else if cmp == -1 {
return &NodePairIteratorItem{
NodeA: it.nodeA,
}
} else {
return &NodePairIteratorItem{
NodeB: it.nodeB,
}
}
}

func (it *NodePairIterator) Next() interface{} {
return it.NextItem()
}

// NodeIndex is an index for internaltypes.Node that returns node.NodeDbKeys[KeyIndex].
type NodeIndex struct {
KeyIndex int
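The removed NodePairIterator above is a plain two-pointer merge over two node streams ordered by id: advance whichever side currently has the smaller id, and pair entries when the ids match. A minimal, standalone sketch of that pattern follows; the entry and pair types are hypothetical stand-ins for illustration, not Armada's internaltypes.Node.

package main

import "fmt"

// entry stands in for a node keyed by its id (hypothetical type for this sketch).
type entry struct {
    ID  string
    Val int
}

// pair holds the entries found under the same id in two id-sorted slices;
// A or B is nil when the id is present in only one of them.
type pair struct {
    A, B *entry
}

// mergeByID walks both slices with one cursor each, advancing whichever side
// currently has the smaller id and emitting one pair per distinct id. This is
// the same shape of merge the removed NodePairIterator performed over two
// memdb snapshots ordered by node id.
func mergeByID(a, b []entry) []pair {
    var out []pair
    i, j := 0, 0
    for i < len(a) || j < len(b) {
        switch {
        case j == len(b) || (i < len(a) && a[i].ID < b[j].ID):
            out = append(out, pair{A: &a[i]}) // id only in the first snapshot
            i++
        case i == len(a) || b[j].ID < a[i].ID:
            out = append(out, pair{B: &b[j]}) // id only in the second snapshot
            j++
        default:
            out = append(out, pair{A: &a[i], B: &b[j]}) // id present in both
            i++
            j++
        }
    }
    return out
}

func main() {
    before := []entry{{ID: "A", Val: 1}, {ID: "B", Val: 2}}
    after := []entry{{ID: "B", Val: 2}, {ID: "C", Val: 3}}
    for _, p := range mergeByID(before, after) {
        fmt.Printf("A=%+v B=%+v\n", p.A, p.B)
    }
}

Running this prints one line per id (A only in the first slice, B in both, C only in the second), mirroring the expected items in TestNodePairIterator below.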
49 changes: 0 additions & 49 deletions internal/scheduler/nodedb/nodeiteration_test.go
@@ -71,55 +71,6 @@ func TestNodesIterator(t *testing.T) {
}
}

func TestNodePairIterator(t *testing.T) {
nodes := testfixtures.TestCluster()
for i, nodeId := range []string{"A", "B", "C"} {
nodes[i].Id = nodeId
}
nodeDb, err := newNodeDbWithNodes(nodes)
require.NoError(t, err)
entries := make([]*internaltypes.Node, len(nodes))
for i, node := range nodes {
entry, err := nodeDb.GetNode(node.Id)
require.NoError(t, err)
entries[i] = entry
}

txn := nodeDb.Txn(true)
require.NoError(t, txn.Delete("nodes", entries[2]))
txn.Commit()
txnA := nodeDb.Txn(false)

txn = nodeDb.Txn(true)
require.NoError(t, txn.Delete("nodes", entries[0]))
require.NoError(t, txn.Insert("nodes", entries[2]))
txn.Commit()
txnB := nodeDb.Txn(false)

it, err := NewNodePairIterator(txnA, txnB)
require.NoError(t, err)

actual := make([]*NodePairIteratorItem, 0)
for item := it.NextItem(); item != nil; item = it.NextItem() {
actual = append(actual, item)
}
expected := []*NodePairIteratorItem{
{
NodeA: entries[0],
NodeB: nil,
},
{
NodeA: entries[1],
NodeB: entries[1],
},
{
NodeA: nil,
NodeB: entries[2],
},
}
assert.Equal(t, expected, actual)
}

func TestNodeTypeIterator(t *testing.T) {
const nodeTypeALabel = "a"
const nodeTypeBLabel = "b"
95 changes: 0 additions & 95 deletions internal/scheduler/preempting_queue_scheduler.go
@@ -44,8 +44,6 @@ type PreemptingQueueScheduler struct {
gangIdByJobId map[string]string
// If true, the unsuccessfulSchedulingKeys check of gangScheduler is omitted.
skipUnsuccessfulSchedulingKeyCheck bool
// If true, asserts that the nodeDb state is consistent with expected changes.
enableAssertions bool
}

func NewPreemptingQueueScheduler(
@@ -85,10 +83,6 @@ func NewPreemptingQueueScheduler(
}
}

func (sch *PreemptingQueueScheduler) EnableAssertions() {
sch.enableAssertions = true
}

func (sch *PreemptingQueueScheduler) SkipUnsuccessfulSchedulingKeyCheck() {
sch.skipUnsuccessfulSchedulingKeyCheck = true
}
@@ -104,10 +98,6 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*sche
preemptedJobsById := make(map[string]*schedulercontext.JobSchedulingContext)
scheduledJobsById := make(map[string]*schedulercontext.JobSchedulingContext)

// NodeDb snapshot prior to making any changes.
// We compare against this snapshot after scheduling to detect changes.
snapshot := sch.nodeDb.Txn(false)

// Evict preemptible jobs.
ctx.WithField("stage", "scheduling-algo").Infof("Evicting preemptible jobs")
evictorResult, inMemoryJobRepo, err := sch.evict(
@@ -240,19 +230,6 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*sche
schedulercontext.PrintJobSummary(ctx, "Scheduling new jobs;", scheduledJobs)
// TODO: Show failed jobs.

if sch.enableAssertions {
ctx.WithField("stage", "scheduling-algo").Infof("Performing assertions after scheduling round")
err := sch.assertions(
snapshot,
preemptedJobsById,
scheduledJobsById,
sch.nodeIdByJobId,
)
if err != nil {
return nil, err
}
ctx.WithField("stage", "scheduling-algo").Infof("Finished running assertions after scheduling round")
}
return &schedulerresult.SchedulerResult{
PreemptedJobs: preemptedJobs,
ScheduledJobs: scheduledJobs,
@@ -612,78 +589,6 @@ func (sch *PreemptingQueueScheduler) updateGangAccounting(preempted []*scheduler
return nil
}

// For each node in the NodeDb, compare assigned jobs relative to the initial snapshot.
// Jobs no longer assigned to a node are preempted.
// Jobs newly assigned to a node are scheduled.
//
// Compare the nodedb.NodeJobDiff with expected preempted/scheduled jobs to ensure NodeDb is consistent.
// This is only to validate that nothing unexpected happened during scheduling.
func (sch *PreemptingQueueScheduler) assertions(
snapshot *memdb.Txn,
preemptedJobsById map[string]*schedulercontext.JobSchedulingContext,
scheduledJobsById map[string]*schedulercontext.JobSchedulingContext,
nodeIdByJobId map[string]string,
) error {
// Compare two snapshots of the nodeDb to find jobs that
// were preempted/scheduled between creating the snapshots.
preempted, scheduled, err := nodedb.NodeJobDiff(snapshot, sch.nodeDb.Txn(false))
if err != nil {
return err
}

// Assert that jobs we expect to be preempted/scheduled are marked as such in the nodeDb.
for jobId := range preemptedJobsById {
if _, ok := preempted[jobId]; !ok {
return errors.Errorf("inconsistent NodeDb: expected job %s to be preempted in nodeDb", jobId)
}
}
for jobId := range scheduledJobsById {
if _, ok := scheduled[jobId]; !ok {
return errors.Errorf("inconsistent NodeDb: expected job %s to be scheduled in nodeDb", jobId)
}
}

// Assert that jobs marked as preempted (scheduled) in the nodeDb are expected to be preempted (scheduled),
// and that jobs are preempted/scheduled on the nodes we expect them to.
for jobId, node := range preempted {
if expectedNodeId, ok := nodeIdByJobId[jobId]; ok {
if expectedNodeId != node.GetId() {
return errors.Errorf(
"inconsistent NodeDb: expected job %s to be preempted from node %s, but got %s",
jobId, expectedNodeId, node.GetId(),
)
}
} else {
return errors.Errorf(
"inconsistent NodeDb: expected job %s to be mapped to node %s, but found none",
jobId, node.GetId(),
)
}
if _, ok := preemptedJobsById[jobId]; !ok {
return errors.Errorf("inconsistent NodeDb: didn't expect job %s to be preempted (job marked as preempted in NodeDb)", jobId)
}
}
for jobId, node := range scheduled {
if expectedNodeId, ok := nodeIdByJobId[jobId]; ok {
if expectedNodeId != node.GetId() {
return errors.Errorf(
"inconsistent NodeDb: expected job %s to be on node %s, but got %s",
jobId, expectedNodeId, node.GetId(),
)
}
} else {
return errors.Errorf(
"inconsistent NodeDb: expected job %s to be mapped to node %s, but found none",
jobId, node.GetId(),
)
}
if _, ok := scheduledJobsById[jobId]; !ok {
return errors.Errorf("inconsistent NodeDb: didn't expect job %s to be scheduled (job marked as scheduled in NodeDb)", jobId)
}
}
return nil
}

type Evictor struct {
jobRepo JobRepository
nodeDb *nodedb.NodeDb
1 change: 0 additions & 1 deletion internal/scheduler/preempting_queue_scheduler_test.go
@@ -1868,7 +1868,6 @@ func TestPreemptingQueueScheduler(t *testing.T) {
jobIdsByGangId,
gangIdByJobId,
)
sch.EnableAssertions()

result, err := sch.Schedule(ctx)
require.NoError(t, err)
3 changes: 0 additions & 3 deletions internal/scheduler/scheduling_algo.go
@@ -487,9 +487,6 @@ func (l *FairSchedulingAlgo) schedulePool(
if l.schedulingConfig.AlwaysAttemptScheduling {
scheduler.SkipUnsuccessfulSchedulingKeyCheck()
}
if l.schedulingConfig.EnableAssertions {
scheduler.EnableAssertions()
}

result, err := scheduler.Schedule(ctx)
if err != nil {
