diff --git a/internal/scheduler/metrics.go b/internal/scheduler/metrics.go index 305980de463..8f3507bd8fa 100644 --- a/internal/scheduler/metrics.go +++ b/internal/scheduler/metrics.go @@ -279,7 +279,7 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p txn := c.jobDb.ReadTxn() for _, executor := range executors { for _, node := range executor.Nodes { - pool := GetNodePool(node, executor) + pool := node.GetPool() clusterKey := clusterMetricKey{ cluster: executor.Id, pool: pool, diff --git a/internal/scheduler/pool_assigner.go b/internal/scheduler/pool_assigner.go index 1252d96271e..943d727af1f 100644 --- a/internal/scheduler/pool_assigner.go +++ b/internal/scheduler/pool_assigner.go @@ -51,7 +51,7 @@ func (p *DefaultPoolAssigner) Refresh(ctx *armadacontext.Context) error { func (p *DefaultPoolAssigner) AssignPools(j *jobdb.Job) ([]string, error) { // If Job has an active run then use the pool associated with the executor it was assigned to if !j.Queued() && j.HasRuns() { - pool := GetRunPool(j.LatestRun(), p.nodeById[j.LatestRun().NodeId()], p.executorById[j.LatestRun().Executor()]) + pool := j.LatestRun().Pool() return []string{pool}, nil } // otherwise use the pools associated with the job diff --git a/internal/scheduler/pool_assigner_test.go b/internal/scheduler/pool_assigner_test.go index d6e9bbc2af1..9e245550237 100644 --- a/internal/scheduler/pool_assigner_test.go +++ b/internal/scheduler/pool_assigner_test.go @@ -26,10 +26,6 @@ func TestPoolAssigner_AssignPools(t *testing.T) { WithQueued(false). WithNewRun(cpuExecutor.Id, testfixtures.TestCluster()[0].Id, testfixtures.TestCluster()[0].Name, "", 0) - runningJobWithoutPoolSetOnLatestRunOrExistingAssociatedNode := queuedJob. - WithQueued(false). - WithNewRun(cpuExecutor.Id, "unknownNode", "unknownNode", "", 0) - tests := map[string]struct { executorTimout time.Duration executors []*schedulerobjects.Executor @@ -49,15 +45,10 @@ func TestPoolAssigner_AssignPools(t *testing.T) { job: runningJob, expectedPools: []string{"cpu"}, }, - "running job without pool set returns pool of associated node": { + "running job without pool set returns empty string pool": { executors: []*schedulerobjects.Executor{cpuExecutor}, job: runningJobWithoutPoolSetOnLatestRun, - expectedPools: []string{testfixtures.TestPool}, - }, - "running job without pool set returns pool of associated cluster if associated node does not exist": { - executors: []*schedulerobjects.Executor{cpuExecutor}, - job: runningJobWithoutPoolSetOnLatestRunOrExistingAssociatedNode, - expectedPools: []string{cpuExecutor.Pool}, + expectedPools: []string{""}, }, } for name, tc := range tests { diff --git a/internal/scheduler/poolutils.go b/internal/scheduler/poolutils.go deleted file mode 100644 index d5fa1a550d7..00000000000 --- a/internal/scheduler/poolutils.go +++ /dev/null @@ -1,39 +0,0 @@ -package scheduler - -import ( - log "github.com/sirupsen/logrus" - - "github.com/armadaproject/armada/internal/scheduler/jobdb" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" -) - -const DefaultPool = "default" - -// TODO Remove this and just use node.GetPool() once we have migrated to have all nodes have pool set -// We may still want to keep a "fallback" pool, which may be a statically defined default pool or remain at executor level -func GetNodePool(node *schedulerobjects.Node, executor *schedulerobjects.Executor) string { - if node != nil { - if node.GetPool() != "" { - return node.GetPool() - } else { - log.Warnf("node %s does not have a pool set, defaulting to cluster pool", node.Id) - } - } - - if executor == nil { - return DefaultPool - } - if executor.GetPool() == "" { - log.Errorf("executor %s has no pool set", executor.Id) - return DefaultPool - } - return executor.GetPool() -} - -// TODO Remove this and just use run.Pool() once we have migrated to have all runs have node pool set -func GetRunPool(run *jobdb.JobRun, node *schedulerobjects.Node, executor *schedulerobjects.Executor) string { - if run != nil && run.Pool() != "" { - return run.Pool() - } - return GetNodePool(node, executor) -} diff --git a/internal/scheduler/poolutils_test.go b/internal/scheduler/poolutils_test.go deleted file mode 100644 index 1809833847b..00000000000 --- a/internal/scheduler/poolutils_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package scheduler - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "k8s.io/utils/pointer" - - "github.com/armadaproject/armada/internal/scheduler/jobdb" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" - "github.com/armadaproject/armada/internal/scheduler/testfixtures" -) - -var ( - nodeWithPool = &schedulerobjects.Node{Pool: "node-pool"} - nodeWithoutPool = &schedulerobjects.Node{} - executorWithPool = &schedulerobjects.Executor{Pool: "executor-pool"} - executorWithoutPool = &schedulerobjects.Executor{} - - runWithPool = testfixtures.JobDb.CreateRun( - "", - queuedJob.Id(), - 123, - "test-executor", - "test-executor-test-node", - "test-node", - "run-pool", - pointer.Int32(5), - false, - false, - false, - false, - false, - false, - false, - false, - nil, - nil, - nil, - nil, - nil, - false, - false, - ) - runWithoutPool = &jobdb.JobRun{} -) - -func TestGetNodePool(t *testing.T) { - assert.Equal(t, "node-pool", GetNodePool(nodeWithPool, executorWithPool)) - assert.Equal(t, "executor-pool", GetNodePool(nodeWithoutPool, executorWithPool)) - assert.Equal(t, DefaultPool, GetNodePool(nodeWithoutPool, executorWithoutPool)) -} - -func TestGetNodePool_NilInputs(t *testing.T) { - assert.Equal(t, "node-pool", GetNodePool(nodeWithPool, nil)) - assert.Equal(t, "executor-pool", GetNodePool(nil, executorWithPool)) - assert.Equal(t, DefaultPool, GetNodePool(nil, nil)) -} - -func TestGetRunPool(t *testing.T) { - assert.Equal(t, "run-pool", GetRunPool(runWithPool, nodeWithPool, executorWithPool)) - assert.Equal(t, "node-pool", GetRunPool(runWithoutPool, nodeWithPool, executorWithPool)) - assert.Equal(t, "executor-pool", GetRunPool(runWithoutPool, nodeWithoutPool, executorWithPool)) - assert.Equal(t, DefaultPool, GetRunPool(runWithoutPool, nodeWithoutPool, executorWithoutPool)) -} - -func TestGetRunPool_NilInputs(t *testing.T) { - assert.Equal(t, "run-pool", GetRunPool(runWithPool, nil, nil)) - assert.Equal(t, "node-pool", GetRunPool(nil, nodeWithPool, executorWithPool)) - assert.Equal(t, "executor-pool", GetRunPool(nil, nil, executorWithPool)) - assert.Equal(t, DefaultPool, GetRunPool(nil, nil, nil)) -} diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go index f670c8c8208..a463b54a83e 100644 --- a/internal/scheduler/scheduling_algo.go +++ b/internal/scheduler/scheduling_algo.go @@ -239,7 +239,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con executorById[executor.Id] = executor for _, node := range executor.Nodes { nodeById[node.GetId()] = node - pool := GetNodePool(node, executor) + pool := node.GetPool() allKnownPools[pool] = true if _, present := nodesByPoolAndExecutor[pool]; !present { @@ -273,7 +273,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con totalCapacityByPool := make(schedulerobjects.QuantityByTAndResourceType[string]) for _, executor := range executors { for _, node := range executor.Nodes { - totalCapacityByPool.AddResourceList(GetNodePool(node, executor), node.TotalResources) + totalCapacityByPool.AddResourceList(node.GetPool(), node.TotalResources) } } @@ -300,7 +300,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con pools := job.Pools() if !job.Queued() && job.LatestRun() != nil { - pool := GetRunPool(job.LatestRun(), nodeById[job.LatestRun().NodeId()], executorById[job.LatestRun().Executor()]) + pool := job.LatestRun().Pool() pools = []string{pool} } else if len(pools) < 1 { // This occurs if we haven't assigned a job to a pool. Right now this can occur if a user @@ -349,7 +349,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con if nodeName := run.NodeName(); nodeName == "" { return nil, errors.Errorf("run %s of job %s is not queued but has no nodeName associated with it", run.Id(), job.Id()) } - pool := GetRunPool(job.LatestRun(), nodeById[job.LatestRun().NodeId()], executorById[job.LatestRun().Executor()]) + pool := job.LatestRun().Pool() if _, present := jobsByPoolAndExecutor[pool]; !present { jobsByPoolAndExecutor[pool] = map[string][]*jobdb.Job{} } diff --git a/internal/scheduler/submitcheck.go b/internal/scheduler/submitcheck.go index 519416b2612..178d5c190d3 100644 --- a/internal/scheduler/submitcheck.go +++ b/internal/scheduler/submitcheck.go @@ -95,7 +95,7 @@ func (srv *SubmitChecker) updateExecutors(ctx *armadacontext.Context) { for _, ex := range executors { nodes := ex.GetNodes() nodesByPool := armadaslices.GroupByFunc(nodes, func(n *schedulerobjects.Node) string { - return GetNodePool(n, ex) + return n.GetPool() }) for pool, nodes := range nodesByPool { nodeDb, err := srv.constructNodeDb(nodes)