diff --git a/internal/armada/submit/validation/validation.go b/internal/armada/submit/validation/validation.go index af078cce1a2..4f838c88930 100644 --- a/internal/armada/submit/validation/validation.go +++ b/internal/armada/submit/validation/validation.go @@ -246,8 +246,8 @@ type jobAdapter struct { *api.JobSubmitRequestItem } -// GetPriorityClassName is needed to fulfil the MinimalJob interface -func (j jobAdapter) GetPriorityClassName() string { +// PriorityClassName is needed to fulfil the MinimalJob interface +func (j jobAdapter) PriorityClassName() string { podSpec := j.GetMainPodSpec() if podSpec != nil { return j.GetMainPodSpec().PriorityClassName @@ -255,6 +255,11 @@ func (j jobAdapter) GetPriorityClassName() string { return "" } +// Annotations is needed to fulfil the MinimalJob interface +func (j jobAdapter) Annotations() map[string]string { + return j.GetAnnotations() +} + // Ensures that any gang jobs defined in the request are consistent. This checks that all jobs in the same gang have // the same: // - Cardinality diff --git a/internal/common/eventutil/eventutil.go b/internal/common/eventutil/eventutil.go index d430ace9bf4..dd3615fa4b8 100644 --- a/internal/common/eventutil/eventutil.go +++ b/internal/common/eventutil/eventutil.go @@ -206,7 +206,7 @@ func LogSubmitJobFromApiJob(job *api.Job) (*armadaevents.SubmitJob, error) { Message: "Both PodSpec and PodSpecs are set", }) } - jobId, err := armadaevents.ProtoUuidFromUlidString(job.GetId()) + jobId, err := armadaevents.ProtoUuidFromUlidString(job.Id) if err != nil { return nil, err } diff --git a/internal/scheduler/common.go b/internal/scheduler/common.go index 3368419b841..066ab502505 100644 --- a/internal/scheduler/common.go +++ b/internal/scheduler/common.go @@ -9,7 +9,7 @@ import ( armadamaps "github.com/armadaproject/armada/internal/common/maps" armadaslices "github.com/armadaproject/armada/internal/common/slices" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" - "github.com/armadaproject/armada/internal/scheduler/interfaces" + "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) @@ -22,34 +22,34 @@ func PrintJobSummary(ctx *armadacontext.Context, prefix string, jctxs []*schedul jobsByQueue := armadaslices.MapAndGroupByFuncs( jctxs, func(jctx *schedulercontext.JobSchedulingContext) string { - return jctx.Job.GetQueue() + return jctx.Job.Queue() }, - func(jctx *schedulercontext.JobSchedulingContext) interfaces.LegacySchedulerJob { + func(jctx *schedulercontext.JobSchedulingContext) *jobdb.Job { return jctx.Job }, ) resourcesByQueue := armadamaps.MapValues( jobsByQueue, - func(jobs []interfaces.LegacySchedulerJob) schedulerobjects.ResourceList { + func(jobs []*jobdb.Job) schedulerobjects.ResourceList { rv := schedulerobjects.NewResourceListWithDefaultSize() for _, job := range jobs { - rv.AddV1ResourceList(job.GetResourceRequirements().Requests) + rv.AddV1ResourceList(job.ResourceRequirements().Requests) } return rv }, ) jobCountPerQueue := armadamaps.MapValues( jobsByQueue, - func(jobs []interfaces.LegacySchedulerJob) int { + func(jobs []*jobdb.Job) int { return len(jobs) }, ) jobIdsByQueue := armadamaps.MapValues( jobsByQueue, - func(jobs []interfaces.LegacySchedulerJob) []string { + func(jobs []*jobdb.Job) []string { rv := make([]string, len(jobs)) for i, job := range jobs { - rv[i] = job.GetId() + rv[i] = job.Id() } return rv }, diff --git a/internal/scheduler/context/context.go 
b/internal/scheduler/context/context.go index f20b8c0cd34..d7646463f3a 100644 --- a/internal/scheduler/context/context.go +++ b/internal/scheduler/context/context.go @@ -22,6 +22,7 @@ import ( "github.com/armadaproject/armada/internal/common/types" "github.com/armadaproject/armada/internal/scheduler/fairness" "github.com/armadaproject/armada/internal/scheduler/interfaces" + "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) @@ -240,9 +241,9 @@ func (sctx *SchedulingContext) AddGangSchedulingContext(gctx *GangSchedulingCont // AddJobSchedulingContext adds a job scheduling context. // Automatically updates scheduled resources. func (sctx *SchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingContext) (bool, error) { - qctx, ok := sctx.QueueSchedulingContexts[jctx.Job.GetQueue()] + qctx, ok := sctx.QueueSchedulingContexts[jctx.Job.Queue()] if !ok { - return false, errors.Errorf("failed adding job %s to scheduling context: no context for queue %s", jctx.JobId, jctx.Job.GetQueue()) + return false, errors.Errorf("failed adding job %s to scheduling context: no context for queue %s", jctx.JobId, jctx.Job.Queue()) } evictedInThisRound, err := qctx.AddJobSchedulingContext(jctx) if err != nil { @@ -251,18 +252,18 @@ func (sctx *SchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingContex if jctx.IsSuccessful() { if evictedInThisRound { sctx.EvictedResources.SubV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests) - sctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) + sctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.PriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) sctx.NumEvictedJobs-- } else { sctx.ScheduledResources.AddV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests) - sctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) + sctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.PriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) sctx.NumScheduledJobs++ } } return evictedInThisRound, nil } -func (sctx *SchedulingContext) EvictGang(jobs []interfaces.LegacySchedulerJob) (bool, error) { +func (sctx *SchedulingContext) EvictGang(jobs []*jobdb.Job) (bool, error) { allJobsScheduledInThisRound := true for _, job := range jobs { scheduledInThisRound, err := sctx.EvictJob(job) @@ -277,23 +278,23 @@ func (sctx *SchedulingContext) EvictGang(jobs []interfaces.LegacySchedulerJob) ( return allJobsScheduledInThisRound, nil } -func (sctx *SchedulingContext) EvictJob(job interfaces.LegacySchedulerJob) (bool, error) { - qctx, ok := sctx.QueueSchedulingContexts[job.GetQueue()] +func (sctx *SchedulingContext) EvictJob(job *jobdb.Job) (bool, error) { + qctx, ok := sctx.QueueSchedulingContexts[job.Queue()] if !ok { - return false, errors.Errorf("failed evicting job %s from scheduling context: no context for queue %s", job.GetId(), job.GetQueue()) + return false, errors.Errorf("failed evicting job %s from scheduling context: no context for queue %s", job.Id(), job.Queue()) } scheduledInThisRound, err := qctx.EvictJob(job) if err != nil { return false, err } - rl := job.GetResourceRequirements().Requests + rl := job.ResourceRequirements().Requests if scheduledInThisRound { sctx.ScheduledResources.SubV1ResourceList(rl) - 
sctx.ScheduledResourcesByPriorityClass.SubV1ResourceList(job.GetPriorityClassName(), rl) + sctx.ScheduledResourcesByPriorityClass.SubV1ResourceList(job.PriorityClassName(), rl) sctx.NumScheduledJobs-- } else { sctx.EvictedResources.AddV1ResourceList(rl) - sctx.EvictedResourcesByPriorityClass.AddV1ResourceList(job.GetPriorityClassName(), rl) + sctx.EvictedResourcesByPriorityClass.AddV1ResourceList(job.PriorityClassName(), rl) sctx.NumEvictedJobs++ } return scheduledInThisRound, nil @@ -481,16 +482,16 @@ func (qctx *QueueSchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingC // Always update ResourcesByPriority. // Since ResourcesByPriority is used to order queues by fraction of fair share. qctx.Allocated.AddV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests) - qctx.AllocatedByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) + qctx.AllocatedByPriorityClass.AddV1ResourceList(jctx.Job.PriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) // Only if the job is not evicted, update ScheduledResourcesByPriority. // Since ScheduledResourcesByPriority is used to control per-round scheduling constraints. if evictedInThisRound { delete(qctx.EvictedJobsById, jctx.JobId) - qctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) + qctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.PriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) } else { qctx.SuccessfulJobSchedulingContexts[jctx.JobId] = jctx - qctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) + qctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.PriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) } } else { qctx.UnsuccessfulJobSchedulingContexts[jctx.JobId] = jctx @@ -498,25 +499,25 @@ func (qctx *QueueSchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingC return evictedInThisRound, nil } -func (qctx *QueueSchedulingContext) EvictJob(job interfaces.LegacySchedulerJob) (bool, error) { - jobId := job.GetId() +func (qctx *QueueSchedulingContext) EvictJob(job *jobdb.Job) (bool, error) { + jobId := job.Id() if _, ok := qctx.UnsuccessfulJobSchedulingContexts[jobId]; ok { return false, errors.Errorf("failed evicting job %s from queue: job already marked unsuccessful", jobId) } if _, ok := qctx.EvictedJobsById[jobId]; ok { return false, errors.Errorf("failed evicting job %s from queue: job already marked evicted", jobId) } - rl := job.GetResourceRequirements().Requests + rl := job.ResourceRequirements().Requests _, scheduledInThisRound := qctx.SuccessfulJobSchedulingContexts[jobId] if scheduledInThisRound { - qctx.ScheduledResourcesByPriorityClass.SubV1ResourceList(job.GetPriorityClassName(), rl) + qctx.ScheduledResourcesByPriorityClass.SubV1ResourceList(job.PriorityClassName(), rl) delete(qctx.SuccessfulJobSchedulingContexts, jobId) } else { - qctx.EvictedResourcesByPriorityClass.AddV1ResourceList(job.GetPriorityClassName(), rl) + qctx.EvictedResourcesByPriorityClass.AddV1ResourceList(job.PriorityClassName(), rl) qctx.EvictedJobsById[jobId] = true } qctx.Allocated.SubV1ResourceList(rl) - qctx.AllocatedByPriorityClass.SubV1ResourceList(job.GetPriorityClassName(), rl) + qctx.AllocatedByPriorityClass.SubV1ResourceList(job.PriorityClassName(), rl) return scheduledInThisRound, nil } @@ -551,7 +552,7 @@ 
func NewGangSchedulingContext(jctxs []*JobSchedulingContext) *GangSchedulingCont representative := jctxs[0] return &GangSchedulingContext{ Created: time.Now(), - Queue: representative.Job.GetQueue(), + Queue: representative.Job.Queue(), GangInfo: representative.GangInfo, JobSchedulingContexts: jctxs, TotalResourceRequests: totalResourceRequests, @@ -613,7 +614,7 @@ type JobSchedulingContext struct { // Indicates whether this context is for re-scheduling an evicted job. IsEvicted bool // Job spec. - Job interfaces.LegacySchedulerJob + Job *jobdb.Job // Scheduling requirements of this job. // We currently require that each job contains exactly one pod spec. PodRequirements *schedulerobjects.PodRequirements @@ -663,9 +664,9 @@ func (jctx *JobSchedulingContext) SchedulingKey() (schedulerobjects.SchedulingKe if len(jctx.AdditionalNodeSelectors) != 0 || len(jctx.AdditionalTolerations) != 0 { return schedulerobjects.EmptySchedulingKey, false } - schedulingKey, ok := jctx.Job.GetSchedulingKey() + schedulingKey, ok := jctx.Job.SchedulingKey() if !ok { - schedulingKey = interfaces.SchedulingKeyFromLegacySchedulerJob(defaultSchedulingKeyGenerator, jctx.Job) + schedulingKey = jobdb.SchedulingKeyFromJob(defaultSchedulingKeyGenerator, jctx.Job) } return schedulingKey, true } @@ -715,15 +716,15 @@ func EmptyGangInfo(job interfaces.MinimalJob) GangInfo { Id: "", Cardinality: 1, MinimumCardinality: 1, - PriorityClassName: job.GetPriorityClassName(), - NodeUniformity: job.GetAnnotations()[configuration.GangNodeUniformityLabelAnnotation], + PriorityClassName: job.PriorityClassName(), + NodeUniformity: job.Annotations()[configuration.GangNodeUniformityLabelAnnotation], } } func GangInfoFromLegacySchedulerJob(job interfaces.MinimalJob) (GangInfo, error) { gangInfo := EmptyGangInfo(job) - annotations := job.GetAnnotations() + annotations := job.Annotations() gangId, ok := annotations[configuration.GangIdAnnotation] if !ok { @@ -767,24 +768,24 @@ func GangInfoFromLegacySchedulerJob(job interfaces.MinimalJob) (GangInfo, error) return gangInfo, nil } -func JobSchedulingContextsFromJobs[J interfaces.LegacySchedulerJob](priorityClasses map[string]types.PriorityClass, jobs []J) []*JobSchedulingContext { +func JobSchedulingContextsFromJobs[J *jobdb.Job](priorityClasses map[string]types.PriorityClass, jobs []J) []*JobSchedulingContext { jctxs := make([]*JobSchedulingContext, len(jobs)) for i, job := range jobs { - jctxs[i] = JobSchedulingContextFromJob(priorityClasses, job) + jctxs[i] = JobSchedulingContextFromJob(job) } return jctxs } -func JobSchedulingContextFromJob(priorityClasses map[string]types.PriorityClass, job interfaces.LegacySchedulerJob) *JobSchedulingContext { +func JobSchedulingContextFromJob(job *jobdb.Job) *JobSchedulingContext { gangInfo, err := GangInfoFromLegacySchedulerJob(job) if err != nil { - logrus.Errorf("failed to extract gang info from job %s: %s", job.GetId(), err) + logrus.Errorf("failed to extract gang info from job %s: %s", job.Id(), err) } return &JobSchedulingContext{ Created: time.Now(), - JobId: job.GetId(), + JobId: job.Id(), Job: job, - PodRequirements: job.GetPodRequirements(priorityClasses), + PodRequirements: job.PodRequirements(), GangInfo: gangInfo, ShouldFail: false, } diff --git a/internal/scheduler/context/context_test.go b/internal/scheduler/context/context_test.go index 61cf9c08322..adaa7a367b4 100644 --- a/internal/scheduler/context/context_test.go +++ b/internal/scheduler/context/context_test.go @@ -89,9 +89,9 @@ func testNSmallCpuJobSchedulingContext(queue, 
priorityClassName string, n int) [ func testSmallCpuJobSchedulingContext(queue, priorityClassName string) *JobSchedulingContext { job := testfixtures.Test1Cpu4GiJob(queue, priorityClassName) return &JobSchedulingContext{ - JobId: job.GetId(), + JobId: job.Id(), Job: job, - PodRequirements: job.GetPodRequirements(testfixtures.TestPriorityClasses), + PodRequirements: job.PodRequirements(), GangInfo: EmptyGangInfo(job), } } diff --git a/internal/scheduler/gang_scheduler.go b/internal/scheduler/gang_scheduler.go index 3dd21513e9d..cb3329578f1 100644 --- a/internal/scheduler/gang_scheduler.go +++ b/internal/scheduler/gang_scheduler.go @@ -10,7 +10,7 @@ import ( "github.com/armadaproject/armada/internal/common/util" schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/constraints" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" - "github.com/armadaproject/armada/internal/scheduler/interfaces" + "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/nodedb" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) @@ -63,7 +63,7 @@ func (sch *GangScheduler) updateGangSchedulingContextOnSuccess(gctx *schedulerco func (sch *GangScheduler) updateGangSchedulingContextOnFailure(gctx *schedulercontext.GangSchedulingContext, gangAddedToSchedulingContext bool, unschedulableReason string) error { // If the job was added to the context, remove it first. if gangAddedToSchedulingContext { - failedJobs := util.Map(gctx.JobSchedulingContexts, func(jctx *schedulercontext.JobSchedulingContext) interfaces.LegacySchedulerJob { return jctx.Job }) + failedJobs := util.Map(gctx.JobSchedulingContexts, func(jctx *schedulercontext.JobSchedulingContext) *jobdb.Job { return jctx.Job }) if _, err := sch.schedulingContext.EvictGang(failedJobs); err != nil { return err } } diff --git a/internal/scheduler/gang_scheduler_test.go b/internal/scheduler/gang_scheduler_test.go index e72453ecc16..cf24b99b75d 100644 --- a/internal/scheduler/gang_scheduler_test.go +++ b/internal/scheduler/gang_scheduler_test.go @@ -509,7 +509,10 @@ func TestGangScheduler(t *testing.T) { Gangs: func() (gangs [][]*jobdb.Job) { var jobId ulid.ULID jobId = util.ULID() - gangs = append(gangs, []*jobdb.Job{testfixtures.TestJob("A", jobId, "armada-preemptible-away", testfixtures.Test1Cpu4GiPodReqs("A", jobId, 30000))}) + gangs = append(gangs, []*jobdb.Job{ + testfixtures. + TestJob("A", jobId, "armada-preemptible-away", testfixtures.Test1Cpu4GiPodReqs("A", jobId, 30000)), + }) jobId = util.ULID() gangs = append(gangs, []*jobdb.Job{testfixtures.TestJob("A", jobId, "armada-preemptible-away-both", testfixtures.Test1Cpu4GiPodReqs("A", jobId, 30000))}) return @@ -568,6 +571,16 @@ func TestGangScheduler(t *testing.T) { } for name, tc := range tests { t.Run(name, func(t *testing.T) { + // This is hacktabulous. Essentially the jobs have the wrong priority classes set at this point + // because testfixtures.TestJob() initialises the jobs using a jobDb that doesn't know anything about the + // priority classes we have defined in this test. We therefore need to fix the priority classes here. + // The long term strategy is to try and remove the need to have a jobDb for creating jobs.
+ for i, gang := range tc.Gangs { + for j, job := range gang { + tc.Gangs[i][j] = job.WithPriorityClass(tc.SchedulingConfig.PriorityClasses[job.PriorityClassName()]) + } + } + nodesById := make(map[string]*schedulerobjects.Node, len(tc.Nodes)) for _, node := range tc.Nodes { nodesById[node.Id] = node @@ -595,7 +608,7 @@ func TestGangScheduler(t *testing.T) { priorityFactorByQueue := make(map[string]float64) for _, jobs := range tc.Gangs { for _, job := range jobs { - priorityFactorByQueue[job.GetQueue()] = 1 + priorityFactorByQueue[job.Queue()] = 1 } } diff --git a/internal/scheduler/interfaces/interfaces.go b/internal/scheduler/interfaces/interfaces.go index c24d43300c9..d5ef8a3c84c 100644 --- a/internal/scheduler/interfaces/interfaces.go +++ b/internal/scheduler/interfaces/interfaces.go @@ -1,64 +1,6 @@ package interfaces -import ( - "time" - - v1 "k8s.io/api/core/v1" - - "github.com/armadaproject/armada/internal/common/types" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" -) - type MinimalJob interface { - GetAnnotations() map[string]string - GetPriorityClassName() string -} - -// LegacySchedulerJob is the job interface used throughout the scheduler. -type LegacySchedulerJob interface { - GetId() string - GetQueue() string - GetJobSet() string - GetPerQueuePriority() uint32 - GetSubmitTime() time.Time - GetAnnotations() map[string]string - GetPodRequirements(priorityClasses map[string]types.PriorityClass) *schedulerobjects.PodRequirements - GetPriorityClassName() string - GetScheduledAtPriority() (int32, bool) - GetNodeSelector() map[string]string - GetAffinity() *v1.Affinity - GetTolerations() []v1.Toleration - GetResourceRequirements() v1.ResourceRequirements - GetQueueTtlSeconds() int64 - // GetSchedulingKey returns (schedulingKey, true) if the job has a scheduling key associated with it and - // (emptySchedulingKey, false) otherwise, where emptySchedulingKey is the zero value of the SchedulingKey type. - GetSchedulingKey() (schedulerobjects.SchedulingKey, bool) - // SchedulingOrderCompare defines the order in which jobs in a queue should be scheduled - // (both when scheduling new jobs and when re-scheduling evicted jobs). - // Specifically, compare returns - // - 0 if the jobs have equal job id, - // - -1 if job should be scheduled before other, - // - +1 if other should be scheduled before other. - SchedulingOrderCompare(other LegacySchedulerJob) int -} - -func PriorityClassFromLegacySchedulerJob(priorityClasses map[string]types.PriorityClass, defaultPriorityClassName string, job LegacySchedulerJob) types.PriorityClass { - priorityClassName := job.GetPriorityClassName() - if priorityClass, ok := priorityClasses[priorityClassName]; ok { - return priorityClass - } - // We could return (types.PriorityClass{}, false) here, but then callers - // might handle this situation in different ways; return the default - // priority class in order to enforce uniformity. 
- return priorityClasses[defaultPriorityClassName] -} - -func SchedulingKeyFromLegacySchedulerJob(skg *schedulerobjects.SchedulingKeyGenerator, job LegacySchedulerJob) schedulerobjects.SchedulingKey { - return skg.Key( - job.GetNodeSelector(), - job.GetAffinity(), - job.GetTolerations(), - job.GetResourceRequirements().Requests, - job.GetPriorityClassName(), - ) + Annotations() map[string]string + PriorityClassName() string } diff --git a/internal/scheduler/jobdb/comparison.go b/internal/scheduler/jobdb/comparison.go index c7278cf1132..94ba59395c0 100644 --- a/internal/scheduler/jobdb/comparison.go +++ b/internal/scheduler/jobdb/comparison.go @@ -2,8 +2,6 @@ package jobdb import ( "time" - - "github.com/armadaproject/armada/internal/scheduler/interfaces" ) type ( @@ -27,8 +25,8 @@ func (j JobQueueTtlComparer) Compare(a, b *Job) int { aDuration := timeSeconds - (a.submittedTime / 1_000_000_000) bDuration := timeSeconds - (b.submittedTime / 1_000_000_000) - aRemaining := max(0, a.GetQueueTtlSeconds()-aDuration) - bRemaining := max(0, b.GetQueueTtlSeconds()-bDuration) + aRemaining := max(0, a.QueueTtlSeconds()-aDuration) + bRemaining := max(0, b.QueueTtlSeconds()-bDuration) // If jobs have different ttl remaining, they are ordered by remaining queue ttl - the smallest ttl first. if aRemaining != bRemaining { @@ -60,10 +58,10 @@ func (JobPriorityComparer) Compare(job, other *Job) int { } // SchedulingOrderCompare defines the order in which jobs in a particular queue should be scheduled, -func (job *Job) SchedulingOrderCompare(other interfaces.LegacySchedulerJob) int { +func (job *Job) SchedulingOrderCompare(other *Job) int { // We need this cast for now to expose this method via an interface. // This is safe since we only ever compare jobs of the same type. - return SchedulingOrderCompare(job, other.(*Job)) + return SchedulingOrderCompare(job, other) } // SchedulingOrderCompare defines the order in which jobs in a queue should be scheduled diff --git a/internal/scheduler/jobdb/job.go b/internal/scheduler/jobdb/job.go index 90482f9d59a..a4f5db001d7 100644 --- a/internal/scheduler/jobdb/job.go +++ b/internal/scheduler/jobdb/job.go @@ -13,7 +13,6 @@ import ( armadamaps "github.com/armadaproject/armada/internal/common/maps" "github.com/armadaproject/armada/internal/common/types" - "github.com/armadaproject/armada/internal/scheduler/interfaces" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) @@ -347,58 +346,34 @@ func (job *Job) Id() string { return job.id } -// GetId returns the id of the Job. -// This is needed for the LegacyJob interface. -func (job *Job) GetId() string { - return job.id -} - // Jobset returns the jobSet the job belongs to. func (job *Job) Jobset() string { return job.jobSet } -// GetJobSet returns the jobSet the job belongs to. -// This is needed for compatibility with legacyJob -func (job *Job) GetJobSet() string { - return job.jobSet -} - // Queue returns the queue this job belongs to. func (job *Job) Queue() string { return job.queue } -// GetQueue returns the queue this job belongs to. -// This is needed for the LegacyJob interface. -func (job *Job) GetQueue() string { - return job.queue -} - // Priority returns the priority of the job. func (job *Job) Priority() uint32 { return job.priority } -// Priority returns the priority class of the job. -func (job *Job) GetPriorityClass() types.PriorityClass { +// PriorityClass returns the priority class of the job. 
+func (job *Job) PriorityClass() types.PriorityClass { return job.priorityClass } -// GetSchedulingKey returns the scheduling key associated with a job. +// SchedulingKey returns the scheduling key associated with a job. // The second return value is always true since scheduling keys are computed at job creation time. -// This is needed for compatibility with interfaces.LegacySchedulerJob. -func (job *Job) GetSchedulingKey() (schedulerobjects.SchedulingKey, bool) { +func (job *Job) SchedulingKey() (schedulerobjects.SchedulingKey, bool) { return job.schedulingKey, true } -// GetPerQueuePriority exists for compatibility with the LegacyJob interface. -func (job *Job) GetPerQueuePriority() uint32 { - return job.priority -} - -// GetSubmitTime exists for compatibility with the LegacyJob interface. -func (job *Job) GetSubmitTime() time.Time { +// SubmitTime exists for compatibility with the LegacyJob interface. +func (job *Job) SubmitTime() time.Time { if job.jobSchedulingInfo == nil { return time.Time{} } @@ -417,6 +392,13 @@ func (job *Job) WithPriority(priority uint32) *Job { return j } +// WithPriorityClass returns a copy of the job with the priority class updated. +func (job *Job) WithPriorityClass(priorityClass types.PriorityClass) *Job { + j := copyJob(*job) + j.priorityClass = priorityClass + return j +} + // WithSubmittedTime returns a copy of the job with submittedTime updated. func (job *Job) WithSubmittedTime(submittedTime int64) *Job { j := copyJob(*job) @@ -436,24 +418,26 @@ func (job *Job) JobSchedulingInfo() *schedulerobjects.JobSchedulingInfo { return job.jobSchedulingInfo } -// GetAnnotations returns the annotations on the job. -// This is needed for compatibility with interfaces.LegacySchedulerJob -func (job *Job) GetAnnotations() map[string]string { +// Annotations returns the annotations on the job. 
+func (job *Job) Annotations() map[string]string { if req := job.PodRequirements(); req != nil { return req.Annotations } return nil } -// Needed for compatibility with interfaces.LegacySchedulerJob -func (job *Job) GetPriorityClassName() string { +// PriorityClassName returns the name of the job's Priority Class +// TODO: this can be inconsistent with job.PriorityClass() +func (job *Job) PriorityClassName() string { if schedulingInfo := job.JobSchedulingInfo(); schedulingInfo != nil { return schedulingInfo.PriorityClassName } return "" } -func (job *Job) GetScheduledAtPriority() (int32, bool) { +// ScheduledAtPriority returns the numeric priority at which the job was scheduled +// This will return false if the job has not been scheduled yet +func (job *Job) ScheduledAtPriority() (int32, bool) { run := job.LatestRun() if run == nil { return -1, false @@ -465,52 +449,49 @@ func (job *Job) GetScheduledAtPriority() (int32, bool) { return *scheduledAtPriority, true } -// Needed for compatibility with interfaces.LegacySchedulerJob -func (job *Job) GetNodeSelector() map[string]string { +// NodeSelector returns the Node Selector requested by the Job +func (job *Job) NodeSelector() map[string]string { if req := job.PodRequirements(); req != nil { return req.NodeSelector } return nil } -// Needed for compatibility with interfaces.LegacySchedulerJob -func (job *Job) GetAffinity() *v1.Affinity { +// Affinity returns the Affinity requested by the Job +func (job *Job) Affinity() *v1.Affinity { if req := job.PodRequirements(); req != nil { return req.Affinity } return nil } -// Needed for compatibility with interfaces.LegacySchedulerJob -func (job *Job) GetTolerations() []v1.Toleration { +// Tolerations returns the Tolerations requested by the Job +func (job *Job) Tolerations() []v1.Toleration { if req := job.PodRequirements(); req != nil { return req.Tolerations } return nil } -// Needed for compatibility with interfaces.LegacySchedulerJob -func (job *Job) GetResourceRequirements() v1.ResourceRequirements { +// ResourceRequirements returns the resource requirements of the Job +func (job *Job) ResourceRequirements() v1.ResourceRequirements { if req := job.PodRequirements(); req != nil { return req.ResourceRequirements } return v1.ResourceRequirements{} } -// Needed for compatibility with interfaces.LegacySchedulerJob -func (job *Job) GetQueueTtlSeconds() int64 { +// QueueTtlSeconds returns the time in seconds that the job should remain queued +// 0 means that this field is unset +func (job *Job) QueueTtlSeconds() int64 { return job.jobSchedulingInfo.QueueTtlSeconds } +// PodRequirements returns the pod requirements of the Job func (job *Job) PodRequirements() *schedulerobjects.PodRequirements { return job.jobSchedulingInfo.GetPodRequirements() } -// GetPodRequirements is needed for compatibility with interfaces.LegacySchedulerJob. -func (job *Job) GetPodRequirements(_ map[string]types.PriorityClass) *schedulerobjects.PodRequirements { - return job.PodRequirements() -} - // Queued returns true if the job should be considered by the scheduler for assignment or false otherwise. 
func (job *Job) Queued() bool { return job.queued @@ -714,7 +695,7 @@ func (job *Job) RunById(id uuid.UUID) *JobRun { // Invariants: // - job.created < `t` func (job *Job) HasQueueTtlExpired() bool { - ttlSeconds := job.GetQueueTtlSeconds() + ttlSeconds := job.QueueTtlSeconds() if ttlSeconds > 0 { timeSeconds := time.Now().UTC().Unix() @@ -729,7 +710,7 @@ func (job *Job) HasQueueTtlExpired() bool { // HasQueueTtlSet returns true if the given job has a queueTtl set. func (job *Job) HasQueueTtlSet() bool { - return job.GetQueueTtlSeconds() > 0 + return job.QueueTtlSeconds() > 0 } // WithJobset returns a copy of the job with the jobSet updated. @@ -760,7 +741,7 @@ func (job *Job) WithJobSchedulingInfo(jobSchedulingInfo *schedulerobjects.JobSch j.ensureJobSchedulingInfoFieldsInitialised() // Changing the scheduling info invalidates the scheduling key stored with the job. - j.schedulingKey = interfaces.SchedulingKeyFromLegacySchedulerJob(j.jobDb.schedulingKeyGenerator, j) + j.schedulingKey = SchedulingKeyFromJob(j.jobDb.schedulingKeyGenerator, j) return j } @@ -783,3 +764,13 @@ func (job *Job) DeepCopy() *Job { func copyJob(j Job) *Job { return &j } + +func SchedulingKeyFromJob(skg *schedulerobjects.SchedulingKeyGenerator, job *Job) schedulerobjects.SchedulingKey { + return skg.Key( + job.NodeSelector(), + job.Affinity(), + job.Tolerations(), + job.ResourceRequirements().Requests, + job.PriorityClassName(), + ) +} diff --git a/internal/scheduler/jobdb/job_test.go b/internal/scheduler/jobdb/job_test.go index 4cbd04cc6f3..be8cf7cad98 100644 --- a/internal/scheduler/jobdb/job_test.go +++ b/internal/scheduler/jobdb/job_test.go @@ -7,6 +7,7 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/assert" + "github.com/armadaproject/armada/internal/common/types" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) @@ -52,12 +53,10 @@ var baseRun = &JobRun{ // Test methods that only have getters func TestJob_TestGetter(t *testing.T) { assert.Equal(t, baseJob.id, baseJob.Id()) - assert.Equal(t, baseJob.id, baseJob.GetId()) assert.Equal(t, baseJob.queue, baseJob.Queue()) - assert.Equal(t, baseJob.queue, baseJob.GetQueue()) assert.Equal(t, baseJob.submittedTime, baseJob.Created()) assert.Equal(t, jobSchedulingInfo, baseJob.JobSchedulingInfo()) - assert.Equal(t, baseJob.GetAnnotations(), map[string]string{ + assert.Equal(t, baseJob.Annotations(), map[string]string{ "foo": "bar", }) } @@ -285,6 +284,16 @@ func TestJob_TestWithJobset(t *testing.T) { assert.Equal(t, "fish", newJob.Jobset()) } +func TestJob_TestWithPriorityClass(t *testing.T) { + pc := types.PriorityClass{ + Priority: 100, + Preemptible: true, + } + newJob := baseJob.WithPriorityClass(pc) + assert.Equal(t, types.PriorityClass{Priority: 3, Preemptible: false}, baseJob.PriorityClass()) + assert.Equal(t, pc, newJob.PriorityClass()) +} + func TestJob_TestWithQueue(t *testing.T) { newJob := baseJob.WithQueue("fish") assert.Equal(t, "test-queue", baseJob.Queue()) @@ -353,12 +362,12 @@ func TestJobSchedulingInfoFieldsInitialised(t *testing.T) { assert.Nil(t, infoWithNilFields.GetPodRequirements().Annotations) job := jobDb.NewJob("test-job", "test-jobSet", "test-queue", 2, infoWithNilFieldsCopy, true, 0, false, false, false, 3) - assert.NotNil(t, job.GetNodeSelector()) - assert.NotNil(t, job.GetAnnotations()) + assert.NotNil(t, job.NodeSelector()) + assert.NotNil(t, job.Annotations()) // Copy again here, as the fields get mutated so we want a clean copy infoWithNilFieldsCopy2 := 
proto.Clone(infoWithNilFields).(*schedulerobjects.JobSchedulingInfo) updatedJob := baseJob.WithJobSchedulingInfo(infoWithNilFieldsCopy2) - assert.NotNil(t, updatedJob.GetNodeSelector()) - assert.NotNil(t, updatedJob.GetAnnotations()) + assert.NotNil(t, updatedJob.NodeSelector()) + assert.NotNil(t, updatedJob.Annotations()) } diff --git a/internal/scheduler/jobdb/jobdb.go b/internal/scheduler/jobdb/jobdb.go index ee066dccc71..073096fac3f 100644 --- a/internal/scheduler/jobdb/jobdb.go +++ b/internal/scheduler/jobdb/jobdb.go @@ -13,7 +13,6 @@ import ( "github.com/armadaproject/armada/internal/common/stringinterner" "github.com/armadaproject/armada/internal/common/types" - "github.com/armadaproject/armada/internal/scheduler/interfaces" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) @@ -149,7 +148,7 @@ func (jobDb *JobDb) NewJob( runsById: map[uuid.UUID]*JobRun{}, } job.ensureJobSchedulingInfoFieldsInitialised() - job.schedulingKey = interfaces.SchedulingKeyFromLegacySchedulerJob(jobDb.schedulingKeyGenerator, job) + job.schedulingKey = SchedulingKeyFromJob(jobDb.schedulingKeyGenerator, job) return job } diff --git a/internal/scheduler/jobdb/jobdb_test.go b/internal/scheduler/jobdb/jobdb_test.go index d0fb9756eb7..6acedf172cc 100644 --- a/internal/scheduler/jobdb/jobdb_test.go +++ b/internal/scheduler/jobdb/jobdb_test.go @@ -17,7 +17,6 @@ import ( "github.com/armadaproject/armada/internal/common/stringinterner" "github.com/armadaproject/armada/internal/common/types" "github.com/armadaproject/armada/internal/common/util" - "github.com/armadaproject/armada/internal/scheduler/interfaces" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) @@ -122,7 +121,7 @@ func TestJobDb_TestQueuedJobs(t *testing.T) { require.NoError(t, err) collect := func() []*Job { retrieved := make([]*Job, 0) - iter := txn.QueuedJobs(jobs[0].GetQueue()) + iter := txn.QueuedJobs(jobs[0].Queue()) for !iter.Done() { j, _ := iter.Next() retrieved = append(retrieved, j) @@ -248,9 +247,9 @@ func TestJobDb_SchedulingKeyIsPopulated(t *testing.T) { jobDb := NewTestJobDb() job := jobDb.NewJob("jobId", "jobSet", "queue", 1, jobSchedulingInfo, false, 0, false, false, false, 2) - actualSchedulingKey, ok := job.GetSchedulingKey() + actualSchedulingKey, ok := job.SchedulingKey() require.True(t, ok) - assert.Equal(t, interfaces.SchedulingKeyFromLegacySchedulerJob(jobDb.schedulingKeyGenerator, job), actualSchedulingKey) + assert.Equal(t, SchedulingKeyFromJob(jobDb.schedulingKeyGenerator, job), actualSchedulingKey) } func TestJobDb_SchedulingKey(t *testing.T) { @@ -1244,13 +1243,13 @@ func TestJobDb_SchedulingKey(t *testing.T) { jobSchedulingInfoB.ObjectRequirements[0].Requirements = &schedulerobjects.ObjectRequirements_PodRequirements{PodRequirements: tc.podRequirementsB} jobB := baseJob.WithJobSchedulingInfo(jobSchedulingInfoB) - schedulingKeyA := interfaces.SchedulingKeyFromLegacySchedulerJob(skg, jobA) - schedulingKeyB := interfaces.SchedulingKeyFromLegacySchedulerJob(skg, jobB) + schedulingKeyA := SchedulingKeyFromJob(skg, jobA) + schedulingKeyB := SchedulingKeyFromJob(skg, jobB) // Generate the keys several times to check their consistency. 
for i := 1; i < 10; i++ { - assert.Equal(t, interfaces.SchedulingKeyFromLegacySchedulerJob(skg, jobA), schedulingKeyA) - assert.Equal(t, interfaces.SchedulingKeyFromLegacySchedulerJob(skg, jobB), schedulingKeyB) + assert.Equal(t, SchedulingKeyFromJob(skg, jobA), schedulingKeyA) + assert.Equal(t, SchedulingKeyFromJob(skg, jobB), schedulingKeyB) } if tc.equal { diff --git a/internal/scheduler/jobiteration.go b/internal/scheduler/jobiteration.go index b6d7c2c3a6c..612d2d84630 100644 --- a/internal/scheduler/jobiteration.go +++ b/internal/scheduler/jobiteration.go @@ -9,7 +9,7 @@ import ( "github.com/armadaproject/armada/internal/common/types" "github.com/armadaproject/armada/internal/common/util" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" - "github.com/armadaproject/armada/internal/scheduler/interfaces" + "github.com/armadaproject/armada/internal/scheduler/jobdb" ) type JobIterator interface { @@ -18,7 +18,7 @@ type JobIterator interface { type JobRepository interface { GetQueueJobIds(queueName string) []string - GetExistingJobsByIds(ids []string) []interfaces.LegacySchedulerJob + GetExistingJobsByIds(ids []string) []*jobdb.Job } type InMemoryJobIterator struct { @@ -60,9 +60,9 @@ func (repo *InMemoryJobRepository) EnqueueMany(jctxs []*schedulercontext.JobSche defer repo.mu.Unlock() updatedQueues := make(map[string]bool) for _, jctx := range jctxs { - queue := jctx.Job.GetQueue() + queue := jctx.Job.Queue() repo.jctxsByQueue[queue] = append(repo.jctxsByQueue[queue], jctx) - repo.jctxsById[jctx.Job.GetId()] = jctx + repo.jctxsById[jctx.Job.Id()] = jctx updatedQueues[queue] = true } for queue := range updatedQueues { @@ -81,15 +81,15 @@ func (repo *InMemoryJobRepository) GetQueueJobIds(queue string) []string { return util.Map( repo.jctxsByQueue[queue], func(jctx *schedulercontext.JobSchedulingContext) string { - return jctx.Job.GetId() + return jctx.Job.Id() }, ) } -func (repo *InMemoryJobRepository) GetExistingJobsByIds(jobIds []string) []interfaces.LegacySchedulerJob { +func (repo *InMemoryJobRepository) GetExistingJobsByIds(jobIds []string) []*jobdb.Job { repo.mu.Lock() defer repo.mu.Unlock() - rv := make([]interfaces.LegacySchedulerJob, 0, len(jobIds)) + rv := make([]*jobdb.Job, 0, len(jobIds)) for _, jobId := range jobIds { if jctx, ok := repo.jctxsById[jobId]; ok { rv = append(rv, jctx.Job) @@ -132,7 +132,7 @@ func (it *QueuedJobsIterator) Next() (*schedulercontext.JobSchedulingContext, er } job := it.repo.GetExistingJobsByIds([]string{it.jobIds[it.idx]}) it.idx++ - return schedulercontext.JobSchedulingContextFromJob(it.priorityClasses, job[0]), nil + return schedulercontext.JobSchedulingContextFromJob(job[0]), nil } } diff --git a/internal/scheduler/jobiteration_test.go b/internal/scheduler/jobiteration_test.go index 6297d8b8aea..5fa8faca207 100644 --- a/internal/scheduler/jobiteration_test.go +++ b/internal/scheduler/jobiteration_test.go @@ -7,59 +7,23 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - v1 "k8s.io/api/core/v1" "github.com/armadaproject/armada/internal/common/armadacontext" "github.com/armadaproject/armada/internal/common/util" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" - "github.com/armadaproject/armada/internal/scheduler/interfaces" + "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" "github.com/armadaproject/armada/internal/scheduler/testfixtures" - 
"github.com/armadaproject/armada/pkg/api" ) func TestInMemoryJobRepository(t *testing.T) { - T := time.Now() - jobs := []*api.Job{ - { - Queue: "A", - Id: "3", - Priority: 1, - Created: T.Add(3 * time.Second), - PodSpec: &v1.PodSpec{}, - }, - { - Queue: "A", - Id: "1", - Priority: 1, - Created: T.Add(1 * time.Second), - PodSpec: &v1.PodSpec{}, - }, - { - Queue: "A", - Id: "2", - Priority: 1, - Created: T.Add(2 * time.Second), - PodSpec: &v1.PodSpec{}, - }, - { - Queue: "A", - Id: "5", - Priority: 3, - PodSpec: &v1.PodSpec{}, - }, - { - Queue: "A", - Id: "0", - Priority: 0, - PodSpec: &v1.PodSpec{}, - }, - { - Queue: "A", - Id: "4", - Priority: 2, - PodSpec: &v1.PodSpec{}, - }, + jobs := []*jobdb.Job{ + testfixtures.TestJob("A", util.ULID(), "armada-default", nil).WithCreated(3).WithPriority(1), + testfixtures.TestJob("A", util.ULID(), "armada-default", nil).WithCreated(1).WithPriority(1), + testfixtures.TestJob("A", util.ULID(), "armada-default", nil).WithCreated(2).WithPriority(1), + testfixtures.TestJob("A", util.ULID(), "armada-default", nil).WithCreated(0).WithPriority(3), + testfixtures.TestJob("A", util.ULID(), "armada-default", nil).WithCreated(0).WithPriority(0), + testfixtures.TestJob("A", util.ULID(), "armada-default", nil).WithCreated(0).WithPriority(2), } jctxs := make([]*schedulercontext.JobSchedulingContext, len(jobs)) for i, job := range jobs { @@ -67,8 +31,10 @@ func TestInMemoryJobRepository(t *testing.T) { } repo := NewInMemoryJobRepository() repo.EnqueueMany(jctxs) - expected := []string{"0", "1", "2", "3", "4", "5"} - actual := make([]string, 0) + expected := []*jobdb.Job{ + jobs[4], jobs[1], jobs[2], jobs[0], jobs[5], jobs[3], + } + actual := make([]*jobdb.Job, 0) it := repo.GetJobIterator("A") for { jctx, err := it.Next() @@ -76,7 +42,7 @@ func TestInMemoryJobRepository(t *testing.T) { if jctx == nil { break } - actual = append(actual, jctx.Job.GetId()) + actual = append(actual, jctx.Job) } assert.Equal(t, expected, actual) } @@ -85,16 +51,14 @@ func TestMultiJobsIterator_TwoQueues(t *testing.T) { repo := newMockJobRepository() expected := make([]string, 0) for _, req := range testfixtures.N1CpuPodReqs("A", 0, 5) { - job := apiJobFromPodSpec("A", podSpecFromPodRequirements(req)) - job.Queue = "A" + job := jobFromPodSpec("A", req) repo.Enqueue(job) - expected = append(expected, job.Id) + expected = append(expected, job.Id()) } for _, req := range testfixtures.N1CpuPodReqs("B", 0, 5) { - job := apiJobFromPodSpec("B", podSpecFromPodRequirements(req)) - job.Queue = "B" + job := jobFromPodSpec("B", req) repo.Enqueue(job) - expected = append(expected, job.Id) + expected = append(expected, job.Id()) } ctx := armadacontext.Background() @@ -112,7 +76,7 @@ func TestMultiJobsIterator_TwoQueues(t *testing.T) { if jctx == nil { break } - actual = append(actual, jctx.Job.GetId()) + actual = append(actual, jctx.Job.Id()) } assert.Equal(t, expected, actual) v, err := it.Next() @@ -124,10 +88,9 @@ func TestQueuedJobsIterator_OneQueue(t *testing.T) { repo := newMockJobRepository() expected := make([]string, 0) for _, req := range testfixtures.N1CpuPodReqs("A", 0, 10) { - job := apiJobFromPodSpec("A", podSpecFromPodRequirements(req)) - job.Queue = "A" + job := jobFromPodSpec("A", req) repo.Enqueue(job) - expected = append(expected, job.Id) + expected = append(expected, job.Id()) } ctx := armadacontext.Background() it := NewQueuedJobsIterator(ctx, "A", repo, nil) @@ -138,7 +101,7 @@ func TestQueuedJobsIterator_OneQueue(t *testing.T) { if jctx == nil { break } - actual = append(actual, 
jctx.Job.GetId()) + actual = append(actual, jctx.Job.Id()) } assert.Equal(t, expected, actual) } @@ -147,10 +110,9 @@ func TestQueuedJobsIterator_ExceedsBufferSize(t *testing.T) { repo := newMockJobRepository() expected := make([]string, 0) for _, req := range testfixtures.N1CpuPodReqs("A", 0, 17) { - job := apiJobFromPodSpec("A", podSpecFromPodRequirements(req)) - job.Queue = "A" + job := jobFromPodSpec("A", req) repo.Enqueue(job) - expected = append(expected, job.Id) + expected = append(expected, job.Id()) } ctx := armadacontext.Background() it := NewQueuedJobsIterator(ctx, "A", repo, nil) @@ -161,7 +123,7 @@ func TestQueuedJobsIterator_ExceedsBufferSize(t *testing.T) { if jctx == nil { break } - actual = append(actual, jctx.Job.GetId()) + actual = append(actual, jctx.Job.Id()) } assert.Equal(t, expected, actual) } @@ -170,10 +132,9 @@ func TestQueuedJobsIterator_ManyJobs(t *testing.T) { repo := newMockJobRepository() expected := make([]string, 0) for _, req := range testfixtures.N1CpuPodReqs("A", 0, 113) { - job := apiJobFromPodSpec("A", podSpecFromPodRequirements(req)) - job.Queue = "A" + job := jobFromPodSpec("A", req) repo.Enqueue(job) - expected = append(expected, job.Id) + expected = append(expected, job.Id()) } ctx := armadacontext.Background() it := NewQueuedJobsIterator(ctx, "A", repo, nil) @@ -184,7 +145,7 @@ func TestQueuedJobsIterator_ManyJobs(t *testing.T) { if jctx == nil { break } - actual = append(actual, jctx.Job.GetId()) + actual = append(actual, jctx.Job.Id()) } assert.Equal(t, expected, actual) } @@ -193,13 +154,13 @@ func TestCreateQueuedJobsIterator_TwoQueues(t *testing.T) { repo := newMockJobRepository() expected := make([]string, 0) for _, req := range testfixtures.N1CpuPodReqs("A", 0, 10) { - job := apiJobFromPodSpec("A", podSpecFromPodRequirements(req)) + job := jobFromPodSpec("A", req) repo.Enqueue(job) - expected = append(expected, job.Id) + expected = append(expected, job.Id()) } for _, req := range testfixtures.N1CpuPodReqs("B", 0, 10) { - job := apiJobFromPodSpec("B", podSpecFromPodRequirements(req)) + job := jobFromPodSpec("B", req) repo.Enqueue(job) } ctx := armadacontext.Background() @@ -211,7 +172,7 @@ func TestCreateQueuedJobsIterator_TwoQueues(t *testing.T) { if jctx == nil { break } - actual = append(actual, jctx.Job.GetId()) + actual = append(actual, jctx.Job.Id()) } assert.Equal(t, expected, actual) } @@ -219,8 +180,7 @@ func TestCreateQueuedJobsIterator_TwoQueues(t *testing.T) { func TestCreateQueuedJobsIterator_RespectsTimeout(t *testing.T) { repo := newMockJobRepository() for _, req := range testfixtures.N1CpuPodReqs("A", 0, 10) { - job := apiJobFromPodSpec("A", podSpecFromPodRequirements(req)) - job.Queue = "A" + job := jobFromPodSpec("A", req) repo.Enqueue(job) } @@ -241,8 +201,7 @@ func TestCreateQueuedJobsIterator_RespectsTimeout(t *testing.T) { func TestCreateQueuedJobsIterator_NilOnEmpty(t *testing.T) { repo := newMockJobRepository() for _, req := range testfixtures.N1CpuPodReqs("A", 0, 10) { - job := apiJobFromPodSpec("A", podSpecFromPodRequirements(req)) - job.Queue = "A" + job := jobFromPodSpec("A", req) repo.Enqueue(job) } ctx := armadacontext.Background() @@ -257,8 +216,8 @@ func TestCreateQueuedJobsIterator_NilOnEmpty(t *testing.T) { // TODO: Deprecate in favour of InMemoryRepo. type mockJobRepository struct { - jobsByQueue map[string][]*api.Job - jobsById map[string]*api.Job + jobsByQueue map[string][]*jobdb.Job + jobsById map[string]*jobdb.Job // Ids of all jobs hat were leased to an executor. 
leasedJobs map[string]bool getQueueJobIdsDelay time.Duration @@ -266,21 +225,21 @@ type mockJobRepository struct { func newMockJobRepository() *mockJobRepository { return &mockJobRepository{ - jobsByQueue: make(map[string][]*api.Job), - jobsById: make(map[string]*api.Job), + jobsByQueue: make(map[string][]*jobdb.Job), + jobsById: make(map[string]*jobdb.Job), leasedJobs: make(map[string]bool), } } -func (repo *mockJobRepository) EnqueueMany(jobs []*api.Job) { +func (repo *mockJobRepository) EnqueueMany(jobs []*jobdb.Job) { for _, job := range jobs { repo.Enqueue(job) } } -func (repo *mockJobRepository) Enqueue(job *api.Job) { - repo.jobsByQueue[job.Queue] = append(repo.jobsByQueue[job.Queue], job) - repo.jobsById[job.Id] = job +func (repo *mockJobRepository) Enqueue(job *jobdb.Job) { + repo.jobsByQueue[job.Queue()] = append(repo.jobsByQueue[job.Queue()], job) + repo.jobsById[job.Id()] = job } func (repo *mockJobRepository) GetJobIterator(ctx *armadacontext.Context, queue string) JobIterator { @@ -292,8 +251,8 @@ func (repo *mockJobRepository) GetQueueJobIds(queue string) []string { if jobs, ok := repo.jobsByQueue[queue]; ok { rv := make([]string, 0, len(jobs)) for _, job := range jobs { - if !repo.leasedJobs[job.Id] { - rv = append(rv, job.Id) + if !repo.leasedJobs[job.Id()] { + rv = append(rv, job.Id()) } } return rv @@ -302,8 +261,8 @@ func (repo *mockJobRepository) GetQueueJobIds(queue string) []string { } } -func (repo *mockJobRepository) GetExistingJobsByIds(jobIds []string) []interfaces.LegacySchedulerJob { - rv := make([]interfaces.LegacySchedulerJob, len(jobIds)) +func (repo *mockJobRepository) GetExistingJobsByIds(jobIds []string) []*jobdb.Job { + rv := make([]*jobdb.Job, len(jobIds)) for i, jobId := range jobIds { if job, ok := repo.jobsById[jobId]; ok { rv[i] = job @@ -312,25 +271,6 @@ func (repo *mockJobRepository) GetExistingJobsByIds(jobIds []string) []interface return rv } -func apiJobFromPodSpec(queue string, podSpec *v1.PodSpec) *api.Job { - return &api.Job{ - Id: util.NewULID(), - PodSpec: podSpec, - Queue: queue, - } -} - -func podSpecFromPodRequirements(req *schedulerobjects.PodRequirements) *v1.PodSpec { - return &v1.PodSpec{ - NodeSelector: req.NodeSelector, - Affinity: req.Affinity, - Tolerations: req.Tolerations, - Priority: &req.Priority, - PreemptionPolicy: (*v1.PreemptionPolicy)(&req.PreemptionPolicy), - Containers: []v1.Container{ - { - Resources: req.ResourceRequirements, - }, - }, - } +func jobFromPodSpec(queue string, req *schedulerobjects.PodRequirements) *jobdb.Job { + return testfixtures.TestJob(queue, util.ULID(), "armada-default", req) } diff --git a/internal/scheduler/metrics.go b/internal/scheduler/metrics.go index c5911b84a8a..8fd5e37c950 100644 --- a/internal/scheduler/metrics.go +++ b/internal/scheduler/metrics.go @@ -289,7 +289,7 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p cluster: executor.Id, pool: executor.Pool, queueName: job.Queue(), - priorityClass: job.GetPriorityClassName(), + priorityClass: job.PriorityClassName(), nodeType: node.ReportingNodeType, } addToResourceListMap(allocatedResourceByQueue, queueKey, schedulerobjects.ResourceListFromV1ResourceList(podRequirements.ResourceRequirements.Requests)) diff --git a/internal/scheduler/metrics/metrics.go b/internal/scheduler/metrics/metrics.go index d3a7b046db8..d4c31ef124c 100644 --- a/internal/scheduler/metrics/metrics.go +++ b/internal/scheduler/metrics/metrics.go @@ -279,7 +279,7 @@ func (m *Metrics) UpdateSucceeded(job *jobdb.Job) error { } func (m 
*Metrics) UpdateLeased(jctx *schedulercontext.JobSchedulingContext) error { - job := jctx.Job.(*jobdb.Job) + job := jctx.Job latestRun := job.LatestRun() duration, priorState := stateDuration(job, latestRun, &jctx.Created) labels := m.buffer[0:0] @@ -373,7 +373,7 @@ func (m *Metrics) indexOfFirstMatchingRegexFromErrorMessage(message string) (int func appendLabelsFromJob(labels []string, job *jobdb.Job) []string { executor, nodeName := executorAndNodeNameFromRun(job.LatestRun()) - labels = append(labels, job.GetQueue()) + labels = append(labels, job.Queue()) labels = append(labels, executor) labels = append(labels, "") // No nodeType. labels = append(labels, nodeName) @@ -381,9 +381,9 @@ func appendLabelsFromJob(labels []string, job *jobdb.Job) []string { } func appendLabelsFromJobSchedulingContext(labels []string, jctx *schedulercontext.JobSchedulingContext) []string { - job := jctx.Job.(*jobdb.Job) + job := jctx.Job executor, nodeName := executorAndNodeNameFromRun(job.LatestRun()) - labels = append(labels, job.GetQueue()) + labels = append(labels, job.Queue()) labels = append(labels, executor) wellKnownNodeType := "" if pctx := jctx.PodSchedulingContext; pctx != nil { @@ -441,7 +441,7 @@ func (m *Metrics) updateMetrics(labels []string, job *jobdb.Job, stateDuration t c.Add(stateDuration.Seconds()) } - requests := job.GetResourceRequirements().Requests + requests := job.ResourceRequirements().Requests for _, resource := range m.config.TrackedResourceNames { if r, ok := m.config.ResourceRenaming[resource]; ok { resource = v1.ResourceName(r) diff --git a/internal/scheduler/nodedb/nodedb.go b/internal/scheduler/nodedb/nodedb.go index a02dacf6172..64f81b242b6 100644 --- a/internal/scheduler/nodedb/nodedb.go +++ b/internal/scheduler/nodedb/nodedb.go @@ -21,12 +21,10 @@ import ( "github.com/armadaproject/armada/internal/common/util" schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" - "github.com/armadaproject/armada/internal/scheduler/interfaces" "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/jobdb" koTaint "github.com/armadaproject/armada/internal/scheduler/kubernetesobjects/taint" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" - "github.com/armadaproject/armada/pkg/api" ) const ( @@ -121,36 +119,15 @@ func (nodeDb *NodeDb) copyMapWithIntern(labels map[string]string) map[string]str return result } -func (nodeDb *NodeDb) CreateAndInsertWithApiJobsWithTxn(txn *memdb.Txn, jobs []*api.Job, node *schedulerobjects.Node) error { - entry, err := nodeDb.create(node) - if err != nil { - return err - } - for _, job := range jobs { - priority, ok := job.GetScheduledAtPriority() - if !ok { - priorityClass := interfaces.PriorityClassFromLegacySchedulerJob(nodeDb.priorityClasses, nodeDb.defaultPriorityClass, job) - priority = priorityClass.Priority - } - if err := nodeDb.bindJobToNodeInPlace(entry, job, priority); err != nil { - return err - } - } - if err := nodeDb.UpsertWithTxn(txn, entry); err != nil { - return err - } - return nil -} - func (nodeDb *NodeDb) CreateAndInsertWithJobDbJobsWithTxn(txn *memdb.Txn, jobs []*jobdb.Job, node *schedulerobjects.Node) error { entry, err := nodeDb.create(node) if err != nil { return err } for _, job := range jobs { - priority, ok := job.GetScheduledAtPriority() + priority, ok := job.ScheduledAtPriority() if !ok { - priorityClass := 
interfaces.PriorityClassFromLegacySchedulerJob(nodeDb.priorityClasses, nodeDb.defaultPriorityClass, job) + priorityClass := job.PriorityClass() priority = priorityClass.Priority } if err := nodeDb.bindJobToNodeInPlace(entry, job, priority); err != nil { @@ -193,10 +170,6 @@ type NodeDb struct { // Because the number of database indices scales linearly with the number of distinct priorities, // the efficiency of the NodeDb relies on the number of distinct priorities being small. priorityClasses map[string]types.PriorityClass - // defaultPriorityClass is the name of the default priority class; it is - // used for jobs that specify a priority class that does not appear in - // priorityClasses, for example because it was deleted. - defaultPriorityClass string // Priority class priorities and NodeDb-internal priority, in increasing order. nodeDbPriorities []int32 // Resources, e.g., "cpu", "memory", and "nvidia.com/gpu", @@ -522,7 +495,7 @@ func deleteEvictedJobSchedulingContextIfExistsWithTxn(txn *memdb.Txn, jobId stri // SelectNodeForJobWithTxn selects a node on which the job can be scheduled. func (nodeDb *NodeDb) SelectNodeForJobWithTxn(txn *memdb.Txn, jctx *schedulercontext.JobSchedulingContext) (*internaltypes.Node, error) { req := jctx.PodRequirements - priorityClass := interfaces.PriorityClassFromLegacySchedulerJob(nodeDb.priorityClasses, nodeDb.defaultPriorityClass, jctx.Job) + priorityClass := jctx.Job.PriorityClass() // If the job has already been scheduled, get the priority at which it was scheduled. // Otherwise, get the original priority the job was submitted with. @@ -907,7 +880,7 @@ func (nodeDb *NodeDb) selectNodeForJobWithFairPreemption(txn *memdb.Txn, jctx *s } // bindJobToNode returns a copy of node with job bound to it. -func (nodeDb *NodeDb) bindJobToNode(node *internaltypes.Node, job interfaces.LegacySchedulerJob, priority int32) (*internaltypes.Node, error) { +func (nodeDb *NodeDb) bindJobToNode(node *internaltypes.Node, job *jobdb.Job, priority int32) (*internaltypes.Node, error) { node = node.UnsafeCopy() if err := nodeDb.bindJobToNodeInPlace(node, job, priority); err != nil { return nil, err @@ -916,9 +889,9 @@ func (nodeDb *NodeDb) bindJobToNode(node *internaltypes.Node, job interfaces.Leg } // bindJobToNodeInPlace is like bindJobToNode, but doesn't make a copy of node. -func (nodeDb *NodeDb) bindJobToNodeInPlace(node *internaltypes.Node, job interfaces.LegacySchedulerJob, priority int32) error { - jobId := job.GetId() - requests := job.GetResourceRequirements().Requests +func (nodeDb *NodeDb) bindJobToNodeInPlace(node *internaltypes.Node, job *jobdb.Job, priority int32) error { + jobId := job.Id() + requests := job.ResourceRequirements().Requests _, isEvicted := node.EvictedJobRunIds[jobId] delete(node.EvictedJobRunIds, jobId) @@ -937,7 +910,7 @@ func (nodeDb *NodeDb) bindJobToNodeInPlace(node *internaltypes.Node, job interfa if node.AllocatedByQueue == nil { node.AllocatedByQueue = make(map[string]schedulerobjects.ResourceList) } - queue := job.GetQueue() + queue := job.Queue() allocatedToQueue := node.AllocatedByQueue[queue] allocatedToQueue.AddV1ResourceList(requests) node.AllocatedByQueue[queue] = allocatedToQueue @@ -965,11 +938,11 @@ func (nodeDb *NodeDb) bindJobToNodeInPlace(node *internaltypes.Node, job interfa // AllocatedByQueue. 
func (nodeDb *NodeDb) EvictJobsFromNode( priorityClasses map[string]types.PriorityClass, - jobFilter func(interfaces.LegacySchedulerJob) bool, - jobs []interfaces.LegacySchedulerJob, + jobFilter func(*jobdb.Job) bool, + jobs []*jobdb.Job, node *internaltypes.Node, -) ([]interfaces.LegacySchedulerJob, *internaltypes.Node, error) { - evicted := make([]interfaces.LegacySchedulerJob, 0) +) ([]*jobdb.Job, *internaltypes.Node, error) { + evicted := make([]*jobdb.Job, 0) node = node.UnsafeCopy() for _, job := range jobs { if jobFilter != nil && !jobFilter(job) { @@ -984,13 +957,13 @@ func (nodeDb *NodeDb) EvictJobsFromNode( } // evictJobFromNodeInPlace is the in-place operation backing EvictJobsFromNode. -func (nodeDb *NodeDb) evictJobFromNodeInPlace(priorityClasses map[string]types.PriorityClass, job interfaces.LegacySchedulerJob, node *internaltypes.Node) error { - jobId := job.GetId() +func (nodeDb *NodeDb) evictJobFromNodeInPlace(priorityClasses map[string]types.PriorityClass, job *jobdb.Job, node *internaltypes.Node) error { + jobId := job.Id() if _, ok := node.AllocatedByJobId[jobId]; !ok { return errors.Errorf("job %s has no resources allocated on node %s", jobId, node.GetId()) } - queue := job.GetQueue() + queue := job.Queue() if _, ok := node.AllocatedByQueue[queue]; !ok { return errors.Errorf("queue %s has no resources allocated on node %s", queue, node.GetId()) } @@ -1008,7 +981,7 @@ func (nodeDb *NodeDb) evictJobFromNodeInPlace(priorityClasses map[string]types.P if !ok { return errors.Errorf("job %s not mapped to a priority", jobId) } - requests := job.GetResourceRequirements().Requests + requests := job.ResourceRequirements().Requests allocatable.MarkAllocatableV1ResourceList(priority, requests) allocatable.MarkAllocatedV1ResourceList(evictedPriority, requests) @@ -1016,7 +989,7 @@ func (nodeDb *NodeDb) evictJobFromNodeInPlace(priorityClasses map[string]types.P } // UnbindJobsFromNode returns a node with all elements of jobs unbound from it. -func (nodeDb *NodeDb) UnbindJobsFromNode(priorityClasses map[string]types.PriorityClass, jobs []interfaces.LegacySchedulerJob, node *internaltypes.Node) (*internaltypes.Node, error) { +func (nodeDb *NodeDb) UnbindJobsFromNode(priorityClasses map[string]types.PriorityClass, jobs []*jobdb.Job, node *internaltypes.Node) (*internaltypes.Node, error) { node = node.UnsafeCopy() for _, job := range jobs { if err := nodeDb.unbindJobFromNodeInPlace(priorityClasses, job, node); err != nil { @@ -1027,7 +1000,7 @@ func (nodeDb *NodeDb) UnbindJobsFromNode(priorityClasses map[string]types.Priori } // UnbindJobFromNode returns a copy of node with job unbound from it. -func (nodeDb *NodeDb) UnbindJobFromNode(priorityClasses map[string]types.PriorityClass, job interfaces.LegacySchedulerJob, node *internaltypes.Node) (*internaltypes.Node, error) { +func (nodeDb *NodeDb) UnbindJobFromNode(priorityClasses map[string]types.PriorityClass, job *jobdb.Job, node *internaltypes.Node) (*internaltypes.Node, error) { node = node.UnsafeCopy() if err := nodeDb.unbindJobFromNodeInPlace(priorityClasses, job, node); err != nil { return nil, err @@ -1036,9 +1009,9 @@ func (nodeDb *NodeDb) UnbindJobFromNode(priorityClasses map[string]types.Priorit } // unbindPodFromNodeInPlace is like UnbindJobFromNode, but doesn't make a copy of node. 
-func (nodeDb *NodeDb) unbindJobFromNodeInPlace(priorityClasses map[string]types.PriorityClass, job interfaces.LegacySchedulerJob, node *internaltypes.Node) error { - jobId := job.GetId() - requests := job.GetResourceRequirements().Requests +func (nodeDb *NodeDb) unbindJobFromNodeInPlace(priorityClasses map[string]types.PriorityClass, job *jobdb.Job, node *internaltypes.Node) error { + jobId := job.Id() + requests := job.ResourceRequirements().Requests _, isEvicted := node.EvictedJobRunIds[jobId] delete(node.EvictedJobRunIds, jobId) @@ -1050,7 +1023,7 @@ func (nodeDb *NodeDb) unbindJobFromNodeInPlace(priorityClasses map[string]types. delete(node.AllocatedByJobId, jobId) } - queue := job.GetQueue() + queue := job.Queue() if allocatedToQueue, ok := node.AllocatedByQueue[queue]; !ok { return errors.Errorf("queue %s has no resources allocated on node %s", queue, node.GetId()) } else { diff --git a/internal/scheduler/nodedb/nodedb_test.go b/internal/scheduler/nodedb/nodedb_test.go index a66eda3bbe9..286b56d6e57 100644 --- a/internal/scheduler/nodedb/nodedb_test.go +++ b/internal/scheduler/nodedb/nodedb_test.go @@ -10,14 +10,11 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" - "github.com/armadaproject/armada/internal/armada/configuration" armadamaps "github.com/armadaproject/armada/internal/common/maps" "github.com/armadaproject/armada/internal/common/stringinterner" - "github.com/armadaproject/armada/internal/common/types" "github.com/armadaproject/armada/internal/common/util" schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" - "github.com/armadaproject/armada/internal/scheduler/interfaces" "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" @@ -127,10 +124,10 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) { entry, err := nodeDb.GetNode(node.Id) require.NoError(t, err) - jobFilter := func(job interfaces.LegacySchedulerJob) bool { return true } + jobFilter := func(job *jobdb.Job) bool { return true } job := testfixtures.Test1GpuJob("A", testfixtures.PriorityClass0) - request := schedulerobjects.ResourceListFromV1ResourceList(job.GetResourceRequirements().Requests) - jobId := job.GetId() + request := schedulerobjects.ResourceListFromV1ResourceList(job.ResourceRequirements().Requests) + jobId := job.Id() boundNode, err := nodeDb.bindJobToNode(entry, job, job.PodRequirements().Priority) require.NoError(t, err) @@ -138,12 +135,12 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) { unboundNode, err := nodeDb.UnbindJobFromNode(testfixtures.TestPriorityClasses, job, boundNode) require.NoError(t, err) - unboundMultipleNode, err := nodeDb.UnbindJobsFromNode(testfixtures.TestPriorityClasses, []interfaces.LegacySchedulerJob{job}, boundNode) + unboundMultipleNode, err := nodeDb.UnbindJobsFromNode(testfixtures.TestPriorityClasses, []*jobdb.Job{job}, boundNode) require.NoError(t, err) - evictedJobs, evictedNode, err := nodeDb.EvictJobsFromNode(testfixtures.TestPriorityClasses, jobFilter, []interfaces.LegacySchedulerJob{job}, boundNode) + evictedJobs, evictedNode, err := nodeDb.EvictJobsFromNode(testfixtures.TestPriorityClasses, jobFilter, []*jobdb.Job{job}, boundNode) require.NoError(t, err) - assert.Equal(t, []interfaces.LegacySchedulerJob{job}, evictedJobs) + assert.Equal(t, []*jobdb.Job{job}, evictedJobs) 
evictedUnboundNode, err := nodeDb.UnbindJobFromNode(testfixtures.TestPriorityClasses, job, evictedNode) require.NoError(t, err) @@ -151,7 +148,7 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) { evictedBoundNode, err := nodeDb.bindJobToNode(evictedNode, job, job.PodRequirements().Priority) require.NoError(t, err) - _, _, err = nodeDb.EvictJobsFromNode(testfixtures.TestPriorityClasses, jobFilter, []interfaces.LegacySchedulerJob{job}, entry) + _, _, err = nodeDb.EvictJobsFromNode(testfixtures.TestPriorityClasses, jobFilter, []*jobdb.Job{job}, entry) require.Error(t, err) _, err = nodeDb.UnbindJobFromNode(testfixtures.TestPriorityClasses, job, entry) @@ -160,7 +157,7 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) { _, err = nodeDb.bindJobToNode(boundNode, job, job.PodRequirements().Priority) require.Error(t, err) - _, _, err = nodeDb.EvictJobsFromNode(testfixtures.TestPriorityClasses, jobFilter, []interfaces.LegacySchedulerJob{job}, evictedNode) + _, _, err = nodeDb.EvictJobsFromNode(testfixtures.TestPriorityClasses, jobFilter, []*jobdb.Job{job}, evictedNode) require.Error(t, err) assertNodeAccountingEqual(t, entry, unboundNode) @@ -201,7 +198,7 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) { expectedAllocatable := boundNode.TotalResources.DeepCopy() expectedAllocatable.Sub(request) - priority := testfixtures.TestPriorityClasses[job.GetPriorityClassName()].Priority + priority := testfixtures.TestPriorityClasses[job.PriorityClassName()].Priority assert.True(t, expectedAllocatable.Equal(boundNode.AllocatableByPriority[priority])) assert.Empty(t, unboundNode.AllocatedByJobId) @@ -253,20 +250,20 @@ func assertNodeAccountingEqual(t *testing.T, node1, node2 *internaltypes.Node) { func TestEviction(t *testing.T) { tests := map[string]struct { - jobFilter func(interfaces.LegacySchedulerJob) bool + jobFilter func(*jobdb.Job) bool expectedEvictions []int32 }{ "jobFilter always returns false": { - jobFilter: func(_ interfaces.LegacySchedulerJob) bool { return false }, + jobFilter: func(_ *jobdb.Job) bool { return false }, expectedEvictions: []int32{}, }, "jobFilter always returns true": { - jobFilter: func(_ interfaces.LegacySchedulerJob) bool { return true }, + jobFilter: func(_ *jobdb.Job) bool { return true }, expectedEvictions: []int32{0, 1}, }, "jobFilter returns true for preemptible jobs": { - jobFilter: func(job interfaces.LegacySchedulerJob) bool { - priorityClassName := job.GetPriorityClassName() + jobFilter: func(job *jobdb.Job) bool { + priorityClassName := job.PriorityClassName() priorityClass := testfixtures.TestPriorityClasses[priorityClassName] return priorityClass.Preemptible }, @@ -293,13 +290,13 @@ func TestEviction(t *testing.T) { entry, err := nodeDb.GetNode(node.Id) require.NoError(t, err) - existingJobs := make([]interfaces.LegacySchedulerJob, len(jobs)) + existingJobs := make([]*jobdb.Job, len(jobs)) for i, job := range jobs { existingJobs[i] = job } actualEvictions, _, err := nodeDb.EvictJobsFromNode(testfixtures.TestPriorityClasses, tc.jobFilter, existingJobs, entry) require.NoError(t, err) - expectedEvictions := make([]interfaces.LegacySchedulerJob, 0, len(tc.expectedEvictions)) + expectedEvictions := make([]*jobdb.Job, 0, len(tc.expectedEvictions)) for _, i := range tc.expectedEvictions { expectedEvictions = append(expectedEvictions, jobs[i]) } @@ -467,8 +464,8 @@ func TestScheduleIndividually(t *testing.T) { node, err := nodeDb.GetNode(nodeId) require.NoError(t, err) require.NotNil(t, node) - expected := 
schedulerobjects.ResourceListFromV1ResourceList(job.GetResourceRequirements().Requests) - actual, ok := node.AllocatedByJobId[job.GetId()] + expected := schedulerobjects.ResourceListFromV1ResourceList(job.ResourceRequirements().Requests) + actual, ok := node.AllocatedByJobId[job.Id()] require.True(t, ok) assert.True(t, actual.Equal(expected)) } @@ -562,35 +559,13 @@ func TestScheduleMany(t *testing.T) { } func TestAwayNodeTypes(t *testing.T) { - priorityClasses := map[string]types.PriorityClass{ - "armada-preemptible-away": { - Priority: 30000, - Preemptible: true, - - AwayNodeTypes: []types.AwayNodeType{ - {Priority: 29000, WellKnownNodeTypeName: "whale"}, - }, - }, - } - nodeDb, err := NewNodeDb( - priorityClasses, + testfixtures.TestPriorityClasses, testfixtures.TestMaxExtraNodesToConsider, testfixtures.TestResources, testfixtures.TestIndexedTaints, testfixtures.TestIndexedNodeLabels, - []configuration.WellKnownNodeType{ - { - Name: "whale", - Taints: []v1.Taint{ - { - Key: "whale", - Value: "true", - Effect: v1.TaintEffectNoSchedule, - }, - }, - }, - }, + testfixtures.TestWellKnownNodeTypes, stringinterner.New(1024), ) require.NoError(t, err) @@ -600,7 +575,7 @@ func TestAwayNodeTypes(t *testing.T) { node.Taints = append( node.Taints, v1.Taint{ - Key: "whale", + Key: "gpu", Value: "true", Effect: v1.TaintEffectNoSchedule, }, @@ -614,7 +589,7 @@ func TestAwayNodeTypes(t *testing.T) { "armada-preemptible-away", testfixtures.Test1Cpu4GiPodReqs(testfixtures.TestQueue, jobId, 30000), ) - jctx := schedulercontext.JobSchedulingContextFromJob(priorityClasses, job) + jctx := schedulercontext.JobSchedulingContextFromJob(job) require.Empty(t, jctx.AdditionalTolerations) gctx := schedulercontext.NewGangSchedulingContext([]*schedulercontext.JobSchedulingContext{jctx}) @@ -625,7 +600,7 @@ func TestAwayNodeTypes(t *testing.T) { t, []v1.Toleration{ { - Key: "whale", + Key: "gpu", Value: "true", Effect: v1.TaintEffectNoSchedule, }, diff --git a/internal/scheduler/pool_assigner.go b/internal/scheduler/pool_assigner.go index 556d6eea246..5ed663d7abd 100644 --- a/internal/scheduler/pool_assigner.go +++ b/internal/scheduler/pool_assigner.go @@ -15,7 +15,6 @@ import ( "github.com/armadaproject/armada/internal/scheduler/constraints" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/database" - "github.com/armadaproject/armada/internal/scheduler/interfaces" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/nodedb" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" @@ -110,9 +109,9 @@ func (p *DefaultPoolAssigner) AssignPool(j *jobdb.Job) (string, error) { } // See if we have this set of reqs cached. 
- schedulingKey, ok := j.GetSchedulingKey() + schedulingKey, ok := j.SchedulingKey() if !ok { - schedulingKey = interfaces.SchedulingKeyFromLegacySchedulerJob(p.schedulingKeyGenerator, j) + schedulingKey = jobdb.SchedulingKeyFromJob(p.schedulingKeyGenerator, j) } if cachedPool, ok := p.poolCache.Get(schedulingKey); ok { return cachedPool.(string), nil @@ -133,9 +132,9 @@ func (p *DefaultPoolAssigner) AssignPool(j *jobdb.Job) (string, error) { txn := nodeDb.Txn(true) jctx := &schedulercontext.JobSchedulingContext{ Created: time.Now(), - JobId: j.GetId(), + JobId: j.Id(), Job: j, - PodRequirements: j.GetPodRequirements(p.priorityClasses), + PodRequirements: j.PodRequirements(), GangInfo: schedulercontext.EmptyGangInfo(j), } node, err := nodeDb.SelectNodeForJobWithTxn(txn, jctx) diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go index 1db502dc080..b7c7b46f8f9 100644 --- a/internal/scheduler/preempting_queue_scheduler.go +++ b/internal/scheduler/preempting_queue_scheduler.go @@ -19,7 +19,6 @@ import ( schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/constraints" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/fairness" - "github.com/armadaproject/armada/internal/scheduler/interfaces" "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/nodedb" @@ -123,20 +122,20 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche sch.nodeDb, sch.schedulingContext.PriorityClasses, sch.nodeEvictionProbability, - func(ctx *armadacontext.Context, job interfaces.LegacySchedulerJob) bool { - priorityClass := interfaces.PriorityClassFromLegacySchedulerJob(sch.schedulingContext.PriorityClasses, sch.schedulingContext.DefaultPriorityClass, job) + func(ctx *armadacontext.Context, job *jobdb.Job) bool { + priorityClass := job.PriorityClass() if !priorityClass.Preemptible { return false } - if job.GetAnnotations() == nil { - ctx.Errorf("can't evict job %s: annotations not initialised", job.GetId()) + if job.Annotations() == nil { + ctx.Errorf("can't evict job %s: annotations not initialised", job.Id()) return false } - if job.GetNodeSelector() == nil { - ctx.Errorf("can't evict job %s: nodeSelector not initialised", job.GetId()) + if job.NodeSelector() == nil { + ctx.Errorf("can't evict job %s: nodeSelector not initialised", job.Id()) return false } - if qctx, ok := sch.schedulingContext.QueueSchedulingContexts[job.GetQueue()]; ok { + if qctx, ok := sch.schedulingContext.QueueSchedulingContexts[job.Queue()]; ok { fairShare := qctx.Weight / sch.schedulingContext.WeightSum actualShare := sch.schedulingContext.FairnessCostProvider.CostFromQueue(qctx) / totalCost fractionOfFairShare := actualShare / fairShare @@ -187,7 +186,6 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche sch.jobRepo, sch.nodeDb, sch.schedulingContext.PriorityClasses, - sch.schedulingContext.DefaultPriorityClass, sch.nodeOversubscriptionEvictionProbability, nil, ), @@ -411,7 +409,7 @@ func (sch *PreemptingQueueScheduler) collectIdsForGangEviction(evictorResult *Ev // Otherwise, the evicted gang jobs will not be schedulable, since some gang jobs will be considered missing. 
func (sch *PreemptingQueueScheduler) setEvictedGangCardinality(evictorResult *EvictorResult) { for _, jctx := range evictorResult.EvictedJctxsByJobId { - gangId, ok := sch.gangIdByJobId[jctx.Job.GetId()] + gangId, ok := sch.gangIdByJobId[jctx.Job.Id()] if !ok { // Not a gang job. continue @@ -581,7 +579,7 @@ func (sch *PreemptingQueueScheduler) unbindJobs(jctxs []*schedulercontext.JobSch func(jctx *schedulercontext.JobSchedulingContext) string { return sch.nodeIdByJobId[jctx.JobId] }, - func(jcxt *schedulercontext.JobSchedulingContext) interfaces.LegacySchedulerJob { + func(jcxt *schedulercontext.JobSchedulingContext) *jobdb.Job { return jcxt.Job }, ) { @@ -603,8 +601,8 @@ func (sch *PreemptingQueueScheduler) unbindJobs(jctxs []*schedulercontext.JobSch // Update sch.gangIdByJobId and sch.jobIdsByGangId based on preempted/scheduled jobs. func (sch *PreemptingQueueScheduler) updateGangAccounting(preempted []*schedulercontext.JobSchedulingContext, scheduled []*schedulercontext.JobSchedulingContext) error { for _, jctx := range preempted { - if gangId, ok := sch.gangIdByJobId[jctx.Job.GetId()]; ok { - delete(sch.gangIdByJobId, jctx.Job.GetId()) + if gangId, ok := sch.gangIdByJobId[jctx.Job.Id()]; ok { + delete(sch.gangIdByJobId, jctx.Job.Id()) delete(sch.jobIdsByGangId, gangId) } } @@ -698,7 +696,7 @@ type Evictor struct { nodeDb *nodedb.NodeDb priorityClasses map[string]types.PriorityClass nodeFilter func(*armadacontext.Context, *internaltypes.Node) bool - jobFilter func(*armadacontext.Context, interfaces.LegacySchedulerJob) bool + jobFilter func(*armadacontext.Context, *jobdb.Job) bool } type EvictorResult struct { @@ -715,7 +713,7 @@ func NewNodeEvictor( nodeDb *nodedb.NodeDb, priorityClasses map[string]types.PriorityClass, perNodeEvictionProbability float64, - jobFilter func(*armadacontext.Context, interfaces.LegacySchedulerJob) bool, + jobFilter func(*armadacontext.Context, *jobdb.Job) bool, random *rand.Rand, ) *Evictor { if perNodeEvictionProbability <= 0 { @@ -755,8 +753,8 @@ func NewFilteredEvictor( shouldEvict := nodeIdsToEvict[node.GetId()] return shouldEvict }, - jobFilter: func(_ *armadacontext.Context, job interfaces.LegacySchedulerJob) bool { - shouldEvict := jobIdsToEvict[job.GetId()] + jobFilter: func(_ *armadacontext.Context, job *jobdb.Job) bool { + shouldEvict := jobIdsToEvict[job.Id()] return shouldEvict }, } @@ -769,7 +767,6 @@ func NewOversubscribedEvictor( jobRepo JobRepository, nodeDb *nodedb.NodeDb, priorityClasses map[string]types.PriorityClass, - defaultPriorityClassName string, perNodeEvictionProbability float64, random *rand.Rand, ) *Evictor { @@ -803,14 +800,14 @@ func NewOversubscribedEvictor( } return len(overSubscribedPriorities) > 0 && random.Float64() < perNodeEvictionProbability }, - jobFilter: func(ctx *armadacontext.Context, job interfaces.LegacySchedulerJob) bool { - priorityClass := interfaces.PriorityClassFromLegacySchedulerJob(priorityClasses, defaultPriorityClassName, job) + jobFilter: func(ctx *armadacontext.Context, job *jobdb.Job) bool { + priorityClass := job.PriorityClass() if !priorityClass.Preemptible { return false } - priority, ok := nodeDb.GetScheduledAtPriority(job.GetId()) + priority, ok := nodeDb.GetScheduledAtPriority(job.Id()) if !ok { - ctx.Warnf("can't evict job %s: not mapped to a priority", job.GetId()) + ctx.Warnf("can't evict job %s: not mapped to a priority", job.Id()) return false } return overSubscribedPriorities[priority] @@ -823,9 +820,9 @@ func NewOversubscribedEvictor( // Any job for which jobFilter returns true is evicted 
(if the node was not skipped). // If a job was evicted from a node, postEvictFunc is called with the corresponding job and node. func (evi *Evictor) Evict(ctx *armadacontext.Context, nodeDbTxn *memdb.Txn) (*EvictorResult, error) { - var jobFilter func(job interfaces.LegacySchedulerJob) bool + var jobFilter func(job *jobdb.Job) bool if evi.jobFilter != nil { - jobFilter = func(job interfaces.LegacySchedulerJob) bool { return evi.jobFilter(ctx, job) } + jobFilter = func(job *jobdb.Job) bool { return evi.jobFilter(ctx, job) } } evictedJctxsByJobId := make(map[string]*schedulercontext.JobSchedulingContext) affectedNodesById := make(map[string]*internaltypes.Node) @@ -854,9 +851,7 @@ func (evi *Evictor) Evict(ctx *armadacontext.Context, nodeDbTxn *memdb.Txn) (*Ev // TODO: Should be safe to remove now. for i, evictedJob := range evictedJobs { - if dbJob, ok := evictedJob.(*jobdb.Job); ok { - evictedJobs[i] = dbJob.DeepCopy() - } + evictedJobs[i] = evictedJob.DeepCopy() } for _, job := range evictedJobs { @@ -867,13 +862,13 @@ func (evi *Evictor) Evict(ctx *armadacontext.Context, nodeDbTxn *memdb.Txn) (*Ev // - Adding taints to a node doesn't cause jobs already running on the node to be preempted. // - Jobs scheduled as away jobs have the necessary tolerations to be re-scheduled. // TODO(albin): We can remove the checkOnlyDynamicRequirements flag in the nodeDb now that we've added the tolerations. - jctx := schedulercontext.JobSchedulingContextFromJob(evi.priorityClasses, job) + jctx := schedulercontext.JobSchedulingContextFromJob(job) jctx.IsEvicted = true jctx.AddNodeSelector(schedulerconfig.NodeIdLabel, node.GetId()) - evictedJctxsByJobId[job.GetId()] = jctx + evictedJctxsByJobId[job.Id()] = jctx jctx.AdditionalTolerations = append(jctx.AdditionalTolerations, node.GetTolerationsForTaints()...) 
- nodeIdByJobId[job.GetId()] = node.GetId() + nodeIdByJobId[job.Id()] = node.GetId() } if len(evictedJobs) > 0 { affectedNodesById[node.GetId()] = node diff --git a/internal/scheduler/preempting_queue_scheduler_test.go b/internal/scheduler/preempting_queue_scheduler_test.go index 69ce43cc86a..325f32b02d9 100644 --- a/internal/scheduler/preempting_queue_scheduler_test.go +++ b/internal/scheduler/preempting_queue_scheduler_test.go @@ -60,7 +60,6 @@ func TestEvictOversubscribed(t *testing.T) { NewSchedulerJobRepositoryAdapter(jobDbTxn), nodeDb, config.PriorityClasses, - config.DefaultPriorityClassName, 1, nil, ) @@ -1485,18 +1484,6 @@ func TestPreemptingQueueScheduler(t *testing.T) { "home-away preemption, away jobs first": { SchedulingConfig: func() configuration.SchedulingConfig { config := testfixtures.TestSchedulingConfig() - config.PriorityClasses = map[string]types.PriorityClass{ - "armada-preemptible-away": { - Priority: 30000, - Preemptible: true, - - AwayNodeTypes: []types.AwayNodeType{{Priority: 29000, WellKnownNodeTypeName: "gpu"}}, - }, - "armada-preemptible": { - Priority: 30000, - Preemptible: true, - }, - } config.DefaultPriorityClassName = "armada-preemptible" config.WellKnownNodeTypes = []configuration.WellKnownNodeType{ { @@ -1800,10 +1787,10 @@ func TestPreemptingQueueScheduler(t *testing.T) { for queue, jobs := range round.JobsByQueue { for j, job := range jobs { job = job.WithQueued(true) - require.Equal(t, queue, job.GetQueue()) + require.Equal(t, queue, job.Queue()) queuedJobs = append(queuedJobs, job.WithQueued(true)) - roundByJobId[job.GetId()] = i - indexByJobId[job.GetId()] = j + roundByJobId[job.Id()] = i + indexByJobId[job.Id()] = j } } err = jobDbTxn.Upsert(queuedJobs) @@ -1814,16 +1801,16 @@ func TestPreemptingQueueScheduler(t *testing.T) { for roundIndex, reqIndices := range reqIndicesByRoundIndex { for _, reqIndex := range reqIndices { job := tc.Rounds[roundIndex].JobsByQueue[queue][reqIndex] - nodeId := nodeIdByJobId[job.GetId()] + nodeId := nodeIdByJobId[job.Id()] node, err := nodeDb.GetNode(nodeId) require.NoError(t, err) node, err = nodeDb.UnbindJobFromNode(tc.SchedulingConfig.PriorityClasses, job, node) require.NoError(t, err) err = nodeDb.Upsert(node) require.NoError(t, err) - if gangId, ok := gangIdByJobId[job.GetId()]; ok { - delete(gangIdByJobId, job.GetId()) - delete(jobIdsByGangId[gangId], job.GetId()) + if gangId, ok := gangIdByJobId[job.Id()]; ok { + delete(gangIdByJobId, job.Id()) + delete(jobIdsByGangId[gangId], job.Id()) } } } @@ -1899,26 +1886,26 @@ func TestPreemptingQueueScheduler(t *testing.T) { // Test resource accounting. 
for _, jctx := range result.PreemptedJobs { job := jctx.Job - m := allocatedByQueueAndPriorityClass[job.GetQueue()] + m := allocatedByQueueAndPriorityClass[job.Queue()] if m == nil { m = make(schedulerobjects.QuantityByTAndResourceType[string]) - allocatedByQueueAndPriorityClass[job.GetQueue()] = m + allocatedByQueueAndPriorityClass[job.Queue()] = m } m.SubV1ResourceList( - job.GetPriorityClassName(), - job.GetResourceRequirements().Requests, + job.PriorityClassName(), + job.ResourceRequirements().Requests, ) } for _, jctx := range result.ScheduledJobs { job := jctx.Job - m := allocatedByQueueAndPriorityClass[job.GetQueue()] + m := allocatedByQueueAndPriorityClass[job.Queue()] if m == nil { m = make(schedulerobjects.QuantityByTAndResourceType[string]) - allocatedByQueueAndPriorityClass[job.GetQueue()] = m + allocatedByQueueAndPriorityClass[job.Queue()] = m } m.AddV1ResourceList( - job.GetPriorityClassName(), - job.GetResourceRequirements().Requests, + job.PriorityClassName(), + job.ResourceRequirements().Requests, ) } for queue, qctx := range sctx.QueueSchedulingContexts { @@ -1928,17 +1915,17 @@ func TestPreemptingQueueScheduler(t *testing.T) { // Test that jobs are mapped to nodes correctly. for _, jctx := range result.PreemptedJobs { job := jctx.Job - nodeId, ok := result.NodeIdByJobId[job.GetId()] + nodeId, ok := result.NodeIdByJobId[job.Id()] assert.True(t, ok) assert.NotEmpty(t, nodeId) // Check that preempted jobs are preempted from the node they were previously scheduled onto. - expectedNodeId := nodeIdByJobId[job.GetId()] - assert.Equal(t, expectedNodeId, nodeId, "job %s preempted from unexpected node", job.GetId()) + expectedNodeId := nodeIdByJobId[job.Id()] + assert.Equal(t, expectedNodeId, nodeId, "job %s preempted from unexpected node", job.Id()) } for _, jctx := range result.ScheduledJobs { job := jctx.Job - nodeId, ok := result.NodeIdByJobId[job.GetId()] + nodeId, ok := result.NodeIdByJobId[job.Id()] assert.True(t, ok) assert.NotEmpty(t, nodeId) @@ -1954,10 +1941,10 @@ func TestPreemptingQueueScheduler(t *testing.T) { // Check that scheduled jobs are consistently assigned to the same node. // (We don't allow moving jobs between nodes.) - if expectedNodeId, ok := nodeIdByJobId[job.GetId()]; ok { - assert.Equal(t, expectedNodeId, nodeId, "job %s scheduled onto unexpected node", job.GetId()) + if expectedNodeId, ok := nodeIdByJobId[job.Id()]; ok { + assert.Equal(t, expectedNodeId, nodeId, "job %s scheduled onto unexpected node", job.Id()) } else { - nodeIdByJobId[job.GetId()] = nodeId + nodeIdByJobId[job.Id()] = nodeId } } for jobId, nodeId := range result.NodeIdByJobId { @@ -2015,12 +2002,12 @@ func TestPreemptingQueueScheduler(t *testing.T) { } } - err = jobDbTxn.BatchDelete(util.Map(queuedJobs, func(job *jobdb.Job) string { return job.GetId() })) + err = jobDbTxn.BatchDelete(util.Map(queuedJobs, func(job *jobdb.Job) string { return job.Id() })) require.NoError(t, err) var preemptedJobs []*jobdb.Job for _, jctx := range result.PreemptedJobs { - job := jctx.Job.(*jobdb.Job) + job := jctx.Job preemptedJobs = append( preemptedJobs, job. 
@@ -2038,9 +2025,9 @@ func TestPreemptingQueueScheduler(t *testing.T) { slices.SortFunc( result.ScheduledJobs, func(a, b *schedulercontext.JobSchedulingContext) int { - if a.Job.GetSubmitTime().Before(b.Job.GetSubmitTime()) { + if a.Job.SubmitTime().Before(b.Job.SubmitTime()) { return -1 - } else if b.Job.GetSubmitTime().Before(a.Job.GetSubmitTime()) { + } else if b.Job.SubmitTime().Before(a.Job.SubmitTime()) { return 1 } else { return 0 @@ -2049,8 +2036,8 @@ func TestPreemptingQueueScheduler(t *testing.T) { ) var scheduledJobs []*jobdb.Job for _, jctx := range result.ScheduledJobs { - job := jctx.Job.(*jobdb.Job) - jobId := job.GetId() + job := jctx.Job + jobId := job.Id() node, err := nodeDb.GetNode(result.NodeIdByJobId[jobId]) require.NotNil(t, node) require.NoError(t, err) @@ -2074,7 +2061,7 @@ func jobIdsByQueueFromJobContexts(jctxs []*schedulercontext.JobSchedulingContext rv := make(map[string][]string) for _, jctx := range jctxs { job := jctx.Job - rv[job.GetQueue()] = append(rv[job.GetQueue()], job.GetId()) + rv[job.Queue()] = append(rv[job.Queue()], job.Id()) } return rv } @@ -2265,8 +2252,8 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { require.NoError(b, err) jobsByNodeId := make(map[string][]*jobdb.Job) - for _, job := range ScheduledJobsFromSchedulerResult[*jobdb.Job](result) { - nodeId := result.NodeIdByJobId[job.GetId()] + for _, job := range ScheduledJobsFromSchedulerResult(result) { + nodeId := result.NodeIdByJobId[job.Id()] jobsByNodeId[nodeId] = append(jobsByNodeId[nodeId], job) } nodeDb, err = NewNodeDb(tc.SchedulingConfig, stringinterner.New(1024)) diff --git a/internal/scheduler/queue_scheduler_test.go b/internal/scheduler/queue_scheduler_test.go index 458008e7688..d036550d66d 100644 --- a/internal/scheduler/queue_scheduler_test.go +++ b/internal/scheduler/queue_scheduler_test.go @@ -22,7 +22,6 @@ import ( schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/constraints" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/fairness" - "github.com/armadaproject/armada/internal/scheduler/interfaces" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/nodedb" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" @@ -536,12 +535,12 @@ func TestQueueScheduler(t *testing.T) { indexByJobId := make(map[string]int) for i, job := range tc.Jobs { - if _, ok := queueNameToQueue[job.GetQueue()]; !ok { + if _, ok := queueNameToQueue[job.Queue()]; !ok { panic(fmt.Sprintf("queue %s does not exist", job.Queue())) } - indexByJobId[job.GetId()] = i + indexByJobId[job.Id()] = i } - legacySchedulerJobs := make([]interfaces.LegacySchedulerJob, len(tc.Jobs)) + legacySchedulerJobs := make([]*jobdb.Job, len(tc.Jobs)) for i, job := range tc.Jobs { legacySchedulerJobs[i] = job } @@ -615,7 +614,7 @@ func TestQueueScheduler(t *testing.T) { expectedScheduledIndicesByQueue := armadaslices.GroupByFunc( tc.ExpectedScheduledIndices, func(i int) string { - return tc.Jobs[i].GetQueue() + return tc.Jobs[i].Queue() }, ) expectedSuccessfulOrNotAttemptedIndices := armadaslices.MapAndGroupByFuncs( @@ -632,7 +631,7 @@ func TestQueueScheduler(t *testing.T) { expectedUnsuccessfulIndicesByQueue := armadaslices.GroupByFunc( expectedUnsuccessfulIndices, func(i int) string { - return tc.Jobs[i].GetQueue() + return tc.Jobs[i].Queue() }, ) actualSuccessfulIndicesByQueue := make(map[string][]int) diff --git a/internal/scheduler/result.go 
b/internal/scheduler/result.go index 426d7790a27..2ac4bddb32f 100644 --- a/internal/scheduler/result.go +++ b/internal/scheduler/result.go @@ -2,7 +2,7 @@ package scheduler import ( schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" - "github.com/armadaproject/armada/internal/scheduler/interfaces" + "github.com/armadaproject/armada/internal/scheduler/jobdb" ) // SchedulerResult is returned by Rescheduler.Schedule(). @@ -26,29 +26,29 @@ type SchedulerResult struct { AdditionalAnnotationsByJobId map[string]map[string]string } -// PreemptedJobsFromSchedulerResult returns the slice of preempted jobs in the result cast to type T. -func PreemptedJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T { - rv := make([]T, len(sr.PreemptedJobs)) +// PreemptedJobsFromSchedulerResult returns the slice of preempted jobs in the result. +func PreemptedJobsFromSchedulerResult(sr *SchedulerResult) []*jobdb.Job { + rv := make([]*jobdb.Job, len(sr.PreemptedJobs)) for i, jctx := range sr.PreemptedJobs { - rv[i] = jctx.Job.(T) + rv[i] = jctx.Job } return rv } -// ScheduledJobsFromSchedulerResult returns the slice of scheduled jobs in the result cast to type T. -func ScheduledJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T { - rv := make([]T, len(sr.ScheduledJobs)) +// ScheduledJobsFromSchedulerResult returns the slice of scheduled jobs in the result. +func ScheduledJobsFromSchedulerResult(sr *SchedulerResult) []*jobdb.Job { + rv := make([]*jobdb.Job, len(sr.ScheduledJobs)) for i, jctx := range sr.ScheduledJobs { - rv[i] = jctx.Job.(T) + rv[i] = jctx.Job } return rv } -// FailedJobsFromSchedulerResult returns the slice of scheduled jobs in the result cast to type T. -func FailedJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T { - rv := make([]T, len(sr.FailedJobs)) +// FailedJobsFromSchedulerResult returns the slice of failed jobs in the result. 
+func FailedJobsFromSchedulerResult(sr *SchedulerResult) []*jobdb.Job { + rv := make([]*jobdb.Job, len(sr.FailedJobs)) for i, jctx := range sr.FailedJobs { - rv[i] = jctx.Job.(T) + rv[i] = jctx.Job } return rv } diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index c673d24a51f..eeb68939c59 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -304,10 +304,10 @@ func (s *Scheduler) cycle(ctx *armadacontext.Context, updateAll bool, leaderToke t = time.Now() } if jst.Failed { - s.failureEstimator.Push(run.NodeName(), jst.Job.GetQueue(), run.Executor(), false, t) + s.failureEstimator.Push(run.NodeName(), jst.Job.Queue(), run.Executor(), false, t) } if jst.Succeeded { - s.failureEstimator.Push(run.NodeName(), jst.Job.GetQueue(), run.Executor(), true, t) + s.failureEstimator.Push(run.NodeName(), jst.Job.Queue(), run.Executor(), true, t) } } s.failureEstimator.Update() @@ -398,7 +398,7 @@ func (s *Scheduler) updateMetricsFromSchedulerResult(ctx *armadacontext.Context, } } for _, jctx := range overallSchedulerResult.FailedJobs { - if err := s.schedulerMetrics.UpdateFailed(ctx, jctx.Job.(*jobdb.Job), nil); err != nil { + if err := s.schedulerMetrics.UpdateFailed(ctx, jctx.Job, nil); err != nil { return err } } @@ -466,7 +466,7 @@ func (s *Scheduler) createSchedulingInfoWithNodeAntiAffinityForAttemptedRuns(job newSchedulingInfo.Version = job.JobSchedulingInfo().Version + 1 podRequirements := newSchedulingInfo.GetPodRequirements() if podRequirements == nil { - return nil, errors.Errorf("no pod scheduling requirement found for job %s", job.GetId()) + return nil, errors.Errorf("no pod scheduling requirement found for job %s", job.Id()) } newAffinity := podRequirements.Affinity if newAffinity == nil { @@ -503,7 +503,7 @@ func (s *Scheduler) eventsFromSchedulerResult(result *SchedulerResult) ([]*armad // EventsFromSchedulerResult generates necessary EventSequences from the provided SchedulerResult. 
func EventsFromSchedulerResult(result *SchedulerResult, time time.Time) ([]*armadaevents.EventSequence, error) { eventSequences := make([]*armadaevents.EventSequence, 0, len(result.PreemptedJobs)+len(result.ScheduledJobs)+len(result.FailedJobs)) - eventSequences, err := AppendEventSequencesFromPreemptedJobs(eventSequences, PreemptedJobsFromSchedulerResult[*jobdb.Job](result), time) + eventSequences, err := AppendEventSequencesFromPreemptedJobs(eventSequences, PreemptedJobsFromSchedulerResult(result), time) if err != nil { return nil, err } @@ -511,7 +511,7 @@ func EventsFromSchedulerResult(result *SchedulerResult, time time.Time) ([]*arma if err != nil { return nil, err } - eventSequences, err = AppendEventSequencesFromUnschedulableJobs(eventSequences, FailedJobsFromSchedulerResult[*jobdb.Job](result), time) + eventSequences, err = AppendEventSequencesFromUnschedulableJobs(eventSequences, FailedJobsFromSchedulerResult(result), time) if err != nil { return nil, err } @@ -586,7 +586,7 @@ func createEventsForPreemptedJob(jobId *armadaevents.Uuid, runId *armadaevents.U func AppendEventSequencesFromScheduledJobs(eventSequences []*armadaevents.EventSequence, jctxs []*schedulercontext.JobSchedulingContext, additionalAnnotationsByJobId map[string]map[string]string) ([]*armadaevents.EventSequence, error) { for _, jctx := range jctxs { - job := jctx.Job.(*jobdb.Job) + job := jctx.Job jobId, err := armadaevents.ProtoUuidFromUlidString(job.Id()) if err != nil { return nil, err @@ -596,7 +596,7 @@ func AppendEventSequencesFromScheduledJobs(eventSequences []*armadaevents.EventS return nil, errors.Errorf("attempting to generate lease eventSequences for job %s with no associated runs", job.Id()) } runCreationTime := time.Unix(0, job.ActiveRunTimestamp()) - scheduledAtPriority, hasScheduledAtPriority := job.GetScheduledAtPriority() + scheduledAtPriority, hasScheduledAtPriority := job.ScheduledAtPriority() eventSequences = append(eventSequences, &armadaevents.EventSequence{ Queue: job.Queue(), JobSetName: job.Jobset(), // TODO: Rename to JobSet. 
@@ -630,7 +630,7 @@ func AppendEventSequencesFromScheduledJobs(eventSequences []*armadaevents.EventS func AppendEventSequencesFromUnschedulableJobs(eventSequences []*armadaevents.EventSequence, jobs []*jobdb.Job, time time.Time) ([]*armadaevents.EventSequence, error) { for _, job := range jobs { - jobId, err := armadaevents.ProtoUuidFromUlidString(job.GetId()) + jobId, err := armadaevents.ProtoUuidFromUlidString(job.Id()) if err != nil { return nil, err } @@ -639,8 +639,8 @@ func AppendEventSequencesFromUnschedulableJobs(eventSequences []*armadaevents.Ev Reason: &armadaevents.Error_GangJobUnschedulable{GangJobUnschedulable: &armadaevents.GangJobUnschedulable{Message: "Job did not meet the minimum gang cardinality"}}, } eventSequences = append(eventSequences, &armadaevents.EventSequence{ - Queue: job.GetQueue(), - JobSetName: job.GetJobSet(), + Queue: job.Queue(), + JobSetName: job.Jobset(), Events: []*armadaevents.EventSequence_Event{ { Created: &time, @@ -772,13 +772,13 @@ func (s *Scheduler) generateUpdateMessagesFromJob(job *jobdb.Job, jobRunErrors m } events = append(events, jobSucceeded) } else if lastRun.Failed() && !job.Queued() { - failFast := job.GetAnnotations()[configuration.FailFastAnnotation] == "true" + failFast := job.Annotations()[configuration.FailFastAnnotation] == "true" requeueJob := !failFast && lastRun.Returned() && job.NumAttempts() < s.maxAttemptedRuns if requeueJob && lastRun.RunAttempted() { jobWithAntiAffinity, schedulable, err := s.addNodeAntiAffinitiesForAttemptedRunsIfSchedulable(job) if err != nil { - return nil, errors.Errorf("unable to set node anti-affinity for job %s because %s", job.GetId(), err) + return nil, errors.Errorf("unable to set node anti-affinity for job %s because %s", job.Id(), err) } else { if schedulable { job = jobWithAntiAffinity @@ -850,7 +850,7 @@ func (s *Scheduler) generateUpdateMessagesFromJob(job *jobdb.Job, jobRunErrors m events = append(events, jobErrors) } - } else if lastRun.PreemptRequested() && job.GetPriorityClass().Preemptible { + } else if lastRun.PreemptRequested() && job.PriorityClass().Preemptible { job = job.WithQueued(false).WithFailed(true).WithUpdatedRun(lastRun.WithoutTerminal().WithFailed(true)) events = append(events, createEventsForPreemptedJob(jobId, armadaevents.ProtoUuidFromUuid(lastRun.Id()), s.clock.Now())...) 
} @@ -881,7 +881,7 @@ func (s *Scheduler) expireJobsIfNecessary(ctx *armadacontext.Context, txn *jobdb if err != nil { return nil, err } - staleExecutors := make(map[string]bool, 0) + staleExecutors := make(map[string]bool) cutOff := s.clock.Now().Add(-s.executorTimeout) jobsToUpdate := make([]*jobdb.Job, 0) diff --git a/internal/scheduler/scheduler_metrics.go b/internal/scheduler/scheduler_metrics.go index 844defb83f1..bc81d4c92c2 100644 --- a/internal/scheduler/scheduler_metrics.go +++ b/internal/scheduler/scheduler_metrics.go @@ -165,7 +165,7 @@ func aggregateJobContexts(previousSchedulingRoundData map[queuePriorityClassKey] for _, jctx := range jctxs { job := jctx.Job - key := queuePriorityClassKey{queue: job.GetQueue(), priorityClass: job.GetPriorityClassName()} + key := queuePriorityClassKey{queue: job.Queue(), priorityClass: job.PriorityClassName()} result[key] += 1 } diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index 908a6ab0988..2d0106aa5db 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -24,7 +24,6 @@ import ( schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/database" schedulerdb "github.com/armadaproject/armada/internal/scheduler/database" - "github.com/armadaproject/armada/internal/scheduler/interfaces" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/kubernetesobjects/affinity" "github.com/armadaproject/armada/internal/scheduler/leader" @@ -1221,7 +1220,7 @@ func TestScheduler_TestSyncState(t *testing.T) { { RunID: leasedJob.LatestRun().Id(), JobID: leasedJob.LatestRun().JobId(), - JobSet: leasedJob.GetJobSet(), + JobSet: leasedJob.Jobset(), Succeeded: true, }, }, @@ -1488,7 +1487,7 @@ func (t *testSchedulingAlgo) Persist() { return } -func NewSchedulerResultForTest[S ~[]T, T interfaces.LegacySchedulerJob]( +func NewSchedulerResultForTest[S ~[]T, T *jobdb.Job]( preemptedJobs S, scheduledJobs S, failedJobs S, diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go index c93e74f3b8d..9b6b27b5c50 100644 --- a/internal/scheduler/scheduling_algo.go +++ b/internal/scheduler/scheduling_algo.go @@ -25,7 +25,6 @@ import ( schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/database" "github.com/armadaproject/armada/internal/scheduler/fairness" - "github.com/armadaproject/armada/internal/scheduler/interfaces" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/nodedb" "github.com/armadaproject/armada/internal/scheduler/quarantine" @@ -199,9 +198,9 @@ func (l *FairSchedulingAlgo) Schedule( l.schedulingContextRepository.StoreSchedulingContext(sctx) } - preemptedJobs := PreemptedJobsFromSchedulerResult[*jobdb.Job](schedulerResult) - scheduledJobs := ScheduledJobsFromSchedulerResult[*jobdb.Job](schedulerResult) - failedJobs := FailedJobsFromSchedulerResult[*jobdb.Job](schedulerResult) + preemptedJobs := PreemptedJobsFromSchedulerResult(schedulerResult) + scheduledJobs := ScheduledJobsFromSchedulerResult(schedulerResult) + failedJobs := FailedJobsFromSchedulerResult(schedulerResult) if err := txn.Upsert(preemptedJobs); err != nil { return nil, err } @@ -243,7 +242,7 @@ type JobQueueIteratorAdapter struct { it *immutable.SortedSetIterator[*jobdb.Job] } -func (it *JobQueueIteratorAdapter) 
Next() (interfaces.LegacySchedulerJob, error) { +func (it *JobQueueIteratorAdapter) Next() (*jobdb.Job, error) { if it.it.Done() { return nil, nil } @@ -476,7 +475,7 @@ func (l *FairSchedulingAlgo) scheduleOnExecutors( return nil, nil, err } for i, jctx := range result.PreemptedJobs { - jobDbJob := jctx.Job.(*jobdb.Job) + jobDbJob := jctx.Job if run := jobDbJob.LatestRun(); run != nil { jobDbJob = jobDbJob.WithUpdatedRun(run.WithFailed(true)) } else { @@ -485,8 +484,8 @@ func (l *FairSchedulingAlgo) scheduleOnExecutors( result.PreemptedJobs[i].Job = jobDbJob.WithQueued(false).WithFailed(true) } for i, jctx := range result.ScheduledJobs { - jobDbJob := jctx.Job.(*jobdb.Job) - jobId := jobDbJob.GetId() + jobDbJob := jctx.Job + jobId := jobDbJob.Id() nodeId := result.NodeIdByJobId[jobId] if nodeId == "" { return nil, nil, errors.Errorf("job %s not mapped to a node", jobId) @@ -505,7 +504,7 @@ func (l *FairSchedulingAlgo) scheduleOnExecutors( WithNewRun(node.GetExecutor(), node.GetId(), node.GetName(), priority) } for i, jctx := range result.FailedJobs { - jobDbJob := jctx.Job.(*jobdb.Job) + jobDbJob := jctx.Job result.FailedJobs[i].Job = jobDbJob.WithQueued(false).WithFailed(true) } return result, sctx, nil @@ -537,8 +536,8 @@ func (repo *SchedulerJobRepositoryAdapter) GetQueueJobIds(queue string) []string // GetExistingJobsByIds is necessary to implement the JobRepository interface which we need while transitioning from the // old to new scheduler. -func (repo *SchedulerJobRepositoryAdapter) GetExistingJobsByIds(ids []string) []interfaces.LegacySchedulerJob { - rv := make([]interfaces.LegacySchedulerJob, 0, len(ids)) +func (repo *SchedulerJobRepositoryAdapter) GetExistingJobsByIds(ids []string) []*jobdb.Job { + rv := make([]*jobdb.Job, 0, len(ids)) for _, id := range ids { if job := repo.txn.GetById(id); job != nil { rv = append(rv, job) @@ -667,7 +666,7 @@ func (l *FairSchedulingAlgo) aggregateAllocationByPoolAndQueueAndPriorityClass( allocation = make(schedulerobjects.QuantityByTAndResourceType[string]) allocationByQueue[queue] = allocation } - allocation.AddV1ResourceList(job.GetPriorityClassName(), job.GetResourceRequirements().Requests) + allocation.AddV1ResourceList(job.PriorityClassName(), job.ResourceRequirements().Requests) } } return rv diff --git a/internal/scheduler/scheduling_algo_test.go b/internal/scheduler/scheduling_algo_test.go index e08000884d7..f968607d601 100644 --- a/internal/scheduler/scheduling_algo_test.go +++ b/internal/scheduler/scheduling_algo_test.go @@ -441,7 +441,7 @@ func TestSchedule(t *testing.T) { require.NoError(t, err) // Check that the expected preemptions took place. - preemptedJobs := PreemptedJobsFromSchedulerResult[*jobdb.Job](schedulerResult) + preemptedJobs := PreemptedJobsFromSchedulerResult(schedulerResult) actualPreemptedJobsByExecutorIndexAndNodeIndex := make(map[int]map[int][]int) for _, job := range preemptedJobs { executorIndex := executorIndexByJobId[job.Id()] @@ -466,7 +466,7 @@ func TestSchedule(t *testing.T) { } // Check that jobs were scheduled as expected. 
- scheduledJobs := ScheduledJobsFromSchedulerResult[*jobdb.Job](schedulerResult) + scheduledJobs := ScheduledJobsFromSchedulerResult(schedulerResult) actualScheduledIndices := make([]int, 0) for _, job := range scheduledJobs { actualScheduledIndices = append(actualScheduledIndices, queueIndexByJobId[job.Id()]) @@ -483,7 +483,7 @@ func TestSchedule(t *testing.T) { } // Check that we failed the correct number of excess jobs when a gang schedules >= minimum cardinality - failedJobs := FailedJobsFromSchedulerResult[*jobdb.Job](schedulerResult) + failedJobs := FailedJobsFromSchedulerResult(schedulerResult) assert.Equal(t, tc.expectedFailedJobCount, len(failedJobs)) // Check that preempted jobs are marked as such consistently. diff --git a/internal/scheduler/simulator/simulator.go b/internal/scheduler/simulator/simulator.go index 39b468e05ce..f2d4463729e 100644 --- a/internal/scheduler/simulator/simulator.go +++ b/internal/scheduler/simulator/simulator.go @@ -28,7 +28,7 @@ import ( "github.com/armadaproject/armada/internal/scheduler/fairness" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/nodedb" - schedulerobjects "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" + "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" "github.com/armadaproject/armada/internal/scheduleringester" "github.com/armadaproject/armada/pkg/armadaevents" ) @@ -254,7 +254,7 @@ func (s *Simulator) setupClusters() error { ), } txn := nodeDb.Txn(true) - if err := nodeDb.CreateAndInsertWithApiJobsWithTxn(txn, nil, node); err != nil { + if err := nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node); err != nil { txn.Abort() return err } @@ -499,9 +499,9 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error { // Update jobDb to reflect the decisions by the scheduler. // Sort jobs to ensure deterministic event ordering. 
- preemptedJobs := scheduler.PreemptedJobsFromSchedulerResult[*jobdb.Job](result) + preemptedJobs := scheduler.PreemptedJobsFromSchedulerResult(result) scheduledJobs := slices.Clone(result.ScheduledJobs) - failedJobs := scheduler.FailedJobsFromSchedulerResult[*jobdb.Job](result) + failedJobs := scheduler.FailedJobsFromSchedulerResult(result) lessJob := func(a, b *jobdb.Job) int { if a.Queue() < b.Queue() { return -1 @@ -517,7 +517,7 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error { } slices.SortFunc(preemptedJobs, lessJob) slices.SortFunc(scheduledJobs, func(a, b *schedulercontext.JobSchedulingContext) int { - return lessJob(a.Job.(*jobdb.Job), b.Job.(*jobdb.Job)) + return lessJob(a.Job, b.Job) }) slices.SortFunc(failedJobs, lessJob) for i, job := range preemptedJobs { @@ -529,10 +529,10 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error { preemptedJobs[i] = job.WithQueued(false).WithFailed(true) } for i, jctx := range scheduledJobs { - job := jctx.Job.(*jobdb.Job) - nodeId := result.NodeIdByJobId[job.GetId()] + job := jctx.Job + nodeId := result.NodeIdByJobId[job.Id()] if nodeId == "" { - return errors.Errorf("job %s not mapped to a node", job.GetId()) + return errors.Errorf("job %s not mapped to a node", job.Id()) } if node, err := nodeDb.GetNode(nodeId); err != nil { return err @@ -553,7 +553,7 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error { if err := txn.Upsert(preemptedJobs); err != nil { return err } - if err := txn.Upsert(util.Map(scheduledJobs, func(jctx *schedulercontext.JobSchedulingContext) *jobdb.Job { return jctx.Job.(*jobdb.Job) })); err != nil { + if err := txn.Upsert(util.Map(scheduledJobs, func(jctx *schedulercontext.JobSchedulingContext) *jobdb.Job { return jctx.Job })); err != nil { return err } if err := txn.Upsert(failedJobs); err != nil { @@ -747,8 +747,8 @@ func (s *Simulator) handleJobSucceeded(txn *jobdb.Txn, e *armadaevents.JobSuccee run := job.LatestRun() pool := s.poolByNodeId[run.NodeId()] s.allocationByPoolAndQueueAndPriorityClass[pool][job.Queue()].SubV1ResourceList( - job.GetPriorityClassName(), - job.GetResourceRequirements().Requests, + job.PriorityClassName(), + job.ResourceRequirements().Requests, ) // Unbind the job from the node on which it was scheduled. @@ -759,7 +759,7 @@ func (s *Simulator) handleJobSucceeded(txn *jobdb.Txn, e *armadaevents.JobSuccee // Increase the successful job count for this jobTemplate. // If all jobs created from this template have succeeded, update dependent templates // and submit any templates for which this was the last dependency. - jobTemplate := s.jobTemplateByJobId[job.GetId()] + jobTemplate := s.jobTemplateByJobId[job.Id()] jobTemplate.NumberSuccessful++ if jobTemplate.Number == jobTemplate.NumberSuccessful { delete(s.activeJobTemplatesById, jobTemplate.Id) @@ -832,7 +832,7 @@ func (s *Simulator) handleJobRunPreempted(txn *jobdb.Txn, e *armadaevents.JobRun job := txn.GetById(jobId) // Submit a retry for this job. 
- jobTemplate := s.jobTemplateByJobId[job.GetId()] + jobTemplate := s.jobTemplateByJobId[job.Id()] retryJobId := util.ULID() resubmitTime := s.time.Add(s.generateRandomShiftedExponentialDuration(s.ClusterSpec.WorkflowManagerDelayDistribution)) s.pushEventSequence( diff --git a/internal/scheduler/simulator/writer.go b/internal/scheduler/simulator/writer.go index 95ad7a5f425..fb436cf9057 100644 --- a/internal/scheduler/simulator/writer.go +++ b/internal/scheduler/simulator/writer.go @@ -89,12 +89,12 @@ func (w *Writer) flattenStateTransition(flattenedStateTransitions []*FlattenedAr for i, event := range events.Events { // Assumes all supported events have an associated job associatedJob := jobsList[i] - prevSeenEvent := w.prevSeenEventByJobId[associatedJob.GetId()] + prevSeenEvent := w.prevSeenEventByJobId[associatedJob.Id()] // Resource requirements - cpuLimit := associatedJob.GetResourceRequirements().Requests[v1.ResourceCPU] - memoryLimit := associatedJob.GetResourceRequirements().Requests[v1.ResourceMemory] - ephemeralStorageLimit := associatedJob.GetResourceRequirements().Requests[v1.ResourceEphemeralStorage] - gpuLimit := associatedJob.GetResourceRequirements().Requests["nvidia.com/gpu"] + cpuLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceCPU] + memoryLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceMemory] + ephemeralStorageLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceEphemeralStorage] + gpuLimit := associatedJob.ResourceRequirements().Requests["nvidia.com/gpu"] prevEventType := 0 prevEventTime := *event.Created @@ -107,10 +107,10 @@ func (w *Writer) flattenStateTransition(flattenedStateTransitions []*FlattenedAr Time: event.Created.Sub(startTime).Milliseconds(), Queue: events.Queue, JobSet: events.JobSetName, - JobId: associatedJob.GetId(), + JobId: associatedJob.Id(), RunIndex: len(associatedJob.AllRuns()) - 1, // Assumed to be related to latest run in simulation NumRuns: len(associatedJob.AllRuns()), - PriorityClass: associatedJob.GetPriorityClassName(), + PriorityClass: associatedJob.PriorityClassName(), PreviousEventType: prevEventType, EventType: w.encodeEvent(event), SecondsSinceLastEvent: event.Created.Sub(prevEventTime).Seconds(), @@ -120,10 +120,10 @@ func (w *Writer) flattenStateTransition(flattenedStateTransitions []*FlattenedAr EphemeralStorage: ephemeralStorageLimit.AsApproximateFloat64(), ExitCode: 0, }) - w.prevSeenEventByJobId[associatedJob.GetId()] = event + w.prevSeenEventByJobId[associatedJob.Id()] = event if associatedJob.Succeeded() || associatedJob.Failed() || associatedJob.Cancelled() { - delete(w.prevSeenEventByJobId, associatedJob.GetId()) + delete(w.prevSeenEventByJobId, associatedJob.Id()) } } diff --git a/internal/scheduler/submitcheck.go b/internal/scheduler/submitcheck.go index 438c53a445e..b5bc3bee331 100644 --- a/internal/scheduler/submitcheck.go +++ b/internal/scheduler/submitcheck.go @@ -20,7 +20,6 @@ import ( "github.com/armadaproject/armada/internal/scheduler/adapters" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/database" - "github.com/armadaproject/armada/internal/scheduler/interfaces" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/nodedb" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" @@ -197,10 +196,10 @@ func (srv *SubmitChecker) check(jctxs []*schedulercontext.JobSchedulingContext) } func (srv *SubmitChecker) 
getIndividualSchedulingResult(jctx *schedulercontext.JobSchedulingContext) schedulingResult { - schedulingKey, ok := jctx.Job.GetSchedulingKey() + schedulingKey, ok := jctx.Job.SchedulingKey() if !ok { srv.mu.Lock() - schedulingKey = interfaces.SchedulingKeyFromLegacySchedulerJob(srv.schedulingKeyGenerator, jctx.Job) + schedulingKey = jobdb.SchedulingKeyFromJob(srv.schedulingKeyGenerator, jctx.Job) srv.mu.Unlock() } diff --git a/internal/scheduler/testfixtures/testfixtures.go b/internal/scheduler/testfixtures/testfixtures.go index 8dd3efc1a7c..e653bbc845b 100644 --- a/internal/scheduler/testfixtures/testfixtures.go +++ b/internal/scheduler/testfixtures/testfixtures.go @@ -46,6 +46,8 @@ var ( PriorityClass2: {Priority: 2, Preemptible: true}, PriorityClass2NonPreemptible: {Priority: 2, Preemptible: false}, PriorityClass3: {Priority: 3, Preemptible: false}, + "armada-preemptible-away": {Priority: 30000, Preemptible: true, AwayNodeTypes: []types.AwayNodeType{{Priority: 29000, WellKnownNodeTypeName: "gpu"}}}, + "armada-preemptible": {Priority: 30000, Preemptible: true}, } TestDefaultPriorityClass = PriorityClass3 TestPriorities = []int32{0, 1, 2, 3} diff --git a/pkg/api/util.go b/pkg/api/util.go index a87329d8a98..f1f44d0ee35 100644 --- a/pkg/api/util.go +++ b/pkg/api/util.go @@ -14,7 +14,6 @@ import ( "github.com/armadaproject/armada/internal/common/logging" armadaresource "github.com/armadaproject/armada/internal/common/resource" "github.com/armadaproject/armada/internal/common/types" - "github.com/armadaproject/armada/internal/scheduler/interfaces" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) @@ -207,46 +206,6 @@ func (job *Job) GetSchedulingKey() (schedulerobjects.SchedulingKey, bool) { return schedulerobjects.SchedulingKey{}, false } -// SchedulingOrderCompare defines the order in which jobs in a particular queue should be scheduled, -func (job *Job) SchedulingOrderCompare(other interfaces.LegacySchedulerJob) int { - // We need this cast for now to expose this method via an interface. - // This is safe since we only ever compare jobs of the same type. - return SchedulingOrderCompare(job, other.(*Job)) -} - -// SchedulingOrderCompare defines the order in which jobs in a queue should be scheduled -// (both when scheduling new jobs and when re-scheduling evicted jobs). -// Specifically, compare returns -// - 0 if the jobs have equal job id, -// - -1 if job should be scheduled before other, -// - +1 if other should be scheduled before other. -func SchedulingOrderCompare(job, other *Job) int { - if job.Id == other.Id { - return 0 - } - - // Jobs with higher in queue-priority come first. - if job.Priority < other.Priority { - return -1 - } else if job.Priority > other.Priority { - return 1 - } - - // Jobs that have been queuing for longer are scheduled first. - if cmp := job.Created.Compare(other.Created); cmp != 0 { - return cmp - } - - // Tie-break by jobId, which must be unique. - // This ensure there is a total order between jobs, i.e., no jobs are equal from an ordering point of view. - if job.Id < other.Id { - return -1 - } else if job.Id > other.Id { - return 1 - } - panic("We should never get here. Since we check for job id equality at the top of this function.") -} - func (job *Job) GetJobSet() string { return job.JobSetId }
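As the hunks above show, this is one mechanical refactor applied across the scheduler, nodedb, simulator, and tests: call sites stop going through the `interfaces.LegacySchedulerJob` interface and its `GetX()` accessors and instead use the concrete `*jobdb.Job` with un-prefixed accessors (`Id()`, `Queue()`, `PriorityClassName()`, `ResourceRequirements()`, ...), which also lets `PreemptedJobsFromSchedulerResult`, `ScheduledJobsFromSchedulerResult`, and `FailedJobsFromSchedulerResult` drop their type parameter and removes the scattered `jctx.Job.(*jobdb.Job)` assertions. Below is a minimal, illustrative sketch of the before/after call style; it uses hypothetical stand-in types (`Job`, `JobSchedulingContext`, `SchedulerResult`) rather than the real `jobdb` and `schedulercontext` packages.

```go
package main

import "fmt"

// Job is a hypothetical stand-in for *jobdb.Job: a concrete struct whose
// accessors are spelled Id()/Queue()/PriorityClassName(), mirroring the
// renames in this diff (previously GetId()/GetQueue()/GetPriorityClassName()).
type Job struct {
	id, queue, priorityClassName string
}

func (j *Job) Id() string                { return j.id }
func (j *Job) Queue() string             { return j.queue }
func (j *Job) PriorityClassName() string { return j.priorityClassName }

// JobSchedulingContext stands in for schedulercontext.JobSchedulingContext:
// after this change its Job field holds the concrete job directly, so call
// sites no longer need a type assertion.
type JobSchedulingContext struct {
	Job *Job
}

// SchedulerResult stands in for scheduler.SchedulerResult.
type SchedulerResult struct {
	ScheduledJobs []*JobSchedulingContext
}

// ScheduledJobsFromSchedulerResult mirrors the de-generified helper: it used
// to be ScheduledJobsFromSchedulerResult[T interfaces.LegacySchedulerJob] and
// cast each jctx.Job to T; now it simply returns the concrete jobs.
func ScheduledJobsFromSchedulerResult(sr *SchedulerResult) []*Job {
	rv := make([]*Job, len(sr.ScheduledJobs))
	for i, jctx := range sr.ScheduledJobs {
		rv[i] = jctx.Job
	}
	return rv
}

func main() {
	result := &SchedulerResult{
		ScheduledJobs: []*JobSchedulingContext{
			{Job: &Job{id: "01hexample", queue: "queue-a", priorityClassName: "armada-default"}},
		},
	}
	// Old style (removed by this diff):
	//   for _, job := range ScheduledJobsFromSchedulerResult[*jobdb.Job](result) {
	//       fmt.Println(job.GetQueue(), job.GetId())
	//   }
	for _, job := range ScheduledJobsFromSchedulerResult(result) {
		fmt.Println(job.Queue(), job.Id(), job.PriorityClassName())
	}
}
```

The trade-off is coupling: packages such as nodedb and the simulator now import jobdb directly instead of depending on the narrower interface, in exchange for simpler call sites without type assertions or generic instantiations.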