Skip to content

Commit

Permalink
Scheduler: efficient resource implementation (#3561)
Browse files Browse the repository at this point in the history
  • Loading branch information
robertdavidsmith committed May 2, 2024
1 parent 586efd5 commit 7f2085d
Show file tree
Hide file tree
Showing 27 changed files with 408 additions and 36 deletions.
9 changes: 9 additions & 0 deletions config/armada/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ eventsApiRedis:
# This config must be consistent with the scheduling config used by the scheduler.
# You may want to insert the scheduling config used for the scheduler automatically, e.g., using PyYAML, to guarantee consistency.
scheduling:
supportedResourceTypes:
- name: memory
resolution: "1"
- name: cpu
resolution: "1m"
- name: ephemeral-storage
resolution: "1"
- name: nvidia.com/gpu
resolution: "1"
executorTimeout: "60m"
executorUpdateFrequency: "1m"
priorityClasses:
Expand Down
9 changes: 9 additions & 0 deletions config/scheduler/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ grpc:
enabled: false
# You may want to configure indexedNodeLabels and indexedTaints to speed up scheduling.
scheduling:
supportedResourceTypes:
- name: memory
resolution: "1"
- name: cpu
resolution: "1m"
- name: ephemeral-storage
resolution: "1"
- name: nvidia.com/gpu
resolution: "1"
disableScheduling: false
enableAssertions: false
nodeEvictionProbability: 1.0
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ require (
golang.org/x/time v0.3.0
gonum.org/v1/gonum v0.14.0
google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9
gopkg.in/inf.v0 v0.9.1
)

require (
Expand Down Expand Up @@ -204,7 +205,6 @@ require (
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
20 changes: 12 additions & 8 deletions internal/armada/configuration/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,19 @@ type SchedulingConfig struct {
// Hence, a larger MaxExtraNodesToConsider would reduce the expected number of preemptions.
// TODO(albin): Remove. It's unused.
MaxExtraNodesToConsider uint
// Resource types (e.g. memory or nvidia.com/gpu) that the scheduler keeps track of.
// Resource types not on this list will be ignored if seen on a node, and any jobs requesting them will fail.
SupportedResourceTypes []ResourceType
// Resources, e.g., "cpu", "memory", and "nvidia.com/gpu", for which the scheduler creates indexes for efficient lookup.
// This list must contain at least one resource. Adding more than one resource is not required, but may speed up scheduling.
// Ideally, this list contains all resources that frequently constrain which nodes a job can be scheduled onto.
IndexedResources []IndexedResource
//
// In particular, the allocatable resources on each node are rounded to a multiple of the resolution.
// Lower resolution speeds up scheduling by improving node lookup speed but may prevent scheduling jobs,
// since the allocatable resources may be rounded down to be a multiple of the resolution.
//
// See NodeDb docs for more details.
IndexedResources []ResourceType
// Node labels that the scheduler creates indexes for efficient lookup of.
// Should include node labels frequently used by node selectors on submitted jobs.
//
Expand Down Expand Up @@ -295,16 +304,11 @@ func SchedulingConfigValidation(sl validator.StructLevel) {
}
}

// IndexedResource represents a resource the scheduler indexes for efficient lookup.
type IndexedResource struct {
// ResourceType represents a resource the scheduler indexes for efficient lookup.
type ResourceType struct {
// Resource name, e.g., "cpu", "memory", or "nvidia.com/gpu".
Name string
// Resolution with which Armada tracks this resource; larger values indicate lower resolution.
// In particular, the allocatable resources on each node are rounded to a multiple of the resolution.
// Lower resolution speeds up scheduling by improving node lookup speed but may prevent scheduling jobs,
// since the allocatable resources may be rounded down to be a multiple of the resolution.
//
// See NodeDb docs for more details.
Resolution resource.Quantity
}

Expand Down
8 changes: 8 additions & 0 deletions internal/armada/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/armadaproject/armada/internal/common/pulsarutils"
"github.com/armadaproject/armada/internal/scheduler"
schedulerdb "github.com/armadaproject/armada/internal/scheduler/database"
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
"github.com/armadaproject/armada/internal/scheduler/reports"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/pkg/api"
Expand Down Expand Up @@ -144,10 +145,17 @@ func Serve(ctx *armadacontext.Context, config *configuration.ArmadaConfig, healt

// Executor Repositories for pulsar scheduler
pulsarExecutorRepo := schedulerdb.NewRedisExecutorRepository(db, "pulsar")

resourceListFactory, err := internaltypes.MakeResourceListFactory(config.Scheduling.SupportedResourceTypes)
if err != nil {
return errors.WithMessage(err, "Error with the .scheduling.supportedResourceTypes field in config")
}
ctx.Infof("Supported resource types: %s", resourceListFactory.SummaryString())
submitChecker := scheduler.NewSubmitChecker(
30*time.Minute,
config.Scheduling,
pulsarExecutorRepo,
resourceListFactory,
)
services = append(services, func() error {
return submitChecker.Run(ctx)
Expand Down
1 change: 0 additions & 1 deletion internal/armada/submit/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,6 @@ func (s *Server) ReprioritizeJobs(grpcCtx context.Context, req *api.JobRepriorit
}

err = s.publisher.PublishMessages(ctx, sequence)

if err != nil {
log.WithError(err).Error("failed send to Pulsar")
return nil, status.Error(codes.Internal, "Failed to send message")
Expand Down
1 change: 0 additions & 1 deletion internal/executor/job/processors/preempt_runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func (j *RunPreemptedProcessor) reportPodPreempted(run *job.RunState, pod *v1.Po
domain.JobPreemptedAnnotation: time.Now().String(),
string(v1.PodFailed): time.Now().String(),
})

if err != nil {
return fmt.Errorf("failed to annotate pod as preempted - %s", err)
}
Expand Down
2 changes: 0 additions & 2 deletions internal/pulsartest/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ func New(params Params, cmdType string) (*App, error) {
Name: producerName,
Topic: params.Pulsar.JobsetEventsTopic,
})

if err != nil {
return nil, errors.Wrapf(err, "error creating pulsar producer %s", producerName)
}
Expand All @@ -47,7 +46,6 @@ func New(params Params, cmdType string) (*App, error) {
Topic: params.Pulsar.JobsetEventsTopic,
StartMessageID: pulsar.EarliestMessageID(),
})

if err != nil {
return nil, errors.Wrapf(err, "error creating pulsar reader")
}
Expand Down
7 changes: 4 additions & 3 deletions internal/scheduler/gang_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func TestGangScheduler(t *testing.T) {
},
"resolution has no impact on jobs of size a multiple of the resolution": {
SchedulingConfig: testfixtures.WithIndexedResourcesConfig(
[]configuration.IndexedResource{
[]configuration.ResourceType{
{Name: "cpu", Resolution: resource.MustParse("16")},
{Name: "memory", Resolution: resource.MustParse("128Mi")},
},
Expand All @@ -233,7 +233,7 @@ func TestGangScheduler(t *testing.T) {
},
"jobs of size not a multiple of the resolution blocks scheduling new jobs": {
SchedulingConfig: testfixtures.WithIndexedResourcesConfig(
[]configuration.IndexedResource{
[]configuration.ResourceType{
{Name: "cpu", Resolution: resource.MustParse("17")},
{Name: "memory", Resolution: resource.MustParse("128Mi")},
},
Expand All @@ -252,7 +252,7 @@ func TestGangScheduler(t *testing.T) {
},
"consider all nodes in the bucket": {
SchedulingConfig: testfixtures.WithIndexedResourcesConfig(
[]configuration.IndexedResource{
[]configuration.ResourceType{
{Name: "cpu", Resolution: resource.MustParse("1")},
{Name: "memory", Resolution: resource.MustParse("1Mi")},
{Name: "gpu", Resolution: resource.MustParse("1")},
Expand Down Expand Up @@ -593,6 +593,7 @@ func TestGangScheduler(t *testing.T) {
tc.SchedulingConfig.IndexedNodeLabels,
tc.SchedulingConfig.WellKnownNodeTypes,
stringinterner.New(1024),
testfixtures.TestResourceListFactory,
)
require.NoError(t, err)
txn := nodeDb.Txn(true)
Expand Down
18 changes: 18 additions & 0 deletions internal/scheduler/internaltypes/quantity_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package internaltypes

import (
"k8s.io/apimachinery/pkg/api/resource"
)

func QuantityToInt64RoundUp(q resource.Quantity, scale resource.Scale) int64 {
return q.ScaledValue(scale)
}

func QuantityToInt64RoundDown(q resource.Quantity, scale resource.Scale) int64 {
result := q.ScaledValue(scale)
q2 := resource.NewScaledQuantity(result, scale)
if q2.Cmp(q) > 0 {
result--
}
return result
}
69 changes: 69 additions & 0 deletions internal/scheduler/internaltypes/quantity_util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package internaltypes

import (
"testing"

"gopkg.in/inf.v0"

"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/api/resource"
)

type quantityTest struct {
q resource.Quantity
expectedIntRoundDown int64
expectedIntRoundUp int64
}

func TestQuantityToInt64_WithScaleMillis(t *testing.T) {
tests := []quantityTest{
{resource.MustParse("0"), 0, 0},
{resource.MustParse("1"), 1000, 1000},
{resource.MustParse("1m"), 1, 1},
{resource.MustParse("50m"), 50, 50},
{resource.MustParse("1Mi"), 1024 * 1024 * 1000, 1024 * 1024 * 1000},
{resource.MustParse("1e3"), 1000 * 1000, 1000 * 1000},
{*resource.NewMilliQuantity(1, resource.DecimalExponent), 1, 1},
{*resource.NewDecimalQuantity(*inf.NewDec(1, inf.Scale(0)), resource.DecimalExponent), 1000, 1000},
{resource.MustParse("1n"), 0, 1},
{resource.MustParse("100n"), 0, 1},
{resource.MustParse("999999999n"), 999, 1000},
{resource.MustParse("1000000000n"), 1000, 1000},
{resource.MustParse("1000000001n"), 1000, 1001},
{resource.MustParse("12.3m"), 12, 13},
{resource.MustParse("0.99m"), 0, 1},
{resource.MustParse("1.001m"), 1, 2},
}

for _, test := range tests {
assert.Equal(t, test.expectedIntRoundDown, QuantityToInt64RoundDown(test.q, resource.Milli), test.q)
assert.Equal(t, test.expectedIntRoundUp, QuantityToInt64RoundUp(test.q, resource.Milli), test.q)
}
}

func TestQuantityToInt64_WithUnitScale(t *testing.T) {
const tebi = 1024 * 1024 * 1024 * 1024
tests := []quantityTest{
{resource.MustParse("0"), 0, 0},
{resource.MustParse("1"), 1, 1},
{resource.MustParse("1m"), 0, 1},
{resource.MustParse("50m"), 0, 1},
{resource.MustParse("1Mi"), 1024 * 1024, 1024 * 1024},
{resource.MustParse("1.5Mi"), 1536 * 1024, 1536 * 1024},
{resource.MustParse("4Ti"), 4 * tebi, 4 * tebi},
{resource.MustParse("100000Ti"), 100000 * tebi, 100000 * tebi},
{resource.MustParse("1e3"), 1000, 1000},
{*resource.NewMilliQuantity(1, resource.DecimalExponent), 0, 1},
{*resource.NewDecimalQuantity(*inf.NewDec(1, inf.Scale(0)), resource.DecimalExponent), 1, 1},
{resource.MustParse("1n"), 0, 1},
{resource.MustParse("100n"), 0, 1},
{resource.MustParse("999999999n"), 0, 1},
{resource.MustParse("1000000000n"), 1, 1},
{resource.MustParse("1000000001n"), 1, 2},
}

for _, test := range tests {
assert.Equal(t, test.expectedIntRoundDown, QuantityToInt64RoundDown(test.q, resource.Scale(0)), test.q)
assert.Equal(t, test.expectedIntRoundUp, QuantityToInt64RoundUp(test.q, resource.Scale(0)), test.q)
}
}
16 changes: 16 additions & 0 deletions internal/scheduler/internaltypes/resource_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package internaltypes

import "fmt"

type ResourceList struct {
resources []int64
factory *ResourceListFactory
}

func (rl *ResourceList) GetByName(name string) (int64, error) {
index, ok := rl.factory.nameToIndex[name]
if !ok {
return 0, fmt.Errorf("resource type %s not found", name)
}
return rl.resources[index], nil
}
93 changes: 93 additions & 0 deletions internal/scheduler/internaltypes/resource_list_factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package internaltypes

import (
"fmt"
"math"

"github.com/pkg/errors"

k8sResource "k8s.io/apimachinery/pkg/api/resource"

"github.com/armadaproject/armada/internal/armada/configuration"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

type ResourceListFactory struct {
nameToIndex map[string]int
indexToName []string
scales []k8sResource.Scale
}

func MakeResourceListFactory(supportedResourceTypes []configuration.ResourceType) (*ResourceListFactory, error) {
if len(supportedResourceTypes) == 0 {
return nil, errors.New("no resource types configured")
}
indexToName := make([]string, len(supportedResourceTypes))
nameToIndex := make(map[string]int, len(supportedResourceTypes))
scales := make([]k8sResource.Scale, len(supportedResourceTypes))
for i, t := range supportedResourceTypes {
if dup, exists := nameToIndex[t.Name]; exists {
return nil, fmt.Errorf("duplicate resource type name %q", dup)
}
nameToIndex[t.Name] = i
indexToName[i] = t.Name
scales[i] = resolutionToScale(t.Resolution)
}
return &ResourceListFactory{
indexToName: indexToName,
nameToIndex: nameToIndex,
scales: scales,
}, nil
}

// Convert resolution to a k8sResource.Scale
// e.g.
// 1 -> 0
// 0.001 -> -3
// 1000 -> 3
func resolutionToScale(resolution k8sResource.Quantity) k8sResource.Scale {
if resolution.Sign() < 1 {
return k8sResource.Milli
}
return k8sResource.Scale(math.Floor(math.Log10(resolution.AsApproximateFloat64())))
}

// Ignore unknown resources, round down.
func (factory *ResourceListFactory) FromNodeProto(resources schedulerobjects.ResourceList) ResourceList {
result := make([]int64, len(factory.indexToName))
for k, v := range resources.Resources {
index, ok := factory.nameToIndex[k]
if ok {
result[index] = QuantityToInt64RoundDown(v, factory.scales[index])
}
}
return ResourceList{resources: result, factory: factory}
}

// Fail on unknown resources, round up.
func (factory *ResourceListFactory) FromJobProto(resources schedulerobjects.ResourceList) (ResourceList, error) {
result := make([]int64, len(factory.indexToName))
for k, v := range resources.Resources {
index, ok := factory.nameToIndex[k]
if ok {
result[index] = QuantityToInt64RoundUp(v, factory.scales[index])
} else {
return ResourceList{}, fmt.Errorf("unknown resource type %q", k)
}
}
return ResourceList{resources: result, factory: factory}, nil
}

func (factory *ResourceListFactory) SummaryString() string {
result := ""
for i, name := range factory.indexToName {
if i > 0 {
result += " "
}
scale := factory.scales[i]
resolution := k8sResource.NewScaledQuantity(1, scale)
maxValue := k8sResource.NewScaledQuantity(math.MaxInt64, scale)
result += fmt.Sprintf("%s (scale %v, resolution %v, maxValue %f)", name, scale, resolution, maxValue.AsApproximateFloat64())
}
return result
}
Loading

0 comments on commit 7f2085d

Please sign in to comment.