From d9bd775914c52de953fae392f428aa03c6407c32 Mon Sep 17 00:00:00 2001
From: Chris Martin
Date: Fri, 20 Sep 2024 16:49:21 +0100
Subject: [PATCH 1/2] Remove the config to disable the scheduling key
 optimisation

Signed-off-by: Chris Martin
---
 config/scheduler/config.yaml                     |  1 -
 .../scheduler/configuration/configuration.go     |  2 --
 internal/scheduler/gang_scheduler.go             | 14 ++++++--------
 internal/scheduler/gang_scheduler_test.go        |  2 +-
 internal/scheduler/preempting_queue_scheduler.go | 16 ++++------------
 internal/scheduler/queue_scheduler.go            |  7 ++-----
 internal/scheduler/queue_scheduler_test.go       |  2 +-
 internal/scheduler/scheduling_algo.go            |  3 ---
 8 files changed, 14 insertions(+), 33 deletions(-)

diff --git a/config/scheduler/config.yaml b/config/scheduler/config.yaml
index 39332be36a4..81d95f21158 100644
--- a/config/scheduler/config.yaml
+++ b/config/scheduler/config.yaml
@@ -111,6 +111,5 @@ scheduling:
     resolution: "1Gi"
   executorTimeout: "10m"
   maxUnacknowledgedJobsPerExecutor: 2500
-  alwaysAttemptScheduling: false
   executorUpdateFrequency: "60s"
 
diff --git a/internal/scheduler/configuration/configuration.go b/internal/scheduler/configuration/configuration.go
index 72f6f9648ec..e0941eedb6f 100644
--- a/internal/scheduler/configuration/configuration.go
+++ b/internal/scheduler/configuration/configuration.go
@@ -227,8 +227,6 @@ type SchedulingConfig struct {
 	// Maximum number of jobs that can be assigned to a executor but not yet acknowledged, before
 	// the scheduler is excluded from consideration by the scheduler.
 	MaxUnacknowledgedJobsPerExecutor uint
-	// If true, do not during scheduling skip jobs with requirements known to be impossible to meet.
-	AlwaysAttemptScheduling bool
 	// The frequency at which the scheduler updates the cluster state.
 	ExecutorUpdateFrequency time.Duration
 	// Defines the order in which pools will be scheduled. Higher priority pools will be scheduled first
diff --git a/internal/scheduler/gang_scheduler.go b/internal/scheduler/gang_scheduler.go
index 6f83cab052b..a87afce0ed8 100644
--- a/internal/scheduler/gang_scheduler.go
+++ b/internal/scheduler/gang_scheduler.go
@@ -31,19 +31,17 @@ func NewGangScheduler(
 	constraints schedulerconstraints.SchedulingConstraints,
 	floatingResourceTypes *floatingresources.FloatingResourceTypes,
 	nodeDb *nodedb.NodeDb,
+	skipUnsuccessfulSchedulingKeyCheck bool,
 ) (*GangScheduler, error) {
 	return &GangScheduler{
-		constraints:           constraints,
-		floatingResourceTypes: floatingResourceTypes,
-		schedulingContext:     sctx,
-		nodeDb:                nodeDb,
+		constraints:                        constraints,
+		floatingResourceTypes:              floatingResourceTypes,
+		schedulingContext:                  sctx,
+		nodeDb:                             nodeDb,
+		skipUnsuccessfulSchedulingKeyCheck: skipUnsuccessfulSchedulingKeyCheck,
 	}, nil
 }
 
-func (sch *GangScheduler) SkipUnsuccessfulSchedulingKeyCheck() {
-	sch.skipUnsuccessfulSchedulingKeyCheck = true
-}
-
 func (sch *GangScheduler) updateGangSchedulingContextOnSuccess(gctx *schedulercontext.GangSchedulingContext, gangAddedToSchedulingContext bool) error {
 	if !gangAddedToSchedulingContext {
 		// Nothing to do.
diff --git a/internal/scheduler/gang_scheduler_test.go b/internal/scheduler/gang_scheduler_test.go
index b78beae55b6..de64d262c66 100644
--- a/internal/scheduler/gang_scheduler_test.go
+++ b/internal/scheduler/gang_scheduler_test.go
@@ -643,7 +643,7 @@ func TestGangScheduler(t *testing.T) {
 			constraints := schedulerconstraints.NewSchedulingConstraints("pool", tc.TotalResources, tc.SchedulingConfig, nil, map[string]bool{})
 			floatingResourceTypes, err := floatingresources.NewFloatingResourceTypes(tc.SchedulingConfig.ExperimentalFloatingResources)
 			require.NoError(t, err)
-			sch, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb)
+			sch, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb, false)
 			require.NoError(t, err)
 
 			var actualScheduledIndices []int
diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go
index 8b4fed9225e..afb2a64c751 100644
--- a/internal/scheduler/preempting_queue_scheduler.go
+++ b/internal/scheduler/preempting_queue_scheduler.go
@@ -42,8 +42,6 @@ type PreemptingQueueScheduler struct {
 	jobIdsByGangId map[string]map[string]bool
 	// Maps job ids of gang jobs to the id of that gang.
 	gangIdByJobId map[string]string
-	// If true, the unsuccessfulSchedulingKeys check of gangScheduler is omitted.
-	skipUnsuccessfulSchedulingKeyCheck bool
 	// If true, asserts that the nodeDb state is consistent with expected changes.
 	enableAssertions bool
 }
@@ -89,10 +87,6 @@ func (sch *PreemptingQueueScheduler) EnableAssertions() {
 	sch.enableAssertions = true
 }
 
-func (sch *PreemptingQueueScheduler) SkipUnsuccessfulSchedulingKeyCheck() {
-	sch.skipUnsuccessfulSchedulingKeyCheck = true
-}
-
 // Schedule
 // - preempts jobs belonging to queues with total allocation above their fair share and
 // - schedules new jobs belonging to queues with total allocation less than their fair share.
@@ -155,6 +149,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*sche
 		armadacontext.WithLogField(ctx, "stage", "re-schedule after balancing eviction"),
 		inMemoryJobRepo,
 		sch.jobRepo,
+		false,
 	)
 	if err != nil {
 		return nil, err
@@ -201,14 +196,13 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*sche
 	// Re-schedule evicted jobs/schedule new jobs.
 	// Only necessary if a non-zero number of jobs were evicted.
 	if len(evictorResult.EvictedJctxsByJobId) > 0 {
-		// Since no new jobs are considered in this round, the scheduling key check brings no benefit.
-		sch.SkipUnsuccessfulSchedulingKeyCheck()
 		ctx.WithField("stage", "scheduling-algo").Info("Performing second scheduling ")
 		schedulerResult, err = sch.schedule(
 			armadacontext.WithLogField(ctx, "stage", "schedule after oversubscribed eviction"),
 			inMemoryJobRepo,
 			// Only evicted jobs should be scheduled in this round.
 			nil,
+			true, // Since no new jobs are considered in this round, the scheduling key check brings no benefit.
 		)
 		if err != nil {
 			return nil, err
@@ -524,7 +518,7 @@ func addEvictedJobsToNodeDb(_ *armadacontext.Context, sctx *schedulercontext.Sch
 	return nil
 }
 
-func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemoryJobRepo *InMemoryJobRepository, jobRepo JobRepository) (*schedulerresult.SchedulerResult, error) {
+func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemoryJobRepo *InMemoryJobRepository, jobRepo JobRepository, skipUnsuccessfulSchedulingKeyCheck bool) (*schedulerresult.SchedulerResult, error) {
 	jobIteratorByQueue := make(map[string]JobIterator)
 	for _, qctx := range sch.schedulingContext.QueueSchedulingContexts {
 		evictedIt := inMemoryJobRepo.GetJobIterator(qctx.Queue)
@@ -545,13 +539,11 @@ func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemo
 		sch.floatingResourceTypes,
 		sch.nodeDb,
 		jobIteratorByQueue,
+		skipUnsuccessfulSchedulingKeyCheck,
 	)
 	if err != nil {
 		return nil, err
 	}
-	if sch.skipUnsuccessfulSchedulingKeyCheck {
-		sched.SkipUnsuccessfulSchedulingKeyCheck()
-	}
 	result, err := sched.Schedule(ctx)
 	if err != nil {
 		return nil, err
diff --git a/internal/scheduler/queue_scheduler.go b/internal/scheduler/queue_scheduler.go
index b2e9d2f8916..d347d9a55b1 100644
--- a/internal/scheduler/queue_scheduler.go
+++ b/internal/scheduler/queue_scheduler.go
@@ -33,13 +33,14 @@ func NewQueueScheduler(
 	floatingResourceTypes *floatingresources.FloatingResourceTypes,
 	nodeDb *nodedb.NodeDb,
 	jobIteratorByQueue map[string]JobIterator,
+	skipUnsuccessfulSchedulingKeyCheck bool,
 ) (*QueueScheduler, error) {
 	for queue := range jobIteratorByQueue {
 		if _, ok := sctx.QueueSchedulingContexts[queue]; !ok {
 			return nil, errors.Errorf("no scheduling context for queue %s", queue)
 		}
 	}
-	gangScheduler, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb)
+	gangScheduler, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb, skipUnsuccessfulSchedulingKeyCheck)
 	if err != nil {
 		return nil, err
 	}
@@ -58,10 +59,6 @@ func NewQueueScheduler(
 	}, nil
 }
 
-func (sch *QueueScheduler) SkipUnsuccessfulSchedulingKeyCheck() {
-	sch.gangScheduler.SkipUnsuccessfulSchedulingKeyCheck()
-}
-
 func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*schedulerresult.SchedulerResult, error) {
 
 	var scheduledJobs []*schedulercontext.JobSchedulingContext
diff --git a/internal/scheduler/queue_scheduler_test.go b/internal/scheduler/queue_scheduler_test.go
index 050fd2a97ee..80373e3b2f9 100644
--- a/internal/scheduler/queue_scheduler_test.go
+++ b/internal/scheduler/queue_scheduler_test.go
@@ -540,7 +540,7 @@ func TestQueueScheduler(t *testing.T) {
 			it := jobRepo.GetJobIterator(q.Name)
 			jobIteratorByQueue[q.Name] = it
 		}
-		sch, err := NewQueueScheduler(sctx, constraints, testfixtures.TestEmptyFloatingResources, nodeDb, jobIteratorByQueue)
+		sch, err := NewQueueScheduler(sctx, constraints, testfixtures.TestEmptyFloatingResources, nodeDb, jobIteratorByQueue, false)
 		require.NoError(t, err)
 
 		result, err := sch.Schedule(armadacontext.Background())
diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go
index e3b2f9b11af..2ef9cacbea5 100644
--- a/internal/scheduler/scheduling_algo.go
+++ b/internal/scheduler/scheduling_algo.go
@@ -484,9 +484,6 @@ func (l *FairSchedulingAlgo) schedulePool(
 		fsctx.jobIdsByGangId,
 		fsctx.gangIdByJobId,
 	)
-	if l.schedulingConfig.AlwaysAttemptScheduling {
-		scheduler.SkipUnsuccessfulSchedulingKeyCheck()
-	}
 	if l.schedulingConfig.EnableAssertions {
 		scheduler.EnableAssertions()
 	}
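
Note on the API change in this patch: skipping the unsuccessful-scheduling-key check is now decided at construction time rather than by mutating the scheduler afterwards. A minimal before/after sketch (variable names borrowed from gang_scheduler_test.go above; the surrounding function returning an error is assumed for illustration):

    // Before this patch: construct the scheduler, then flip the flag via a mutator.
    //   sch, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb)
    //   sch.SkipUnsuccessfulSchedulingKeyCheck()

    // After this patch: the flag is a constructor argument, so the setting is
    // fixed for the lifetime of the scheduler and cannot change mid-round.
    sch, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb, false)
    if err != nil {
        return nil, err
    }

The same pattern is applied to NewQueueScheduler and to PreemptingQueueScheduler.schedule: the flag is threaded through as a parameter instead of being exposed as a mutator, which also removes the need for the AlwaysAttemptScheduling config option.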
From 8b75338bd4da31ef3e290fc597ccd3d227da6ce9 Mon Sep 17 00:00:00 2001
From: Chris Martin
Date: Mon, 23 Sep 2024 13:30:21 +0100
Subject: [PATCH 2/2] fix merge conflicts

Signed-off-by: Chris Martin
---
 internal/scheduler/preempting_queue_scheduler.go | 2 +-
 internal/scheduler/queue_scheduler.go            | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go
index d2d19d1af00..42ce6731ea5 100644
--- a/internal/scheduler/preempting_queue_scheduler.go
+++ b/internal/scheduler/preempting_queue_scheduler.go
@@ -496,7 +496,7 @@ func addEvictedJobsToNodeDb(_ *armadacontext.Context, sctx *schedulercontext.Sch
 }
 
 func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemoryJobRepo *InMemoryJobRepository, jobRepo JobRepository, skipUnsuccessfulSchedulingKeyCheck bool) (*schedulerresult.SchedulerResult, error) {
-	jobIteratorByQueue := make(map[string]JobIterator)
+	jobIteratorByQueue := make(map[string]JobContextIterator)
 	for _, qctx := range sch.schedulingContext.QueueSchedulingContexts {
 		evictedIt := inMemoryJobRepo.GetJobIterator(qctx.Queue)
 		if jobRepo == nil || reflect.ValueOf(jobRepo).IsNil() {
diff --git a/internal/scheduler/queue_scheduler.go b/internal/scheduler/queue_scheduler.go
index 3ad3d6155d0..6be09847e36 100644
--- a/internal/scheduler/queue_scheduler.go
+++ b/internal/scheduler/queue_scheduler.go
@@ -33,6 +33,7 @@ func NewQueueScheduler(
 	floatingResourceTypes *floatingresources.FloatingResourceTypes,
 	nodeDb *nodedb.NodeDb,
 	jobIteratorByQueue map[string]JobContextIterator,
+	skipUnsuccessfulSchedulingKeyCheck bool,
 ) (*QueueScheduler, error) {
 	for queue := range jobIteratorByQueue {
 		if _, ok := sctx.QueueSchedulingContexts[queue]; !ok {
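
For reference, a sketch of how schedule() ends up wiring the flag through after both patches. The first two NewQueueScheduler arguments are not shown in the hunks above and are assumed from its signature; the per-queue iterator merging with the jobRepo iterator is simplified away here:

    // Build one iterator per queue (JobContextIterator after the merge fix),
    // then pass everything, including the skip flag, to the queue scheduler.
    jobIteratorByQueue := make(map[string]JobContextIterator)
    for _, qctx := range sch.schedulingContext.QueueSchedulingContexts {
        jobIteratorByQueue[qctx.Queue] = inMemoryJobRepo.GetJobIterator(qctx.Queue)
    }
    sched, err := NewQueueScheduler(
        sch.schedulingContext, // assumed: the sctx argument
        sch.constraints,       // assumed: the constraints argument
        sch.floatingResourceTypes,
        sch.nodeDb,
        jobIteratorByQueue,
        skipUnsuccessfulSchedulingKeyCheck, // threaded through from schedule()'s parameter
    )
    if err != nil {
        return nil, err
    }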