From 7f2085db51e3b2e5681378dfd82772e736b351d9 Mon Sep 17 00:00:00 2001 From: robertdavidsmith <34475852+robertdavidsmith@users.noreply.github.com> Date: Thu, 2 May 2024 14:49:18 +0100 Subject: [PATCH] Scheduler: efficient resource implementation (#3561) --- config/armada/config.yaml | 9 ++ config/scheduler/config.yaml | 9 ++ go.mod | 2 +- internal/armada/configuration/types.go | 20 ++-- internal/armada/server.go | 8 ++ internal/armada/submit/submit.go | 1 - .../executor/job/processors/preempt_runs.go | 1 - internal/pulsartest/app.go | 2 - internal/scheduler/gang_scheduler_test.go | 7 +- .../scheduler/internaltypes/quantity_util.go | 18 ++++ .../internaltypes/quantity_util_test.go | 69 ++++++++++++++ .../scheduler/internaltypes/resource_list.go | 16 ++++ .../internaltypes/resource_list_factory.go | 93 +++++++++++++++++++ .../resource_list_factory_test.go | 85 +++++++++++++++++ internal/scheduler/nodedb/nodedb.go | 10 +- internal/scheduler/nodedb/nodedb_test.go | 4 + internal/scheduler/pool_assigner.go | 7 +- internal/scheduler/pool_assigner_test.go | 2 +- internal/scheduler/queue_scheduler_test.go | 1 + internal/scheduler/schedulerapp.go | 14 ++- internal/scheduler/scheduling_algo.go | 11 ++- internal/scheduler/scheduling_algo_test.go | 3 + internal/scheduler/simulator/simulator.go | 13 ++- internal/scheduler/simulator/test_utils.go | 2 +- internal/scheduler/submitcheck.go | 7 +- internal/scheduler/submitcheck_test.go | 4 +- .../scheduler/testfixtures/testfixtures.go | 26 +++++- 27 files changed, 408 insertions(+), 36 deletions(-) create mode 100644 internal/scheduler/internaltypes/quantity_util.go create mode 100644 internal/scheduler/internaltypes/quantity_util_test.go create mode 100644 internal/scheduler/internaltypes/resource_list.go create mode 100644 internal/scheduler/internaltypes/resource_list_factory.go create mode 100644 internal/scheduler/internaltypes/resource_list_factory_test.go diff --git a/config/armada/config.yaml b/config/armada/config.yaml index 83ee58d35d9..a2681d6c04e 100644 --- a/config/armada/config.yaml +++ b/config/armada/config.yaml @@ -39,6 +39,15 @@ eventsApiRedis: # This config must be consistent with the scheduling config used by the scheduler. # You may want to insert the scheduling config used for the scheduler automatically, e.g., using PyYAML, to guarantee consistency. scheduling: + supportedResourceTypes: + - name: memory + resolution: "1" + - name: cpu + resolution: "1m" + - name: ephemeral-storage + resolution: "1" + - name: nvidia.com/gpu + resolution: "1" executorTimeout: "60m" executorUpdateFrequency: "1m" priorityClasses: diff --git a/config/scheduler/config.yaml b/config/scheduler/config.yaml index 5c3fe7093d5..797833939f0 100644 --- a/config/scheduler/config.yaml +++ b/config/scheduler/config.yaml @@ -79,6 +79,15 @@ grpc: enabled: false # You may want to configure indexedNodeLabels and indexedTaints to speed up scheduling. 
scheduling: + supportedResourceTypes: + - name: memory + resolution: "1" + - name: cpu + resolution: "1m" + - name: ephemeral-storage + resolution: "1" + - name: nvidia.com/gpu + resolution: "1" disableScheduling: false enableAssertions: false nodeEvictionProbability: 1.0 diff --git a/go.mod b/go.mod index e279c8fdd88..c7775eaf00a 100644 --- a/go.mod +++ b/go.mod @@ -92,6 +92,7 @@ require ( golang.org/x/time v0.3.0 gonum.org/v1/gonum v0.14.0 google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9 + gopkg.in/inf.v0 v0.9.1 ) require ( @@ -204,7 +205,6 @@ require ( google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect google.golang.org/protobuf v1.31.0 // indirect - gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/square/go-jose.v2 v2.6.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/internal/armada/configuration/types.go b/internal/armada/configuration/types.go index a9b47d394ab..6dce794f28d 100644 --- a/internal/armada/configuration/types.go +++ b/internal/armada/configuration/types.go @@ -224,10 +224,19 @@ type SchedulingConfig struct { // Hence, a larger MaxExtraNodesToConsider would reduce the expected number of preemptions. // TODO(albin): Remove. It's unused. MaxExtraNodesToConsider uint + // Resource types (e.g. memory or nvidia.com/gpu) that the scheduler keeps track of. + // Resource types not on this list will be ignored if seen on a node, and any jobs requesting them will fail. + SupportedResourceTypes []ResourceType // Resources, e.g., "cpu", "memory", and "nvidia.com/gpu", for which the scheduler creates indexes for efficient lookup. // This list must contain at least one resource. Adding more than one resource is not required, but may speed up scheduling. // Ideally, this list contains all resources that frequently constrain which nodes a job can be scheduled onto. - IndexedResources []IndexedResource + // + // In particular, the allocatable resources on each node are rounded to a multiple of the resolution. + // Lower resolution speeds up scheduling by improving node lookup speed but may prevent scheduling jobs, + // since the allocatable resources may be rounded down to be a multiple of the resolution. + // + // See NodeDb docs for more details. + IndexedResources []ResourceType // Node labels that the scheduler creates indexes for efficient lookup of. // Should include node labels frequently used by node selectors on submitted jobs. // @@ -295,16 +304,11 @@ func SchedulingConfigValidation(sl validator.StructLevel) { } } -// IndexedResource represents a resource the scheduler indexes for efficient lookup. -type IndexedResource struct { +// ResourceType represents a resource the scheduler indexes for efficient lookup. +type ResourceType struct { // Resource name, e.g., "cpu", "memory", or "nvidia.com/gpu". Name string // Resolution with which Armada tracks this resource; larger values indicate lower resolution. - // In particular, the allocatable resources on each node are rounded to a multiple of the resolution. - // Lower resolution speeds up scheduling by improving node lookup speed but may prevent scheduling jobs, - // since the allocatable resources may be rounded down to be a multiple of the resolution. - // - // See NodeDb docs for more details. 
Resolution resource.Quantity } diff --git a/internal/armada/server.go b/internal/armada/server.go index 5b985ace215..b72ac123581 100644 --- a/internal/armada/server.go +++ b/internal/armada/server.go @@ -32,6 +32,7 @@ import ( "github.com/armadaproject/armada/internal/common/pulsarutils" "github.com/armadaproject/armada/internal/scheduler" schedulerdb "github.com/armadaproject/armada/internal/scheduler/database" + "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/reports" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" "github.com/armadaproject/armada/pkg/api" @@ -144,10 +145,17 @@ func Serve(ctx *armadacontext.Context, config *configuration.ArmadaConfig, healt // Executor Repositories for pulsar scheduler pulsarExecutorRepo := schedulerdb.NewRedisExecutorRepository(db, "pulsar") + + resourceListFactory, err := internaltypes.MakeResourceListFactory(config.Scheduling.SupportedResourceTypes) + if err != nil { + return errors.WithMessage(err, "Error with the .scheduling.supportedResourceTypes field in config") + } + ctx.Infof("Supported resource types: %s", resourceListFactory.SummaryString()) submitChecker := scheduler.NewSubmitChecker( 30*time.Minute, config.Scheduling, pulsarExecutorRepo, + resourceListFactory, ) services = append(services, func() error { return submitChecker.Run(ctx) diff --git a/internal/armada/submit/submit.go b/internal/armada/submit/submit.go index 91d74a8e696..3d30b7a5662 100644 --- a/internal/armada/submit/submit.go +++ b/internal/armada/submit/submit.go @@ -454,7 +454,6 @@ func (s *Server) ReprioritizeJobs(grpcCtx context.Context, req *api.JobRepriorit } err = s.publisher.PublishMessages(ctx, sequence) - if err != nil { log.WithError(err).Error("failed send to Pulsar") return nil, status.Error(codes.Internal, "Failed to send message") diff --git a/internal/executor/job/processors/preempt_runs.go b/internal/executor/job/processors/preempt_runs.go index 5efbca60b70..9d98a73cfde 100644 --- a/internal/executor/job/processors/preempt_runs.go +++ b/internal/executor/job/processors/preempt_runs.go @@ -96,7 +96,6 @@ func (j *RunPreemptedProcessor) reportPodPreempted(run *job.RunState, pod *v1.Po domain.JobPreemptedAnnotation: time.Now().String(), string(v1.PodFailed): time.Now().String(), }) - if err != nil { return fmt.Errorf("failed to annotate pod as preempted - %s", err) } diff --git a/internal/pulsartest/app.go b/internal/pulsartest/app.go index 93b79259bfd..50b0ceb0584 100644 --- a/internal/pulsartest/app.go +++ b/internal/pulsartest/app.go @@ -37,7 +37,6 @@ func New(params Params, cmdType string) (*App, error) { Name: producerName, Topic: params.Pulsar.JobsetEventsTopic, }) - if err != nil { return nil, errors.Wrapf(err, "error creating pulsar producer %s", producerName) } @@ -47,7 +46,6 @@ func New(params Params, cmdType string) (*App, error) { Topic: params.Pulsar.JobsetEventsTopic, StartMessageID: pulsar.EarliestMessageID(), }) - if err != nil { return nil, errors.Wrapf(err, "error creating pulsar reader") } diff --git a/internal/scheduler/gang_scheduler_test.go b/internal/scheduler/gang_scheduler_test.go index cf24b99b75d..a59a5428ed5 100644 --- a/internal/scheduler/gang_scheduler_test.go +++ b/internal/scheduler/gang_scheduler_test.go @@ -212,7 +212,7 @@ func TestGangScheduler(t *testing.T) { }, "resolution has no impact on jobs of size a multiple of the resolution": { SchedulingConfig: testfixtures.WithIndexedResourcesConfig( - []configuration.IndexedResource{ + 
[]configuration.ResourceType{ {Name: "cpu", Resolution: resource.MustParse("16")}, {Name: "memory", Resolution: resource.MustParse("128Mi")}, }, @@ -233,7 +233,7 @@ func TestGangScheduler(t *testing.T) { }, "jobs of size not a multiple of the resolution blocks scheduling new jobs": { SchedulingConfig: testfixtures.WithIndexedResourcesConfig( - []configuration.IndexedResource{ + []configuration.ResourceType{ {Name: "cpu", Resolution: resource.MustParse("17")}, {Name: "memory", Resolution: resource.MustParse("128Mi")}, }, @@ -252,7 +252,7 @@ func TestGangScheduler(t *testing.T) { }, "consider all nodes in the bucket": { SchedulingConfig: testfixtures.WithIndexedResourcesConfig( - []configuration.IndexedResource{ + []configuration.ResourceType{ {Name: "cpu", Resolution: resource.MustParse("1")}, {Name: "memory", Resolution: resource.MustParse("1Mi")}, {Name: "gpu", Resolution: resource.MustParse("1")}, @@ -593,6 +593,7 @@ func TestGangScheduler(t *testing.T) { tc.SchedulingConfig.IndexedNodeLabels, tc.SchedulingConfig.WellKnownNodeTypes, stringinterner.New(1024), + testfixtures.TestResourceListFactory, ) require.NoError(t, err) txn := nodeDb.Txn(true) diff --git a/internal/scheduler/internaltypes/quantity_util.go b/internal/scheduler/internaltypes/quantity_util.go new file mode 100644 index 00000000000..1857ed2cce1 --- /dev/null +++ b/internal/scheduler/internaltypes/quantity_util.go @@ -0,0 +1,18 @@ +package internaltypes + +import ( + "k8s.io/apimachinery/pkg/api/resource" +) + +func QuantityToInt64RoundUp(q resource.Quantity, scale resource.Scale) int64 { + return q.ScaledValue(scale) +} + +func QuantityToInt64RoundDown(q resource.Quantity, scale resource.Scale) int64 { + result := q.ScaledValue(scale) + q2 := resource.NewScaledQuantity(result, scale) + if q2.Cmp(q) > 0 { + result-- + } + return result +} diff --git a/internal/scheduler/internaltypes/quantity_util_test.go b/internal/scheduler/internaltypes/quantity_util_test.go new file mode 100644 index 00000000000..7300906bca6 --- /dev/null +++ b/internal/scheduler/internaltypes/quantity_util_test.go @@ -0,0 +1,69 @@ +package internaltypes + +import ( + "testing" + + "gopkg.in/inf.v0" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/api/resource" +) + +type quantityTest struct { + q resource.Quantity + expectedIntRoundDown int64 + expectedIntRoundUp int64 +} + +func TestQuantityToInt64_WithScaleMillis(t *testing.T) { + tests := []quantityTest{ + {resource.MustParse("0"), 0, 0}, + {resource.MustParse("1"), 1000, 1000}, + {resource.MustParse("1m"), 1, 1}, + {resource.MustParse("50m"), 50, 50}, + {resource.MustParse("1Mi"), 1024 * 1024 * 1000, 1024 * 1024 * 1000}, + {resource.MustParse("1e3"), 1000 * 1000, 1000 * 1000}, + {*resource.NewMilliQuantity(1, resource.DecimalExponent), 1, 1}, + {*resource.NewDecimalQuantity(*inf.NewDec(1, inf.Scale(0)), resource.DecimalExponent), 1000, 1000}, + {resource.MustParse("1n"), 0, 1}, + {resource.MustParse("100n"), 0, 1}, + {resource.MustParse("999999999n"), 999, 1000}, + {resource.MustParse("1000000000n"), 1000, 1000}, + {resource.MustParse("1000000001n"), 1000, 1001}, + {resource.MustParse("12.3m"), 12, 13}, + {resource.MustParse("0.99m"), 0, 1}, + {resource.MustParse("1.001m"), 1, 2}, + } + + for _, test := range tests { + assert.Equal(t, test.expectedIntRoundDown, QuantityToInt64RoundDown(test.q, resource.Milli), test.q) + assert.Equal(t, test.expectedIntRoundUp, QuantityToInt64RoundUp(test.q, resource.Milli), test.q) + } +} + +func TestQuantityToInt64_WithUnitScale(t *testing.T) { 
+	const tebi = 1024 * 1024 * 1024 * 1024
+	tests := []quantityTest{
+		{resource.MustParse("0"), 0, 0},
+		{resource.MustParse("1"), 1, 1},
+		{resource.MustParse("1m"), 0, 1},
+		{resource.MustParse("50m"), 0, 1},
+		{resource.MustParse("1Mi"), 1024 * 1024, 1024 * 1024},
+		{resource.MustParse("1.5Mi"), 1536 * 1024, 1536 * 1024},
+		{resource.MustParse("4Ti"), 4 * tebi, 4 * tebi},
+		{resource.MustParse("100000Ti"), 100000 * tebi, 100000 * tebi},
+		{resource.MustParse("1e3"), 1000, 1000},
+		{*resource.NewMilliQuantity(1, resource.DecimalExponent), 0, 1},
+		{*resource.NewDecimalQuantity(*inf.NewDec(1, inf.Scale(0)), resource.DecimalExponent), 1, 1},
+		{resource.MustParse("1n"), 0, 1},
+		{resource.MustParse("100n"), 0, 1},
+		{resource.MustParse("999999999n"), 0, 1},
+		{resource.MustParse("1000000000n"), 1, 1},
+		{resource.MustParse("1000000001n"), 1, 2},
+	}
+
+	for _, test := range tests {
+		assert.Equal(t, test.expectedIntRoundDown, QuantityToInt64RoundDown(test.q, resource.Scale(0)), test.q)
+		assert.Equal(t, test.expectedIntRoundUp, QuantityToInt64RoundUp(test.q, resource.Scale(0)), test.q)
+	}
+}
diff --git a/internal/scheduler/internaltypes/resource_list.go b/internal/scheduler/internaltypes/resource_list.go
new file mode 100644
index 00000000000..e4bf589bd67
--- /dev/null
+++ b/internal/scheduler/internaltypes/resource_list.go
@@ -0,0 +1,16 @@
+package internaltypes
+
+import "fmt"
+
+type ResourceList struct {
+	resources []int64
+	factory   *ResourceListFactory
+}
+
+func (rl *ResourceList) GetByName(name string) (int64, error) {
+	index, ok := rl.factory.nameToIndex[name]
+	if !ok {
+		return 0, fmt.Errorf("resource type %s not found", name)
+	}
+	return rl.resources[index], nil
+}
diff --git a/internal/scheduler/internaltypes/resource_list_factory.go b/internal/scheduler/internaltypes/resource_list_factory.go
new file mode 100644
index 00000000000..de20e3bc4bc
--- /dev/null
+++ b/internal/scheduler/internaltypes/resource_list_factory.go
@@ -0,0 +1,93 @@
+package internaltypes
+
+import (
+	"fmt"
+	"math"
+
+	"github.com/pkg/errors"
+
+	k8sResource "k8s.io/apimachinery/pkg/api/resource"
+
+	"github.com/armadaproject/armada/internal/armada/configuration"
+	"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
+)
+
+type ResourceListFactory struct {
+	nameToIndex map[string]int
+	indexToName []string
+	scales      []k8sResource.Scale
+}
+
+func MakeResourceListFactory(supportedResourceTypes []configuration.ResourceType) (*ResourceListFactory, error) {
+	if len(supportedResourceTypes) == 0 {
+		return nil, errors.New("no resource types configured")
+	}
+	indexToName := make([]string, len(supportedResourceTypes))
+	nameToIndex := make(map[string]int, len(supportedResourceTypes))
+	scales := make([]k8sResource.Scale, len(supportedResourceTypes))
+	for i, t := range supportedResourceTypes {
+		if _, exists := nameToIndex[t.Name]; exists {
+			return nil, fmt.Errorf("duplicate resource type name %q", t.Name)
+		}
+		nameToIndex[t.Name] = i
+		indexToName[i] = t.Name
+		scales[i] = resolutionToScale(t.Resolution)
+	}
+	return &ResourceListFactory{
+		indexToName: indexToName,
+		nameToIndex: nameToIndex,
+		scales:      scales,
+	}, nil
+}
+
+// Convert resolution to a k8sResource.Scale; non-positive resolutions default to milli.
+// e.g.
+// 1 -> 0
+// 0.001 -> -3
+// 1000 -> 3
+func resolutionToScale(resolution k8sResource.Quantity) k8sResource.Scale {
+	if resolution.Sign() < 1 {
+		return k8sResource.Milli
+	}
+	return k8sResource.Scale(math.Floor(math.Log10(resolution.AsApproximateFloat64())))
+}
+
+// Ignore unknown resources; round down so node capacity is never overestimated.
+func (factory *ResourceListFactory) FromNodeProto(resources schedulerobjects.ResourceList) ResourceList {
+	result := make([]int64, len(factory.indexToName))
+	for k, v := range resources.Resources {
+		index, ok := factory.nameToIndex[k]
+		if ok {
+			result[index] = QuantityToInt64RoundDown(v, factory.scales[index])
+		}
+	}
+	return ResourceList{resources: result, factory: factory}
+}
+
+// Fail on unknown resources; round up so job requests are never underestimated.
+func (factory *ResourceListFactory) FromJobProto(resources schedulerobjects.ResourceList) (ResourceList, error) {
+	result := make([]int64, len(factory.indexToName))
+	for k, v := range resources.Resources {
+		index, ok := factory.nameToIndex[k]
+		if ok {
+			result[index] = QuantityToInt64RoundUp(v, factory.scales[index])
+		} else {
+			return ResourceList{}, fmt.Errorf("unknown resource type %q", k)
+		}
+	}
+	return ResourceList{resources: result, factory: factory}, nil
+}
+
+func (factory *ResourceListFactory) SummaryString() string {
+	result := ""
+	for i, name := range factory.indexToName {
+		if i > 0 {
+			result += " "
+		}
+		scale := factory.scales[i]
+		resolution := k8sResource.NewScaledQuantity(1, scale)
+		maxValue := k8sResource.NewScaledQuantity(math.MaxInt64, scale)
+		result += fmt.Sprintf("%s (scale %v, resolution %v, maxValue %f)", name, scale, resolution, maxValue.AsApproximateFloat64())
+	}
+	return result
+}
diff --git a/internal/scheduler/internaltypes/resource_list_factory_test.go b/internal/scheduler/internaltypes/resource_list_factory_test.go
new file mode 100644
index 00000000000..c0f087f384e
--- /dev/null
+++ b/internal/scheduler/internaltypes/resource_list_factory_test.go
@@ -0,0 +1,85 @@
+package internaltypes
+
+import (
+	"math"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	k8sResource "k8s.io/apimachinery/pkg/api/resource"
+
+	"github.com/armadaproject/armada/internal/armada/configuration"
+	"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
+)
+
+func TestMakeResourceListFactory(t *testing.T) {
+	factory := testFactory()
+
+	assert.Equal(t, []string{"memory", "ephemeral-storage", "cpu", "nvidia.com/gpu"}, factory.indexToName)
+	assert.Equal(t, map[string]int{"memory": 0, "ephemeral-storage": 1, "cpu": 2, "nvidia.com/gpu": 3}, factory.nameToIndex)
+	assert.Equal(t, []k8sResource.Scale{0, 0, k8sResource.Milli, k8sResource.Milli}, factory.scales)
+}
+
+func TestResolutionToScale(t *testing.T) {
+	assert.Equal(t, k8sResource.Scale(0), resolutionToScale(k8sResource.MustParse("1")))
+	assert.Equal(t, k8sResource.Scale(-3), resolutionToScale(k8sResource.MustParse("0.001")))
+	assert.Equal(t, k8sResource.Scale(-3), resolutionToScale(k8sResource.MustParse("0.0011")))
+	assert.Equal(t, k8sResource.Scale(-4), resolutionToScale(k8sResource.MustParse("0.00099")))
+	assert.Equal(t, k8sResource.Scale(3), resolutionToScale(k8sResource.MustParse("1000")))
+}
+
+func TestResolutionToScaleDefaultsCorrectly(t *testing.T) {
+	defaultValue := k8sResource.Scale(-3)
+	assert.Equal(t, defaultValue, resolutionToScale(k8sResource.MustParse("0")))
+	assert.Equal(t, defaultValue, resolutionToScale(k8sResource.MustParse("-1")))
+}
+
+func TestFromNodeProto(t *testing.T) {
+	factory := testFactory()
+	result := factory.FromNodeProto(schedulerobjects.ResourceList{Resources: map[string]k8sResource.Quantity{
+		"memory":  k8sResource.MustParse("100Mi"),
+		"cpu":     k8sResource.MustParse("9999999n"),
+		"missing": k8sResource.MustParse("200Mi"), // should ignore missing
+	}})
+	assert.Equal(t, int64(100*1024*1024), testGet(&result,
"memory")) + assert.Equal(t, int64(9), testGet(&result, "cpu")) + assert.Equal(t, int64(0), testGet(&result, "nvidia.com/gpu")) +} + +func TestFromJobProto(t *testing.T) { + factory := testFactory() + result, err := factory.FromJobProto(schedulerobjects.ResourceList{Resources: map[string]k8sResource.Quantity{ + "memory": k8sResource.MustParse("100Mi"), + "cpu": k8sResource.MustParse("9999999n"), + }}) + assert.Nil(t, err) + assert.Equal(t, int64(100*1024*1024), testGet(&result, "memory")) + assert.Equal(t, int64(10), testGet(&result, "cpu")) + assert.Equal(t, int64(0), testGet(&result, "nvidia.com/gpu")) +} + +func TestFromJobProtoErrorsIfMissing(t *testing.T) { + factory := testFactory() + _, err := factory.FromJobProto(schedulerobjects.ResourceList{Resources: map[string]k8sResource.Quantity{ + "memory": k8sResource.MustParse("100Mi"), + "missing": k8sResource.MustParse("1"), + }}) + assert.NotNil(t, err) +} + +func testFactory() *ResourceListFactory { + factory, _ := MakeResourceListFactory([]configuration.ResourceType{ + {Name: "memory", Resolution: k8sResource.MustParse("1")}, + {Name: "ephemeral-storage", Resolution: k8sResource.MustParse("1")}, + {Name: "cpu", Resolution: k8sResource.MustParse("1m")}, + {Name: "nvidia.com/gpu", Resolution: k8sResource.MustParse("1m")}, + }) + return factory +} + +func testGet(rl *ResourceList, name string) int64 { + val, err := rl.GetByName(name) + if err != nil { + return math.MinInt64 + } + return val +} diff --git a/internal/scheduler/nodedb/nodedb.go b/internal/scheduler/nodedb/nodedb.go index 64f81b242b6..725189539d8 100644 --- a/internal/scheduler/nodedb/nodedb.go +++ b/internal/scheduler/nodedb/nodedb.go @@ -234,21 +234,24 @@ type NodeDb struct { scheduledAtPriorityByJobId map[string]int32 stringInterner *stringinterner.StringInterner + + resourceListFactory *internaltypes.ResourceListFactory } func NewNodeDb( priorityClasses map[string]types.PriorityClass, maxExtraNodesToConsider uint, - indexedResources []configuration.IndexedResource, + indexedResources []configuration.ResourceType, indexedTaints []string, indexedNodeLabels []string, wellKnownNodeTypes []configuration.WellKnownNodeType, stringInterner *stringinterner.StringInterner, + resourceListFactory *internaltypes.ResourceListFactory, ) (*NodeDb, error) { nodeDbPriorities := []int32{evictedPriority} nodeDbPriorities = append(nodeDbPriorities, types.AllowedPriorities(priorityClasses)...) 
- indexedResourceNames := util.Map(indexedResources, func(v configuration.IndexedResource) string { return v.Name }) + indexedResourceNames := util.Map(indexedResources, func(v configuration.ResourceType) string { return v.Name }) schema, indexNameByPriority, keyIndexByPriority := nodeDbSchema(nodeDbPriorities, indexedResourceNames) db, err := memdb.NewMemDB(schema) if err != nil { @@ -288,7 +291,7 @@ func NewNodeDb( indexedResourcesSet: mapFromSlice(indexedResourceNames), indexedResourceResolutionMillis: util.Map( indexedResources, - func(v configuration.IndexedResource) int64 { return v.Resolution.MilliValue() }, + func(v configuration.ResourceType) int64 { return v.Resolution.MilliValue() }, ), indexNameByPriority: indexNameByPriority, keyIndexByPriority: keyIndexByPriority, @@ -305,6 +308,7 @@ func NewNodeDb( scheduledAtPriorityByJobId: make(map[string]int32), stringInterner: stringInterner, + resourceListFactory: resourceListFactory, } for _, wellKnownNodeType := range wellKnownNodeTypes { diff --git a/internal/scheduler/nodedb/nodedb_test.go b/internal/scheduler/nodedb/nodedb_test.go index 286b56d6e57..e4181d9dc7a 100644 --- a/internal/scheduler/nodedb/nodedb_test.go +++ b/internal/scheduler/nodedb/nodedb_test.go @@ -567,6 +567,7 @@ func TestAwayNodeTypes(t *testing.T) { testfixtures.TestIndexedNodeLabels, testfixtures.TestWellKnownNodeTypes, stringinterner.New(1024), + testfixtures.TestResourceListFactory, ) require.NoError(t, err) @@ -618,6 +619,7 @@ func benchmarkUpsert(nodes []*schedulerobjects.Node, b *testing.B) { testfixtures.TestIndexedNodeLabels, testfixtures.TestWellKnownNodeTypes, stringinterner.New(1024), + testfixtures.TestResourceListFactory, ) require.NoError(b, err) txn := nodeDb.Txn(true) @@ -658,6 +660,7 @@ func benchmarkScheduleMany(b *testing.B, nodes []*schedulerobjects.Node, jobs [] testfixtures.TestIndexedNodeLabels, testfixtures.TestWellKnownNodeTypes, stringinterner.New(1024), + testfixtures.TestResourceListFactory, ) require.NoError(b, err) txn := nodeDb.Txn(true) @@ -784,6 +787,7 @@ func newNodeDbWithNodes(nodes []*schedulerobjects.Node) (*NodeDb, error) { testfixtures.TestIndexedNodeLabels, testfixtures.TestWellKnownNodeTypes, stringinterner.New(1024), + testfixtures.TestResourceListFactory, ) if err != nil { return nil, err diff --git a/internal/scheduler/pool_assigner.go b/internal/scheduler/pool_assigner.go index 5ed663d7abd..1f9a8c00de6 100644 --- a/internal/scheduler/pool_assigner.go +++ b/internal/scheduler/pool_assigner.go @@ -15,6 +15,7 @@ import ( "github.com/armadaproject/armada/internal/scheduler/constraints" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/database" + "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/nodedb" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" @@ -36,7 +37,7 @@ type DefaultPoolAssigner struct { executorTimeout time.Duration priorityClasses map[string]types.PriorityClass priorities []int32 - indexedResources []configuration.IndexedResource + indexedResources []configuration.ResourceType indexedTaints []string indexedNodeLabels []string wellKnownNodeTypes []configuration.WellKnownNodeType @@ -46,11 +47,13 @@ type DefaultPoolAssigner struct { schedulingKeyGenerator *schedulerobjects.SchedulingKeyGenerator poolCache *lru.Cache clock clock.Clock + resourceListFactory 
*internaltypes.ResourceListFactory } func NewPoolAssigner(executorTimeout time.Duration, schedulingConfig configuration.SchedulingConfig, executorRepository database.ExecutorRepository, + resourceListFactory *internaltypes.ResourceListFactory, ) (*DefaultPoolAssigner, error) { poolCache, err := lru.New(maxJobSchedulingResults) if err != nil { @@ -70,6 +73,7 @@ func NewPoolAssigner(executorTimeout time.Duration, schedulingKeyGenerator: schedulerobjects.NewSchedulingKeyGenerator(), poolCache: poolCache, clock: clock.RealClock{}, + resourceListFactory: resourceListFactory, }, nil } @@ -161,6 +165,7 @@ func (p *DefaultPoolAssigner) constructNodeDb(nodes []*schedulerobjects.Node) (* p.indexedNodeLabels, p.wellKnownNodeTypes, stringinterner.New(1024), + p.resourceListFactory, ) if err != nil { return nil, err diff --git a/internal/scheduler/pool_assigner_test.go b/internal/scheduler/pool_assigner_test.go index faf37636542..f6bdb67e065 100644 --- a/internal/scheduler/pool_assigner_test.go +++ b/internal/scheduler/pool_assigner_test.go @@ -54,7 +54,7 @@ func TestPoolAssigner_AssignPool(t *testing.T) { mockExecutorRepo := schedulermocks.NewMockExecutorRepository(ctrl) mockExecutorRepo.EXPECT().GetExecutors(ctx).Return(tc.executors, nil).AnyTimes() fakeClock := clock.NewFakeClock(testfixtures.BaseTime) - assigner, err := NewPoolAssigner(tc.executorTimout, tc.config, mockExecutorRepo) + assigner, err := NewPoolAssigner(tc.executorTimout, tc.config, mockExecutorRepo, testfixtures.TestResourceListFactory) require.NoError(t, err) assigner.clock = fakeClock diff --git a/internal/scheduler/queue_scheduler_test.go b/internal/scheduler/queue_scheduler_test.go index d036550d66d..90a80c804f3 100644 --- a/internal/scheduler/queue_scheduler_test.go +++ b/internal/scheduler/queue_scheduler_test.go @@ -736,6 +736,7 @@ func NewNodeDb(config configuration.SchedulingConfig, stringInterner *stringinte config.IndexedNodeLabels, config.WellKnownNodeTypes, stringInterner, + testfixtures.TestResourceListFactory, ) if err != nil { return nil, err diff --git a/internal/scheduler/schedulerapp.go b/internal/scheduler/schedulerapp.go index bb8c2a28a62..baaf79dd5d9 100644 --- a/internal/scheduler/schedulerapp.go +++ b/internal/scheduler/schedulerapp.go @@ -38,6 +38,7 @@ import ( schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration" "github.com/armadaproject/armada/internal/scheduler/database" "github.com/armadaproject/armada/internal/scheduler/failureestimator" + "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/leader" "github.com/armadaproject/armada/internal/scheduler/metrics" @@ -75,6 +76,15 @@ func Run(config schedulerconfig.Configuration) error { shutdownHttpServer := common.ServeHttp(uint16(config.Http.Port), mux) defer shutdownHttpServer() + // //////////////////////////////////////////////////////////////////////// + // Resource list factory + // //////////////////////////////////////////////////////////////////////// + resourceListFactory, err := internaltypes.MakeResourceListFactory(config.Scheduling.SupportedResourceTypes) + if err != nil { + return errors.WithMessage(err, "Error with the .scheduling.supportedResourceTypes field in config") + } + ctx.Infof("Supported resource types: %s", resourceListFactory.SummaryString()) + // List of services to run concurrently. 
// Because we want to start services only once all input validation has been completed, // we add all services to a slice and start them together at the end of this function. @@ -216,6 +226,7 @@ func Run(config schedulerconfig.Configuration) error { 30*time.Minute, config.Scheduling, executorRepository, + resourceListFactory, ) services = append(services, func() error { return submitChecker.Run(ctx) @@ -275,6 +286,7 @@ func Run(config schedulerconfig.Configuration) error { nodeQuarantiner, queueQuarantiner, stringInterner, + resourceListFactory, ) if err != nil { return errors.WithMessage(err, "error creating scheduling algo") @@ -324,7 +336,7 @@ func Run(config schedulerconfig.Configuration) error { // //////////////////////////////////////////////////////////////////////// // Metrics // //////////////////////////////////////////////////////////////////////// - poolAssigner, err := NewPoolAssigner(config.Scheduling.ExecutorTimeout, config.Scheduling, executorRepository) + poolAssigner, err := NewPoolAssigner(config.Scheduling.ExecutorTimeout, config.Scheduling, executorRepository, resourceListFactory) if err != nil { return errors.WithMessage(err, "error creating pool assigner") } diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go index 9b6b27b5c50..7ea297f3eea 100644 --- a/internal/scheduler/scheduling_algo.go +++ b/internal/scheduler/scheduling_algo.go @@ -25,6 +25,7 @@ import ( schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/database" "github.com/armadaproject/armada/internal/scheduler/fairness" + "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/nodedb" "github.com/armadaproject/armada/internal/scheduler/quarantine" @@ -64,9 +65,10 @@ type FairSchedulingAlgo struct { // Function that is called every time an executor is scheduled. Useful for testing. onExecutorScheduled func(executor *schedulerobjects.Executor) // rand and clock injected here for repeatable testing. 
- rand *rand.Rand - clock clock.Clock - stringInterner *stringinterner.StringInterner + rand *rand.Rand + clock clock.Clock + stringInterner *stringinterner.StringInterner + resourceListFactory *internaltypes.ResourceListFactory } func NewFairSchedulingAlgo( @@ -78,6 +80,7 @@ func NewFairSchedulingAlgo( nodeQuarantiner *quarantine.NodeQuarantiner, queueQuarantiner *quarantine.QueueQuarantiner, stringInterner *stringinterner.StringInterner, + resourceListFactory *internaltypes.ResourceListFactory, ) (*FairSchedulingAlgo, error) { if _, ok := config.PriorityClasses[config.DefaultPriorityClassName]; !ok { return nil, errors.Errorf( @@ -99,6 +102,7 @@ func NewFairSchedulingAlgo( rand: util.NewThreadsafeRand(time.Now().UnixNano()), clock: clock.RealClock{}, stringInterner: stringInterner, + resourceListFactory: resourceListFactory, }, nil } @@ -370,6 +374,7 @@ func (l *FairSchedulingAlgo) scheduleOnExecutors( l.schedulingConfig.IndexedNodeLabels, l.schedulingConfig.WellKnownNodeTypes, l.stringInterner, + l.resourceListFactory, ) if err != nil { return nil, nil, err diff --git a/internal/scheduler/scheduling_algo_test.go b/internal/scheduler/scheduling_algo_test.go index f968607d601..d664aa229cd 100644 --- a/internal/scheduler/scheduling_algo_test.go +++ b/internal/scheduler/scheduling_algo_test.go @@ -393,6 +393,7 @@ func TestSchedule(t *testing.T) { nil, nil, stringinterner.New(1024), + testfixtures.TestResourceListFactory, ) require.NoError(t, err) @@ -555,6 +556,7 @@ func BenchmarkNodeDbConstruction(b *testing.B) { nil, nil, stringInterner, + testfixtures.TestResourceListFactory, ) require.NoError(b, err) b.StartTimer() @@ -567,6 +569,7 @@ func BenchmarkNodeDbConstruction(b *testing.B) { schedulingConfig.IndexedNodeLabels, schedulingConfig.WellKnownNodeTypes, stringInterner, + testfixtures.TestResourceListFactory, ) require.NoError(b, err) err = algo.addExecutorToNodeDb(nodeDb, jobs, nodes) diff --git a/internal/scheduler/simulator/simulator.go b/internal/scheduler/simulator/simulator.go index f2d4463729e..772a76d4bb0 100644 --- a/internal/scheduler/simulator/simulator.go +++ b/internal/scheduler/simulator/simulator.go @@ -26,6 +26,7 @@ import ( schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/constraints" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/fairness" + "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/nodedb" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" @@ -79,6 +80,8 @@ type Simulator struct { // If true, scheduler logs are omitted. // This since the logs are very verbose when scheduling large numbers of jobs. SuppressSchedulerLogs bool + // For making internaltypes.ResourceList + resourceListFactory *internaltypes.ResourceListFactory } type StateTransition struct { @@ -89,6 +92,10 @@ type StateTransition struct { func NewSimulator(clusterSpec *ClusterSpec, workloadSpec *WorkloadSpec, schedulingConfig configuration.SchedulingConfig) (*Simulator, error) { // TODO: Move clone to caller? // Copy specs to avoid concurrent mutation. 
+ resourceListFactory, err := internaltypes.MakeResourceListFactory(schedulingConfig.SupportedResourceTypes) + if err != nil { + return nil, errors.WithMessage(err, "Error with the .scheduling.supportedResourceTypes field in config") + } clusterSpec = proto.Clone(clusterSpec).(*ClusterSpec) workloadSpec = proto.Clone(workloadSpec).(*WorkloadSpec) initialiseClusterSpec(clusterSpec) @@ -126,8 +133,9 @@ func NewSimulator(clusterSpec *ClusterSpec, workloadSpec *WorkloadSpec, scheduli rate.Limit(schedulingConfig.MaximumSchedulingRate), schedulingConfig.MaximumSchedulingBurst, ), - limiterByQueue: make(map[string]*rate.Limiter), - rand: rand.New(rand.NewSource(randomSeed)), + limiterByQueue: make(map[string]*rate.Limiter), + rand: rand.New(rand.NewSource(randomSeed)), + resourceListFactory: resourceListFactory, } jobDb.SetClock(s) s.limiter.SetBurstAt(s.time, schedulingConfig.MaximumSchedulingBurst) @@ -231,6 +239,7 @@ func (s *Simulator) setupClusters() error { s.schedulingConfig.IndexedNodeLabels, s.schedulingConfig.WellKnownNodeTypes, stringinterner.New(1024), + s.resourceListFactory, ) if err != nil { return err diff --git a/internal/scheduler/simulator/test_utils.go b/internal/scheduler/simulator/test_utils.go index c44ba04a1e8..bc2c300ff71 100644 --- a/internal/scheduler/simulator/test_utils.go +++ b/internal/scheduler/simulator/test_utils.go @@ -62,7 +62,7 @@ func GetBasicSchedulingConfig() configuration.SchedulingConfig { "cpu": 0.025, }, DominantResourceFairnessResourcesToConsider: []string{"cpu", "memory", "nvidia.com/gpu", "ephemeral-storage"}, - IndexedResources: []configuration.IndexedResource{ + IndexedResources: []configuration.ResourceType{ { Name: "cpu", Resolution: resource.MustParse("1"), diff --git a/internal/scheduler/submitcheck.go b/internal/scheduler/submitcheck.go index b5bc3bee331..bd4b428a0ac 100644 --- a/internal/scheduler/submitcheck.go +++ b/internal/scheduler/submitcheck.go @@ -20,6 +20,7 @@ import ( "github.com/armadaproject/armada/internal/scheduler/adapters" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/database" + "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/nodedb" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" @@ -48,7 +49,7 @@ type SubmitChecker struct { priorityClasses map[string]types.PriorityClass executorById map[string]minimalExecutor priorities []int32 - indexedResources []configuration.IndexedResource + indexedResources []configuration.ResourceType indexedTaints []string indexedNodeLabels []string wellKnownNodeTypes []configuration.WellKnownNodeType @@ -58,12 +59,14 @@ type SubmitChecker struct { schedulingKeyGenerator *schedulerobjects.SchedulingKeyGenerator jobSchedulingResultsCache *lru.Cache ExecutorUpdateFrequency time.Duration + resourceListFactory *internaltypes.ResourceListFactory } func NewSubmitChecker( executorTimeout time.Duration, schedulingConfig configuration.SchedulingConfig, executorRepository database.ExecutorRepository, + resourceListFactory *internaltypes.ResourceListFactory, ) *SubmitChecker { jobSchedulingResultsCache, err := lru.New(maxJobSchedulingResults) if err != nil { @@ -83,6 +86,7 @@ func NewSubmitChecker( schedulingKeyGenerator: schedulerobjects.NewSchedulingKeyGenerator(), jobSchedulingResultsCache: jobSchedulingResultsCache, ExecutorUpdateFrequency: 
schedulingConfig.ExecutorUpdateFrequency,
+		resourceListFactory:       resourceListFactory,
 	}
 }
 
@@ -310,6 +314,7 @@ func (srv *SubmitChecker) constructNodeDb(nodes []*schedulerobjects.Node) (*node
 		srv.indexedNodeLabels,
 		srv.wellKnownNodeTypes,
 		stringinterner.New(512),
+		srv.resourceListFactory,
 	)
 	if err != nil {
 		return nil, err
diff --git a/internal/scheduler/submitcheck_test.go b/internal/scheduler/submitcheck_test.go
index 75baaa1dafb..10413a1d8bf 100644
--- a/internal/scheduler/submitcheck_test.go
+++ b/internal/scheduler/submitcheck_test.go
@@ -76,7 +76,7 @@ func TestSubmitChecker_CheckJobDbJobs(t *testing.T) {
 			mockExecutorRepo := schedulermocks.NewMockExecutorRepository(ctrl)
 			mockExecutorRepo.EXPECT().GetExecutors(ctx).Return(tc.executors, nil).AnyTimes()
 			fakeClock := clock.NewFakeClock(baseTime)
-			submitCheck := NewSubmitChecker(tc.executorTimout, tc.config, mockExecutorRepo)
+			submitCheck := NewSubmitChecker(tc.executorTimout, tc.config, mockExecutorRepo, testfixtures.TestResourceListFactory)
 			submitCheck.clock = fakeClock
 			submitCheck.updateExecutors(ctx)
 			isSchedulable, reason := submitCheck.CheckJobDbJobs([]*jobdb.Job{tc.job})
@@ -181,7 +181,7 @@ func TestSubmitChecker_TestCheckApiJobs(t *testing.T) {
 			mockExecutorRepo := schedulermocks.NewMockExecutorRepository(ctrl)
 			mockExecutorRepo.EXPECT().GetExecutors(ctx).Return(tc.executors, nil).AnyTimes()
 			fakeClock := clock.NewFakeClock(testfixtures.BaseTime)
-			submitCheck := NewSubmitChecker(tc.executorTimout, tc.config, mockExecutorRepo)
+			submitCheck := NewSubmitChecker(tc.executorTimout, tc.config, mockExecutorRepo, testfixtures.TestResourceListFactory)
 			submitCheck.clock = fakeClock
 			submitCheck.updateExecutors(ctx)
 			events := armadaslices.Map(tc.jobs, func(s *armadaevents.SubmitJob) *armadaevents.EventSequence_Event {
diff --git a/internal/scheduler/testfixtures/testfixtures.go b/internal/scheduler/testfixtures/testfixtures.go
index e653bbc845b..313033aa592 100644
--- a/internal/scheduler/testfixtures/testfixtures.go
+++ b/internal/scheduler/testfixtures/testfixtures.go
@@ -20,6 +20,7 @@ import (
 	"github.com/armadaproject/armada/internal/common/types"
 	"github.com/armadaproject/armada/internal/common/util"
 	schedulerconfiguration "github.com/armadaproject/armada/internal/scheduler/configuration"
+	"github.com/armadaproject/armada/internal/scheduler/internaltypes"
 	"github.com/armadaproject/armada/internal/scheduler/jobdb"
 	"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
 	"github.com/armadaproject/armada/pkg/api"
@@ -52,18 +53,18 @@ var (
 	TestDefaultPriorityClass         = PriorityClass3
 	TestPriorities                   = []int32{0, 1, 2, 3}
 	TestMaxExtraNodesToConsider uint = 1
-	TestResources                    = []configuration.IndexedResource{
+	TestResources                    = []configuration.ResourceType{
 		{Name: "cpu", Resolution: resource.MustParse("1")},
 		{Name: "memory", Resolution: resource.MustParse("128Mi")},
 		{Name: "gpu", Resolution: resource.MustParse("1")},
 	}
 	TestResourceNames = util.Map(
 		TestResources,
-		func(v configuration.IndexedResource) string { return v.Name },
+		func(v configuration.ResourceType) string { return v.Name },
 	)
 	TestIndexedResourceResolutionMillis = util.Map(
 		TestResources,
-		func(v configuration.IndexedResource) int64 { return v.Resolution.MilliValue() },
+		func(v configuration.ResourceType) int64 { return v.Resolution.MilliValue() },
 	)
 	TestIndexedTaints     = []string{"largeJobsOnly", "gpu"}
 	TestIndexedNodeLabels = []string{"largeJobsOnly", "gpu"}
@@ -80,7 +81,8 @@ var (
 	// We use the all-zeros key here to ensure scheduling keys are consistent between tests.
SchedulingKeyGenerator = schedulerobjects.NewSchedulingKeyGeneratorWithKey(make([]byte, 32)) // Used for job creation. - JobDb = NewJobDb() + JobDb = NewJobDb() + TestResourceListFactory = MakeTestResourceListFactory() ) func NewJobDbWithJobs(jobs []*jobdb.Job) *jobdb.JobDb { @@ -142,6 +144,7 @@ func TestSchedulingConfig() configuration.SchedulingConfig { DominantResourceFairnessResourcesToConsider: TestResourceNames, ExecutorTimeout: 15 * time.Minute, MaxUnacknowledgedJobsPerExecutor: math.MaxInt, + SupportedResourceTypes: GetTestSupportedResourceTypes(), } } @@ -192,7 +195,7 @@ func WithPerPriorityLimitsConfig(limits map[string]map[string]float64, config co return config } -func WithIndexedResourcesConfig(indexResources []configuration.IndexedResource, config configuration.SchedulingConfig) configuration.SchedulingConfig { +func WithIndexedResourcesConfig(indexResources []configuration.ResourceType, config configuration.SchedulingConfig) configuration.SchedulingConfig { config.IndexedResources = indexResources return config } @@ -963,3 +966,16 @@ func (p *MockPassiveClock) Now() time.Time { func (p *MockPassiveClock) Since(time.Time) time.Duration { panic("Not implemented") } + +func MakeTestResourceListFactory() *internaltypes.ResourceListFactory { + result, _ := internaltypes.MakeResourceListFactory(GetTestSupportedResourceTypes()) + return result +} + +func GetTestSupportedResourceTypes() []configuration.ResourceType { + return []configuration.ResourceType{ + {Name: "memory", Resolution: resource.MustParse("1")}, + {Name: "cpu", Resolution: resource.MustParse("1m")}, + {Name: "gpu", Resolution: resource.MustParse("1m")}, + } +}
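
The core invariant of this change is the direction of rounding: node capacity is converted with QuantityToInt64RoundDown, so the scheduler never promises resources a node does not have, while job requests are converted with QuantityToInt64RoundUp, so a request is never under-counted. Below is a minimal sketch of the two helpers at milli scale, matching the values exercised in quantity_util_test.go; it assumes compilation inside the armada module, since internal packages are not importable from outside it.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"

	"github.com/armadaproject/armada/internal/scheduler/internaltypes"
)

func main() {
	// 9999999n is 0.009999999 cores, not an exact multiple of 1m.
	q := resource.MustParse("9999999n")

	// Stored node capacity rounds down to 9m.
	fmt.Println(internaltypes.QuantityToInt64RoundDown(q, resource.Milli)) // 9

	// The same quantity as a job request rounds up to 10m.
	fmt.Println(internaltypes.QuantityToInt64RoundUp(q, resource.Milli)) // 10

	// Exact multiples of the scale are unaffected by the direction.
	one := resource.MustParse("1")
	fmt.Println(internaltypes.QuantityToInt64RoundDown(one, resource.Milli)) // 1000
	fmt.Println(internaltypes.QuantityToInt64RoundUp(one, resource.Milli))   // 1000
}

This mirrors TestFromNodeProto and TestFromJobProto above, where the same 9999999n CPU quantity lands as 9 for a node and 10 for a job.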
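End to end, the factory is built once at startup from .scheduling.supportedResourceTypes and then maps every node and job resource list onto a flat []int64 indexed by position rather than by string key, which is what makes the new representation cheap. The sketch below walks that round trip under the same in-module assumption; the example.com/foo resource name and the quantities are illustrative only.

package main

import (
	"fmt"

	k8sResource "k8s.io/apimachinery/pkg/api/resource"

	"github.com/armadaproject/armada/internal/armada/configuration"
	"github.com/armadaproject/armada/internal/scheduler/internaltypes"
	"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

func main() {
	// Mirrors the supportedResourceTypes block added to config.yaml.
	factory, err := internaltypes.MakeResourceListFactory([]configuration.ResourceType{
		{Name: "memory", Resolution: k8sResource.MustParse("1")}, // scale 0: whole bytes
		{Name: "cpu", Resolution: k8sResource.MustParse("1m")},   // scale -3: millicores
	})
	if err != nil {
		panic(err)
	}

	// Node capacities: unknown resource types are silently dropped.
	node := factory.FromNodeProto(schedulerobjects.ResourceList{
		Resources: map[string]k8sResource.Quantity{
			"cpu":             k8sResource.MustParse("32"),
			"memory":          k8sResource.MustParse("256Gi"),
			"example.com/foo": k8sResource.MustParse("1"), // ignored
		},
	})
	cpu, _ := node.GetByName("cpu")
	fmt.Println(cpu) // 32000 (millicores)

	// Job requests: an unknown resource type is an error, so a job asking
	// for something no node can report is rejected at submit time.
	_, err = factory.FromJobProto(schedulerobjects.ResourceList{
		Resources: map[string]k8sResource.Quantity{
			"example.com/foo": k8sResource.MustParse("1"),
		},
	})
	fmt.Println(err) // unknown resource type "example.com/foo"
}

Routing both conversions through one shared factory is what lets NodeDb, SubmitChecker, PoolAssigner, and the simulator, all of which now take it as a constructor argument, agree on which resource types exist and at what resolution.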