Remove Eviction Probability Config Values (#3651)
* remove stochastic properties

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* remove commented out code

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* remove commented out code

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

---------

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>
Co-authored-by: Chris Martin <chris@cmartinit.co.uk>
d80tb7 committed Jun 6, 2024
1 parent 08c4a70 commit 5602ad0
Showing 8 changed files with 21 additions and 132 deletions.
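In short, the change makes both evictors deterministic: each node filter previously combined its real condition (the node has allocated jobs, or has an oversubscribed priority class) with a random draw against the configured probability, and it now applies the condition unconditionally. Below is a minimal self-contained sketch of that difference, using a simplified stand-in node type rather than the scheduler's *internaltypes.Node; field and parameter names mirror the diff, everything else is illustrative.

package main

import (
	"fmt"
	"math/rand"
)

// node is a simplified stand-in; only the field read by the filter is modelled.
type node struct {
	AllocatedByJobId map[string]string
}

// Old behaviour: nodes with allocated jobs were considered for eviction only
// with probability perNodeEvictionProbability.
func stochasticNodeFilter(n *node, perNodeEvictionProbability float64, random *rand.Rand) bool {
	return len(n.AllocatedByJobId) > 0 && random.Float64() < perNodeEvictionProbability
}

// New behaviour: every node with allocated jobs is considered for eviction.
func deterministicNodeFilter(n *node) bool {
	return len(n.AllocatedByJobId) > 0
}

func main() {
	busy := &node{AllocatedByJobId: map[string]string{"job-1": "queue-a"}}
	random := rand.New(rand.NewSource(1))
	fmt.Println(stochasticNodeFilter(busy, 0.5, random)) // true or false, depending on the draw
	fmt.Println(deterministicNodeFilter(busy))           // always true
}

The oversubscribed evictor's filter is simplified the same way: it now returns len(overSubscribedPriorities) > 0 with no random draw.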
2 changes: 0 additions & 2 deletions config/scheduler/config.yaml
@@ -85,8 +85,6 @@ scheduling:
resolution: "1"
disableScheduling: false
enableAssertions: false
nodeEvictionProbability: 1.0
nodeOversubscriptionEvictionProbability: 1.0
protectedFractionOfFairShare: 1.0
nodeIdLabel: "kubernetes.io/hostname"
priorityClasses:
9 changes: 0 additions & 9 deletions internal/scheduler/configuration/configuration.go
@@ -151,15 +151,6 @@ type SchedulingConfig struct {
DisableScheduling bool
// Set to true to enable scheduler assertions. This results in some performance loss.
EnableAssertions bool
// If using PreemptToFairShare,
// the probability of evicting jobs on a node to balance resource usage.
// TODO(albin): Remove.
NodeEvictionProbability float64
// If using PreemptToFairShare,
// the probability of evicting jobs on oversubscribed nodes, i.e.,
// nodes on which the total resource requests are greater than the available resources.
// TODO(albin): Remove.
NodeOversubscriptionEvictionProbability float64
// Only queues allocated more than this fraction of their fair share are considered for preemption.
ProtectedFractionOfFairShare float64 `validate:"gte=0"`
// Armada adds a node selector term to every scheduled pod using this label with the node name as value.
58 changes: 15 additions & 43 deletions internal/scheduler/preempting_queue_scheduler.go
@@ -1,7 +1,6 @@
package scheduler

import (
"math/rand"
"reflect"
"time"

@@ -27,13 +26,11 @@ import (
// PreemptingQueueScheduler is a scheduler that makes unified decisions on which jobs to preempt and schedule.
// Uses QueueScheduler as a building block.
type PreemptingQueueScheduler struct {
schedulingContext *schedulercontext.SchedulingContext
constraints schedulerconstraints.SchedulingConstraints
nodeEvictionProbability float64
nodeOversubscriptionEvictionProbability float64
protectedFractionOfFairShare float64
jobRepo JobRepository
nodeDb *nodedb.NodeDb
schedulingContext *schedulercontext.SchedulingContext
constraints schedulerconstraints.SchedulingConstraints
protectedFractionOfFairShare float64
jobRepo JobRepository
nodeDb *nodedb.NodeDb
// Maps job ids to the id of the node the job is associated with.
// For scheduled or running jobs, that is the node the job is assigned to.
// For preempted jobs, that is the node the job was preempted from.
@@ -51,8 +48,6 @@ type PreemptingQueueScheduler struct {
func NewPreemptingQueueScheduler(
sctx *schedulercontext.SchedulingContext,
constraints schedulerconstraints.SchedulingConstraints,
nodeEvictionProbability float64,
nodeOversubscriptionEvictionProbability float64,
protectedFractionOfFairShare float64,
jobRepo JobRepository,
nodeDb *nodedb.NodeDb,
@@ -74,16 +69,14 @@ func NewPreemptingQueueScheduler(
initialJobIdsByGangId[gangId] = maps.Clone(jobIds)
}
return &PreemptingQueueScheduler{
schedulingContext: sctx,
constraints: constraints,
nodeEvictionProbability: nodeEvictionProbability,
nodeOversubscriptionEvictionProbability: nodeOversubscriptionEvictionProbability,
protectedFractionOfFairShare: protectedFractionOfFairShare,
jobRepo: jobRepo,
nodeDb: nodeDb,
nodeIdByJobId: maps.Clone(initialNodeIdByJobId),
jobIdsByGangId: initialJobIdsByGangId,
gangIdByJobId: maps.Clone(initialGangIdByJobId),
schedulingContext: sctx,
constraints: constraints,
protectedFractionOfFairShare: protectedFractionOfFairShare,
jobRepo: jobRepo,
nodeDb: nodeDb,
nodeIdByJobId: maps.Clone(initialNodeIdByJobId),
jobIdsByGangId: initialJobIdsByGangId,
gangIdByJobId: maps.Clone(initialGangIdByJobId),
}
}

@@ -120,7 +113,6 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche
sch.jobRepo,
sch.nodeDb,
sch.schedulingContext.PriorityClasses,
sch.nodeEvictionProbability,
func(ctx *armadacontext.Context, job *jobdb.Job) bool {
priorityClass := job.PriorityClass()
if !priorityClass.Preemptible {
@@ -144,7 +136,6 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche
}
return true
},
nil,
),
)
if err != nil {
@@ -185,8 +176,6 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche
sch.jobRepo,
sch.nodeDb,
sch.schedulingContext.PriorityClasses,
sch.nodeOversubscriptionEvictionProbability,
nil,
),
)
if err != nil {
@@ -709,22 +698,14 @@ func NewNodeEvictor(
jobRepo JobRepository,
nodeDb *nodedb.NodeDb,
priorityClasses map[string]types.PriorityClass,
perNodeEvictionProbability float64,
jobFilter func(*armadacontext.Context, *jobdb.Job) bool,
random *rand.Rand,
) *Evictor {
if perNodeEvictionProbability <= 0 {
return nil
}
if random == nil {
random = rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
}
return &Evictor{
jobRepo: jobRepo,
nodeDb: nodeDb,
priorityClasses: priorityClasses,
nodeFilter: func(_ *armadacontext.Context, node *internaltypes.Node) bool {
return len(node.AllocatedByJobId) > 0 && random.Float64() < perNodeEvictionProbability
return len(node.AllocatedByJobId) > 0
},
jobFilter: jobFilter,
}
@@ -759,20 +740,11 @@ func NewFilteredEvictor(

// NewOversubscribedEvictor returns a new evictor that
// for each node evicts all preemptible jobs of a priority class for which at least one job could not be scheduled
// with probability perNodeEvictionProbability.
func NewOversubscribedEvictor(
jobRepo JobRepository,
nodeDb *nodedb.NodeDb,
priorityClasses map[string]types.PriorityClass,
perNodeEvictionProbability float64,
random *rand.Rand,
) *Evictor {
if perNodeEvictionProbability <= 0 {
return nil
}
if random == nil {
random = rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
}
// Populating overSubscribedPriorities relies on
// - nodeFilter being called once before all calls to jobFilter and
// - jobFilter being called for all jobs on that node before moving on to another node.
@@ -792,7 +764,7 @@ func NewOversubscribedEvictor(
overSubscribedPriorities[p] = true
}
}
return len(overSubscribedPriorities) > 0 && random.Float64() < perNodeEvictionProbability
return len(overSubscribedPriorities) > 0
},
jobFilter: func(ctx *armadacontext.Context, job *jobdb.Job) bool {
priorityClass := job.PriorityClass()
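With the probability and random-source parameters gone, NewNodeEvictor and NewOversubscribedEvictor are called with just the job repository, node DB, priority classes and (for the node evictor) a job filter, and they no longer return nil when the probability is zero. The following is a sketch of the new call shape, assuming the package context shown in this diff; the wrapper function buildEvictors and its trivial job filter are illustrative only, not code from the repository.

func buildEvictors(
	jobRepo JobRepository,
	nodeDb *nodedb.NodeDb,
	priorityClasses map[string]types.PriorityClass,
) (*Evictor, *Evictor) {
	// Node evictor: repository, node DB, priority classes and a job filter;
	// the perNodeEvictionProbability and *rand.Rand arguments are simply dropped.
	nodeEvictor := NewNodeEvictor(
		jobRepo,
		nodeDb,
		priorityClasses,
		func(ctx *armadacontext.Context, job *jobdb.Job) bool {
			return job.PriorityClass().Preemptible
		},
	)
	// Oversubscribed evictor: now takes only the first three arguments.
	oversubscribedEvictor := NewOversubscribedEvictor(
		jobRepo,
		nodeDb,
		priorityClasses,
	)
	return nodeEvictor, oversubscribedEvictor
}

The test diff below shows the same change applied to TestEvictOversubscribed.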
67 changes: 6 additions & 61 deletions internal/scheduler/preempting_queue_scheduler_test.go
@@ -59,10 +59,7 @@ func TestEvictOversubscribed(t *testing.T) {
evictor := NewOversubscribedEvictor(
NewSchedulerJobRepositoryAdapter(jobDbTxn),
nodeDb,
config.PriorityClasses,
1,
nil,
)
config.PriorityClasses)
result, err := evictor.Evict(armadacontext.Background(), nodeDbTxn)
require.NoError(t, err)

@@ -572,53 +569,10 @@ func TestPreemptingQueueScheduler(t *testing.T) {
"C": 1,
},
},
"gang preemption with NodeEvictionProbability 0": {
SchedulingConfig: testfixtures.WithNodeEvictionProbabilityConfig(
0.0, // To test the gang evictor, we need to disable stochastic eviction.
testfixtures.TestSchedulingConfig(),
),
Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities),
Rounds: []SchedulingRound{
{
// Schedule a gang filling all of node 1 and part of node 2.
// Make the jobs of node 1 priority 1,
// to avoid them being urgency-preempted in the next round.
JobsByQueue: map[string][]*jobdb.Job{
"A": testfixtures.WithGangAnnotationsJobs(
append(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass1, 32), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1)...),
),
},
ExpectedScheduledIndices: map[string][]int{
"A": testfixtures.IntRange(0, 32),
},
},
{
// Schedule a job that requires preempting one job in the gang,
// and assert that all jobs in the gang are preempted.
JobsByQueue: map[string][]*jobdb.Job{
"B": testfixtures.N32Cpu256GiJobs("B", testfixtures.PriorityClass1, 1),
},
ExpectedScheduledIndices: map[string][]int{
"B": testfixtures.IntRange(0, 0),
},
ExpectedPreemptedIndices: map[string]map[int][]int{
"A": {
0: testfixtures.IntRange(0, 32),
},
},
},
},
PriorityFactorByQueue: map[string]float64{
"A": 1,
"B": 1,
},
},

"gang preemption avoid cascading preemption": {
SchedulingConfig: testfixtures.WithNodeEvictionProbabilityConfig(
0.0, // To test the gang evictor, we need to disable stochastic eviction.
testfixtures.TestSchedulingConfig(),
),
Nodes: testfixtures.N32CpuNodes(3, testfixtures.TestPriorities),
SchedulingConfig: testfixtures.TestSchedulingConfig(),
Nodes: testfixtures.N32CpuNodes(3, testfixtures.TestPriorities),
Rounds: []SchedulingRound{
{
// Schedule a gang spanning nodes 1 and 2.
@@ -1139,11 +1093,8 @@ func TestPreemptingQueueScheduler(t *testing.T) {
},
},
"Oversubscribed eviction does not evict non-preemptible": {
SchedulingConfig: testfixtures.WithNodeEvictionProbabilityConfig(
0.0,
testfixtures.TestSchedulingConfig(),
),
Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities),
SchedulingConfig: testfixtures.TestSchedulingConfig(),
Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities),
Rounds: []SchedulingRound{
{
JobsByQueue: map[string][]*jobdb.Job{
@@ -1837,8 +1788,6 @@ func TestPreemptingQueueScheduler(t *testing.T) {
sch := NewPreemptingQueueScheduler(
sctx,
constraints,
tc.SchedulingConfig.NodeEvictionProbability,
tc.SchedulingConfig.NodeOversubscriptionEvictionProbability,
tc.SchedulingConfig.ProtectedFractionOfFairShare,
NewSchedulerJobRepositoryAdapter(jobDbTxn),
nodeDb,
@@ -2194,8 +2143,6 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
sch := NewPreemptingQueueScheduler(
sctx,
constraints,
tc.SchedulingConfig.NodeEvictionProbability,
tc.SchedulingConfig.NodeOversubscriptionEvictionProbability,
tc.SchedulingConfig.ProtectedFractionOfFairShare,
NewSchedulerJobRepositoryAdapter(jobDbTxn),
nodeDb,
@@ -2256,8 +2203,6 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
sch := NewPreemptingQueueScheduler(
sctx,
constraints,
tc.SchedulingConfig.NodeEvictionProbability,
tc.SchedulingConfig.NodeOversubscriptionEvictionProbability,
tc.SchedulingConfig.ProtectedFractionOfFairShare,
NewSchedulerJobRepositoryAdapter(jobDbTxn),
nodeDb,
2 changes: 0 additions & 2 deletions internal/scheduler/scheduling_algo.go
@@ -488,8 +488,6 @@ func (l *FairSchedulingAlgo) scheduleOnExecutors(
scheduler := NewPreemptingQueueScheduler(
sctx,
constraints,
l.schedulingConfig.NodeEvictionProbability,
l.schedulingConfig.NodeOversubscriptionEvictionProbability,
l.schedulingConfig.ProtectedFractionOfFairShare,
NewSchedulerJobRepositoryAdapter(fsctx.txn),
nodeDb,
2 changes: 0 additions & 2 deletions internal/scheduler/simulator/simulator.go
@@ -483,8 +483,6 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error {
sch := scheduler.NewPreemptingQueueScheduler(
sctx,
constraints,
s.schedulingConfig.NodeEvictionProbability,
s.schedulingConfig.NodeOversubscriptionEvictionProbability,
s.schedulingConfig.ProtectedFractionOfFairShare,
scheduler.NewSchedulerJobRepositoryAdapter(txn),
nodeDb,
1 change: 0 additions & 1 deletion internal/scheduler/simulator/test_utils.go
@@ -45,7 +45,6 @@ func GetOneQueue10JobWorkload() *WorkloadSpec {

func GetBasicSchedulingConfig() configuration.SchedulingConfig {
return configuration.SchedulingConfig{
NodeEvictionProbability: 1.0,
PriorityClasses: map[string]types.PriorityClass{
"armada-default": {
Priority: 30000,
12 changes: 0 additions & 12 deletions internal/scheduler/testfixtures/testfixtures.go
@@ -161,8 +161,6 @@ func TestSchedulingConfig() schedulerconfiguration.SchedulingConfig {
return schedulerconfiguration.SchedulingConfig{
PriorityClasses: maps.Clone(TestPriorityClasses),
DefaultPriorityClassName: TestDefaultPriorityClass,
NodeEvictionProbability: 1.0,
NodeOversubscriptionEvictionProbability: 1.0,
MaximumSchedulingRate: math.Inf(1),
MaximumSchedulingBurst: math.MaxInt,
MaximumPerQueueSchedulingRate: math.Inf(1),
@@ -188,16 +186,6 @@ func WithProtectedFractionOfFairShareConfig(v float64, config schedulerconfigura
return config
}

func WithNodeEvictionProbabilityConfig(p float64, config schedulerconfiguration.SchedulingConfig) schedulerconfiguration.SchedulingConfig {
config.NodeEvictionProbability = p
return config
}

func WithNodeOversubscriptionEvictionProbabilityConfig(p float64, config schedulerconfiguration.SchedulingConfig) schedulerconfiguration.SchedulingConfig {
config.NodeOversubscriptionEvictionProbability = p
return config
}

func WithRoundLimitsConfig(limits map[string]float64, config schedulerconfiguration.SchedulingConfig) schedulerconfiguration.SchedulingConfig {
config.MaximumResourceFractionToSchedule = limits
return config
