Remove Legacy Scheduler Job (#3557)
* remove LegacySchedulerJob code (#129)

* wip

* remove legacySchedulerJob

* wip

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* wip

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* fix tests

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* lint

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* renamed methods

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* doc

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* remove line

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

---------

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>
Co-authored-by: Christopher Martin <Chris.Martin@gresearch.co.uk>
Co-authored-by: Chris Martin <chris@cmartinit.co.uk>
3 people committed May 2, 2024
1 parent f18d0fe commit 586efd5
Showing 34 changed files with 387 additions and 604 deletions.
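
At a glance, the commit replaces the interfaces.LegacySchedulerJob abstraction with the concrete *jobdb.Job throughout the scheduler and drops the Get prefix from the job accessors. The mapping below is inferred from the hunks shown on this page and is not necessarily exhaustive:

job.GetId()                             -> job.Id()
job.GetQueue()                          -> job.Queue()
job.GetPriorityClassName()              -> job.PriorityClassName()
job.GetAnnotations()                    -> job.Annotations()
job.GetResourceRequirements()           -> job.ResourceRequirements()
job.GetSchedulingKey()                  -> job.SchedulingKey()
job.GetPodRequirements(priorityClasses) -> job.PodRequirements()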
9 changes: 7 additions & 2 deletions internal/armada/submit/validation/validation.go
@@ -246,15 +246,20 @@ type jobAdapter struct {
*api.JobSubmitRequestItem
}

- // GetPriorityClassName is needed to fulfil the MinimalJob interface
- func (j jobAdapter) GetPriorityClassName() string {
+ // PriorityClassName is needed to fulfil the MinimalJob interface
+ func (j jobAdapter) PriorityClassName() string {
podSpec := j.GetMainPodSpec()
if podSpec != nil {
return j.GetMainPodSpec().PriorityClassName
}
return ""
}

+ // Annotations is needed to fulfil the MinimalJob interface
+ func (j jobAdapter) Annotations() map[string]string {
+ return j.GetAnnotations()
+ }

// Ensures that any gang jobs defined in the request are consistent. This checks that all jobs in the same gang have
// the same:
// - Cardinality
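
For reference, the two renamed adapter methods above are what the submit-validation code needs from the scheduler's MinimalJob interface. A minimal sketch of that interface, assuming it declares only the methods referenced in this diff (the real definition in internal/scheduler/interfaces may contain more):

type MinimalJob interface {
	// PriorityClassName returns the priority class name of the job's main pod spec.
	PriorityClassName() string
	// Annotations returns the job's annotations, used e.g. to read gang metadata.
	Annotations() map[string]string
}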
2 changes: 1 addition & 1 deletion internal/common/eventutil/eventutil.go
@@ -206,7 +206,7 @@ func LogSubmitJobFromApiJob(job *api.Job) (*armadaevents.SubmitJob, error) {
Message: "Both PodSpec and PodSpecs are set",
})
}
- jobId, err := armadaevents.ProtoUuidFromUlidString(job.GetId())
+ jobId, err := armadaevents.ProtoUuidFromUlidString(job.Id)
if err != nil {
return nil, err
}
16 changes: 8 additions & 8 deletions internal/scheduler/common.go
@@ -9,7 +9,7 @@ import (
armadamaps "github.com/armadaproject/armada/internal/common/maps"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
- "github.com/armadaproject/armada/internal/scheduler/interfaces"
+ "github.com/armadaproject/armada/internal/scheduler/jobdb"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

@@ -22,34 +22,34 @@ func PrintJobSummary(ctx *armadacontext.Context, prefix string, jctxs []*schedul
jobsByQueue := armadaslices.MapAndGroupByFuncs(
jctxs,
func(jctx *schedulercontext.JobSchedulingContext) string {
- return jctx.Job.GetQueue()
+ return jctx.Job.Queue()
},
- func(jctx *schedulercontext.JobSchedulingContext) interfaces.LegacySchedulerJob {
+ func(jctx *schedulercontext.JobSchedulingContext) *jobdb.Job {
return jctx.Job
},
)
resourcesByQueue := armadamaps.MapValues(
jobsByQueue,
- func(jobs []interfaces.LegacySchedulerJob) schedulerobjects.ResourceList {
+ func(jobs []*jobdb.Job) schedulerobjects.ResourceList {
rv := schedulerobjects.NewResourceListWithDefaultSize()
for _, job := range jobs {
- rv.AddV1ResourceList(job.GetResourceRequirements().Requests)
+ rv.AddV1ResourceList(job.ResourceRequirements().Requests)
}
return rv
},
)
jobCountPerQueue := armadamaps.MapValues(
jobsByQueue,
- func(jobs []interfaces.LegacySchedulerJob) int {
+ func(jobs []*jobdb.Job) int {
return len(jobs)
},
)
jobIdsByQueue := armadamaps.MapValues(
jobsByQueue,
- func(jobs []interfaces.LegacySchedulerJob) []string {
+ func(jobs []*jobdb.Job) []string {
rv := make([]string, len(jobs))
for i, job := range jobs {
- rv[i] = job.GetId()
+ rv[i] = job.Id()
}
return rv
},
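
The change in PrintJobSummary is representative: aggregation code now works directly with []*jobdb.Job and the renamed accessors. A small sketch of the same per-queue aggregation written without the generic helpers, assuming only the jobdb and schedulerobjects calls that appear in this hunk:

package example

import (
	"github.com/armadaproject/armada/internal/scheduler/jobdb"
	"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

// totalRequestsByQueue sums the requested resources of each queue's jobs,
// using the post-change accessors Queue() and ResourceRequirements().
func totalRequestsByQueue(jobs []*jobdb.Job) map[string]schedulerobjects.ResourceList {
	rv := make(map[string]schedulerobjects.ResourceList)
	for _, job := range jobs {
		rl, ok := rv[job.Queue()]
		if !ok {
			rl = schedulerobjects.NewResourceListWithDefaultSize()
		}
		rl.AddV1ResourceList(job.ResourceRequirements().Requests)
		rv[job.Queue()] = rl
	}
	return rv
}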
67 changes: 34 additions & 33 deletions internal/scheduler/context/context.go
@@ -22,6 +22,7 @@ import (
"github.com/armadaproject/armada/internal/common/types"
"github.com/armadaproject/armada/internal/scheduler/fairness"
"github.com/armadaproject/armada/internal/scheduler/interfaces"
+ "github.com/armadaproject/armada/internal/scheduler/jobdb"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

@@ -240,9 +241,9 @@ func (sctx *SchedulingContext) AddGangSchedulingContext(gctx *GangSchedulingCont
// AddJobSchedulingContext adds a job scheduling context.
// Automatically updates scheduled resources.
func (sctx *SchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingContext) (bool, error) {
- qctx, ok := sctx.QueueSchedulingContexts[jctx.Job.GetQueue()]
+ qctx, ok := sctx.QueueSchedulingContexts[jctx.Job.Queue()]
if !ok {
- return false, errors.Errorf("failed adding job %s to scheduling context: no context for queue %s", jctx.JobId, jctx.Job.GetQueue())
+ return false, errors.Errorf("failed adding job %s to scheduling context: no context for queue %s", jctx.JobId, jctx.Job.Queue())
}
evictedInThisRound, err := qctx.AddJobSchedulingContext(jctx)
if err != nil {
@@ -251,18 +252,18 @@ func (sctx *SchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingContex
if jctx.IsSuccessful() {
if evictedInThisRound {
sctx.EvictedResources.SubV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests)
- sctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)
+ sctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.PriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)
sctx.NumEvictedJobs--
} else {
sctx.ScheduledResources.AddV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests)
- sctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)
+ sctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.PriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)
sctx.NumScheduledJobs++
}
}
return evictedInThisRound, nil
}

- func (sctx *SchedulingContext) EvictGang(jobs []interfaces.LegacySchedulerJob) (bool, error) {
+ func (sctx *SchedulingContext) EvictGang(jobs []*jobdb.Job) (bool, error) {
allJobsScheduledInThisRound := true
for _, job := range jobs {
scheduledInThisRound, err := sctx.EvictJob(job)
@@ -277,23 +278,23 @@ func (sctx *SchedulingContext) EvictGang(jobs []interfaces.LegacySchedulerJob) (
return allJobsScheduledInThisRound, nil
}

- func (sctx *SchedulingContext) EvictJob(job interfaces.LegacySchedulerJob) (bool, error) {
- qctx, ok := sctx.QueueSchedulingContexts[job.GetQueue()]
+ func (sctx *SchedulingContext) EvictJob(job *jobdb.Job) (bool, error) {
+ qctx, ok := sctx.QueueSchedulingContexts[job.Queue()]
if !ok {
- return false, errors.Errorf("failed evicting job %s from scheduling context: no context for queue %s", job.GetId(), job.GetQueue())
+ return false, errors.Errorf("failed evicting job %s from scheduling context: no context for queue %s", job.Id(), job.Queue())
}
scheduledInThisRound, err := qctx.EvictJob(job)
if err != nil {
return false, err
}
- rl := job.GetResourceRequirements().Requests
+ rl := job.ResourceRequirements().Requests
if scheduledInThisRound {
sctx.ScheduledResources.SubV1ResourceList(rl)
- sctx.ScheduledResourcesByPriorityClass.SubV1ResourceList(job.GetPriorityClassName(), rl)
+ sctx.ScheduledResourcesByPriorityClass.SubV1ResourceList(job.PriorityClassName(), rl)
sctx.NumScheduledJobs--
} else {
sctx.EvictedResources.AddV1ResourceList(rl)
- sctx.EvictedResourcesByPriorityClass.AddV1ResourceList(job.GetPriorityClassName(), rl)
+ sctx.EvictedResourcesByPriorityClass.AddV1ResourceList(job.PriorityClassName(), rl)
sctx.NumEvictedJobs++
}
return scheduledInThisRound, nil
@@ -481,42 +482,42 @@ func (qctx *QueueSchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingC
// Always update ResourcesByPriority.
// Since ResourcesByPriority is used to order queues by fraction of fair share.
qctx.Allocated.AddV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests)
- qctx.AllocatedByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)
+ qctx.AllocatedByPriorityClass.AddV1ResourceList(jctx.Job.PriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)

// Only if the job is not evicted, update ScheduledResourcesByPriority.
// Since ScheduledResourcesByPriority is used to control per-round scheduling constraints.
if evictedInThisRound {
delete(qctx.EvictedJobsById, jctx.JobId)
- qctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)
+ qctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.PriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)
} else {
qctx.SuccessfulJobSchedulingContexts[jctx.JobId] = jctx
- qctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)
+ qctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.PriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)
}
} else {
qctx.UnsuccessfulJobSchedulingContexts[jctx.JobId] = jctx
}
return evictedInThisRound, nil
}

- func (qctx *QueueSchedulingContext) EvictJob(job interfaces.LegacySchedulerJob) (bool, error) {
- jobId := job.GetId()
+ func (qctx *QueueSchedulingContext) EvictJob(job *jobdb.Job) (bool, error) {
+ jobId := job.Id()
if _, ok := qctx.UnsuccessfulJobSchedulingContexts[jobId]; ok {
return false, errors.Errorf("failed evicting job %s from queue: job already marked unsuccessful", jobId)
}
if _, ok := qctx.EvictedJobsById[jobId]; ok {
return false, errors.Errorf("failed evicting job %s from queue: job already marked evicted", jobId)
}
- rl := job.GetResourceRequirements().Requests
+ rl := job.ResourceRequirements().Requests
_, scheduledInThisRound := qctx.SuccessfulJobSchedulingContexts[jobId]
if scheduledInThisRound {
- qctx.ScheduledResourcesByPriorityClass.SubV1ResourceList(job.GetPriorityClassName(), rl)
+ qctx.ScheduledResourcesByPriorityClass.SubV1ResourceList(job.PriorityClassName(), rl)
delete(qctx.SuccessfulJobSchedulingContexts, jobId)
} else {
- qctx.EvictedResourcesByPriorityClass.AddV1ResourceList(job.GetPriorityClassName(), rl)
+ qctx.EvictedResourcesByPriorityClass.AddV1ResourceList(job.PriorityClassName(), rl)
qctx.EvictedJobsById[jobId] = true
}
qctx.Allocated.SubV1ResourceList(rl)
- qctx.AllocatedByPriorityClass.SubV1ResourceList(job.GetPriorityClassName(), rl)
+ qctx.AllocatedByPriorityClass.SubV1ResourceList(job.PriorityClassName(), rl)
return scheduledInThisRound, nil
}

@@ -551,7 +552,7 @@ func NewGangSchedulingContext(jctxs []*JobSchedulingContext) *GangSchedulingCont
representative := jctxs[0]
return &GangSchedulingContext{
Created: time.Now(),
- Queue: representative.Job.GetQueue(),
+ Queue: representative.Job.Queue(),
GangInfo: representative.GangInfo,
JobSchedulingContexts: jctxs,
TotalResourceRequests: totalResourceRequests,
@@ -613,7 +614,7 @@ type JobSchedulingContext struct {
// Indicates whether this context is for re-scheduling an evicted job.
IsEvicted bool
// Job spec.
- Job interfaces.LegacySchedulerJob
+ Job *jobdb.Job
// Scheduling requirements of this job.
// We currently require that each job contains exactly one pod spec.
PodRequirements *schedulerobjects.PodRequirements
@@ -663,9 +664,9 @@ func (jctx *JobSchedulingContext) SchedulingKey() (schedulerobjects.SchedulingKe
if len(jctx.AdditionalNodeSelectors) != 0 || len(jctx.AdditionalTolerations) != 0 {
return schedulerobjects.EmptySchedulingKey, false
}
- schedulingKey, ok := jctx.Job.GetSchedulingKey()
+ schedulingKey, ok := jctx.Job.SchedulingKey()
if !ok {
- schedulingKey = interfaces.SchedulingKeyFromLegacySchedulerJob(defaultSchedulingKeyGenerator, jctx.Job)
+ schedulingKey = jobdb.SchedulingKeyFromJob(defaultSchedulingKeyGenerator, jctx.Job)
}
return schedulingKey, true
}
@@ -715,15 +716,15 @@ func EmptyGangInfo(job interfaces.MinimalJob) GangInfo {
Id: "",
Cardinality: 1,
MinimumCardinality: 1,
- PriorityClassName: job.GetPriorityClassName(),
- NodeUniformity: job.GetAnnotations()[configuration.GangNodeUniformityLabelAnnotation],
+ PriorityClassName: job.PriorityClassName(),
+ NodeUniformity: job.Annotations()[configuration.GangNodeUniformityLabelAnnotation],
}
}

func GangInfoFromLegacySchedulerJob(job interfaces.MinimalJob) (GangInfo, error) {
gangInfo := EmptyGangInfo(job)

- annotations := job.GetAnnotations()
+ annotations := job.Annotations()

gangId, ok := annotations[configuration.GangIdAnnotation]
if !ok {
@@ -767,24 +768,24 @@ func GangInfoFromLegacySchedulerJob(job interfaces.MinimalJob) (GangInfo, error)
return gangInfo, nil
}

- func JobSchedulingContextsFromJobs[J interfaces.LegacySchedulerJob](priorityClasses map[string]types.PriorityClass, jobs []J) []*JobSchedulingContext {
+ func JobSchedulingContextsFromJobs[J *jobdb.Job](priorityClasses map[string]types.PriorityClass, jobs []J) []*JobSchedulingContext {
jctxs := make([]*JobSchedulingContext, len(jobs))
for i, job := range jobs {
- jctxs[i] = JobSchedulingContextFromJob(priorityClasses, job)
+ jctxs[i] = JobSchedulingContextFromJob(job)
}
return jctxs
}

- func JobSchedulingContextFromJob(priorityClasses map[string]types.PriorityClass, job interfaces.LegacySchedulerJob) *JobSchedulingContext {
+ func JobSchedulingContextFromJob(job *jobdb.Job) *JobSchedulingContext {
gangInfo, err := GangInfoFromLegacySchedulerJob(job)
if err != nil {
- logrus.Errorf("failed to extract gang info from job %s: %s", job.GetId(), err)
+ logrus.Errorf("failed to extract gang info from job %s: %s", job.Id(), err)
}
return &JobSchedulingContext{
Created: time.Now(),
- JobId: job.GetId(),
+ JobId: job.Id(),
Job: job,
- PodRequirements: job.GetPodRequirements(priorityClasses),
+ PodRequirements: job.PodRequirements(),
GangInfo: gangInfo,
ShouldFail: false,
}
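
The net effect of the context.go changes is that callers no longer thread a priority-class map into JobSchedulingContextFromJob: a *jobdb.Job already carries its pod requirements, priority class and scheduling key. A hedged before/after sketch of the call site (inside the schedulercontext package), using only the signatures visible above:

// Before this commit (roughly):
//   jctx := JobSchedulingContextFromJob(priorityClasses, legacyJob)
// After, the job itself supplies PodRequirements, GangInfo and JobId:
func newJctx(job *jobdb.Job) *JobSchedulingContext {
	return JobSchedulingContextFromJob(job)
}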
4 changes: 2 additions & 2 deletions internal/scheduler/context/context_test.go
@@ -89,9 +89,9 @@ func testNSmallCpuJobSchedulingContext(queue, priorityClassName string, n int) [
func testSmallCpuJobSchedulingContext(queue, priorityClassName string) *JobSchedulingContext {
job := testfixtures.Test1Cpu4GiJob(queue, priorityClassName)
return &JobSchedulingContext{
- JobId: job.GetId(),
+ JobId: job.Id(),
Job: job,
- PodRequirements: job.GetPodRequirements(testfixtures.TestPriorityClasses),
+ PodRequirements: job.PodRequirements(),
GangInfo: EmptyGangInfo(job),
}
}
4 changes: 2 additions & 2 deletions internal/scheduler/gang_scheduler.go
@@ -10,7 +10,7 @@ import (
"github.com/armadaproject/armada/internal/common/util"
schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/constraints"
schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
- "github.com/armadaproject/armada/internal/scheduler/interfaces"
+ "github.com/armadaproject/armada/internal/scheduler/jobdb"
"github.com/armadaproject/armada/internal/scheduler/nodedb"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)
@@ -63,7 +63,7 @@ func (sch *GangScheduler) updateGangSchedulingContextOnSuccess(gctx *schedulerco
func (sch *GangScheduler) updateGangSchedulingContextOnFailure(gctx *schedulercontext.GangSchedulingContext, gangAddedToSchedulingContext bool, unschedulableReason string) error {
// If the job was added to the context, remove it first.
if gangAddedToSchedulingContext {
- failedJobs := util.Map(gctx.JobSchedulingContexts, func(jctx *schedulercontext.JobSchedulingContext) interfaces.LegacySchedulerJob { return jctx.Job })
+ failedJobs := util.Map(gctx.JobSchedulingContexts, func(jctx *schedulercontext.JobSchedulingContext) *jobdb.Job { return jctx.Job })
if _, err := sch.schedulingContext.EvictGang(failedJobs); err != nil {
return err
}
17 changes: 15 additions & 2 deletions internal/scheduler/gang_scheduler_test.go
@@ -509,7 +509,10 @@ func TestGangScheduler(t *testing.T) {
Gangs: func() (gangs [][]*jobdb.Job) {
var jobId ulid.ULID
jobId = util.ULID()
- gangs = append(gangs, []*jobdb.Job{testfixtures.TestJob("A", jobId, "armada-preemptible-away", testfixtures.Test1Cpu4GiPodReqs("A", jobId, 30000))})
+ gangs = append(gangs, []*jobdb.Job{
+ testfixtures.
+ TestJob("A", jobId, "armada-preemptible-away", testfixtures.Test1Cpu4GiPodReqs("A", jobId, 30000)),
+ })
jobId = util.ULID()
gangs = append(gangs, []*jobdb.Job{testfixtures.TestJob("A", jobId, "armada-preemptible-away-both", testfixtures.Test1Cpu4GiPodReqs("A", jobId, 30000))})
return
@@ -568,6 +571,16 @@ }
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
+ // This is hacktabulous. Essentially the jobs have the wrong priority classes set at this point
+ // because testfixtures.TestJob() initialises the jobs using a jobDb that doesn't know anything about the
+ // priority classes we have defined in this test. We therefore need to fix the priority classes here.
+ // The long-term strategy is to try and remove the need to have a jobDb for creating jobs.
+ for i, gang := range tc.Gangs {
+ for j, job := range gang {
+ tc.Gangs[i][j] = job.WithPriorityClass(tc.SchedulingConfig.PriorityClasses[job.PriorityClassName()])
+ }
+ }
+
nodesById := make(map[string]*schedulerobjects.Node, len(tc.Nodes))
for _, node := range tc.Nodes {
nodesById[node.Id] = node
@@ -595,7 +608,7 @@ func TestGangScheduler(t *testing.T) {
priorityFactorByQueue := make(map[string]float64)
for _, jobs := range tc.Gangs {
for _, job := range jobs {
- priorityFactorByQueue[job.GetQueue()] = 1
+ priorityFactorByQueue[job.Queue()] = 1
}
}
