Skip to content

Commit

Permalink
Scheduler: move more to internaltypes.ResourceList (#4036)
Browse files Browse the repository at this point in the history
  • Loading branch information
robertdavidsmith authored Nov 8, 2024
1 parent 4416515 commit 43f8573
Show file tree
Hide file tree
Showing 32 changed files with 865 additions and 683 deletions.
36 changes: 14 additions & 22 deletions internal/scheduler/floatingresources/floating_resource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,21 @@ import (

"github.com/armadaproject/armada/internal/common/maps"
"github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

type FloatingResourceTypes struct {
zeroFloatingResources schedulerobjects.ResourceList
pools map[string]*floatingResourcePool
rlFactory *internaltypes.ResourceListFactory
}

type floatingResourcePool struct {
totalResources schedulerobjects.ResourceList
}

func NewFloatingResourceTypes(config []configuration.FloatingResourceConfig) (*FloatingResourceTypes, error) {
func NewFloatingResourceTypes(config []configuration.FloatingResourceConfig, rlFactory *internaltypes.ResourceListFactory) (*FloatingResourceTypes, error) {
zeroFloatingResources := schedulerobjects.ResourceList{Resources: make(map[string]resource.Quantity, len(config))}
for _, c := range config {
if _, exists := zeroFloatingResources.Resources[c.Name]; exists {
Expand Down Expand Up @@ -51,24 +53,21 @@ func NewFloatingResourceTypes(config []configuration.FloatingResourceConfig) (*F
return &FloatingResourceTypes{
zeroFloatingResources: zeroFloatingResources,
pools: pools,
rlFactory: rlFactory,
}, nil
}

func (frt *FloatingResourceTypes) WithinLimits(poolName string, allocated schedulerobjects.ResourceList) (bool, string) {
pool, exists := frt.pools[poolName]
if !exists {
func (frt *FloatingResourceTypes) WithinLimits(poolName string, allocated internaltypes.ResourceList) (bool, string) {
available := frt.GetTotalAvailableForPoolInternalTypes(poolName)
if available.AllZero() {
return false, fmt.Sprintf("floating resources not connfigured for pool %s", poolName)
}
rl := pool.totalResources.DeepCopy()
rl.Sub(allocated)
for resourceName, quantity := range rl.Resources {
if !frt.isFloatingResource(resourceName) {
continue
}
if quantity.Cmp(resource.Quantity{}) == -1 {
return false, fmt.Sprintf("not enough floating resource %s in pool %s", resourceName, poolName)
}

resourceName, _, _, exceeds := allocated.OfType(internaltypes.Floating).ExceedsAvailable(available)
if exceeds {
return false, fmt.Sprintf("not enough floating resource %s in pool %s", resourceName, poolName)
}

return true, ""
}

Expand All @@ -86,10 +85,8 @@ func (frt *FloatingResourceTypes) GetTotalAvailableForPool(poolName string) sche
return pool.totalResources.DeepCopy()
}

func (frt *FloatingResourceTypes) AddTotalAvailableForPool(poolName string, kubernetesResources schedulerobjects.ResourceList) schedulerobjects.ResourceList {
floatingResources := frt.GetTotalAvailableForPool(poolName) // Note GetTotalAvailableForPool returns a deep copy
floatingResources.Add(kubernetesResources)
return floatingResources
func (frt *FloatingResourceTypes) GetTotalAvailableForPoolInternalTypes(poolName string) internaltypes.ResourceList {
return frt.rlFactory.FromNodeProto(frt.GetTotalAvailableForPool(poolName).Resources)
}

func (frt *FloatingResourceTypes) SummaryString() string {
Expand All @@ -98,8 +95,3 @@ func (frt *FloatingResourceTypes) SummaryString() string {
}
return strings.Join(maps.Keys(frt.zeroFloatingResources.Resources), " ")
}

func (frt *FloatingResourceTypes) isFloatingResource(resourceName string) bool {
_, exists := frt.zeroFloatingResources.Resources[resourceName]
return exists
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,82 +7,93 @@ import (
"k8s.io/apimachinery/pkg/api/resource"

"github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
)

func TestAllPools(t *testing.T) {
sut := makeSut(t)
sut := makeSut(t, makeRlFactory())
assert.Equal(t, []string{"cpu", "gpu"}, sut.AllPools())
}

func TestGetTotalAvailableForPool(t *testing.T) {
sut := makeSut(t)
sut := makeSut(t, makeRlFactory())
zero := resource.Quantity{}
assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": resource.MustParse("200"), "floating-resource-2": resource.MustParse("300")}, sut.GetTotalAvailableForPool("cpu").Resources)
assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": resource.MustParse("100"), "floating-resource-2": zero}, sut.GetTotalAvailableForPool("gpu").Resources)
assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": zero, "floating-resource-2": zero}, sut.GetTotalAvailableForPool("some-other-pool").Resources)
}

func TestAddTotalAvailableForPool(t *testing.T) {
sut := makeSut(t)
zero := resource.Quantity{}
ten := *resource.NewQuantity(10, resource.DecimalSI)
kubernetesResources := schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": ten}}
assert.Equal(t, map[string]resource.Quantity{"cpu": ten, "floating-resource-1": resource.MustParse("200"), "floating-resource-2": resource.MustParse("300")}, sut.AddTotalAvailableForPool("cpu", kubernetesResources).Resources)
assert.Equal(t, map[string]resource.Quantity{"cpu": ten, "floating-resource-1": resource.MustParse("100"), "floating-resource-2": zero}, sut.AddTotalAvailableForPool("gpu", kubernetesResources).Resources)
assert.Equal(t, map[string]resource.Quantity{"cpu": ten, "floating-resource-1": zero, "floating-resource-2": zero}, sut.AddTotalAvailableForPool("some-other-pool", kubernetesResources).Resources)
assert.Equal(t, map[string]resource.Quantity{"cpu": ten}, kubernetesResources.Resources) // check hasn't mutated arg
func TestGetTotalAvailableForPoolInternalTypes(t *testing.T) {
sut := makeSut(t, makeRlFactory())

cpuPool := sut.GetTotalAvailableForPoolInternalTypes("cpu")
assert.Equal(t, int64(200000), cpuPool.GetByNameZeroIfMissing("floating-resource-1"))
assert.Equal(t, int64(300000), cpuPool.GetByNameZeroIfMissing("floating-resource-2"))

gpuPool := sut.GetTotalAvailableForPoolInternalTypes("gpu")
assert.Equal(t, int64(100000), gpuPool.GetByNameZeroIfMissing("floating-resource-1"))
assert.Equal(t, int64(0), gpuPool.GetByNameZeroIfMissing("floating-resource-2"))

notFound := sut.GetTotalAvailableForPoolInternalTypes("some-invalid-value")
assert.Equal(t, int64(0), notFound.GetByNameZeroIfMissing("floating-resource-1"))
assert.Equal(t, int64(0), notFound.GetByNameZeroIfMissing("floating-resource-2"))
}

func TestWithinLimits_WhenWithinLimits_ReturnsTrue(t *testing.T) {
sut := makeSut(t)
rlFactory := makeRlFactory()
sut := makeSut(t, rlFactory)
withinLimits, errorMessage := sut.WithinLimits("cpu",
schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"floating-resource-1": resource.MustParse("199")}},
rlFactory.FromJobResourceListIgnoreUnknown(map[string]resource.Quantity{"floating-resource-1": resource.MustParse("199")}),
)
assert.True(t, withinLimits)
assert.Empty(t, errorMessage)
}

func TestWithinLimits_WhenAtLimit_ReturnsTrue(t *testing.T) {
sut := makeSut(t)
rlFactory := makeRlFactory()
sut := makeSut(t, rlFactory)
withinLimits, errorMessage := sut.WithinLimits("cpu",
schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"floating-resource-1": resource.MustParse("200")}},
rlFactory.FromJobResourceListIgnoreUnknown(map[string]resource.Quantity{"floating-resource-1": resource.MustParse("200")}),
)
assert.True(t, withinLimits)
assert.Empty(t, errorMessage)
}

func TestWithinLimits_WhenExceedsLimit_ReturnsFalse(t *testing.T) {
sut := makeSut(t)
rlFactory := makeRlFactory()
sut := makeSut(t, rlFactory)
withinLimits, errorMessage := sut.WithinLimits("cpu",
schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"floating-resource-1": resource.MustParse("201")}},
rlFactory.FromJobResourceListIgnoreUnknown(map[string]resource.Quantity{"floating-resource-1": resource.MustParse("201")}),
)
assert.False(t, withinLimits)
assert.NotEmpty(t, errorMessage)
}

func TestWithinLimits_IgnoresNonFloatingResources(t *testing.T) {
sut := makeSut(t)
rlFactory := makeRlFactory()
sut := makeSut(t, rlFactory)
withinLimits, errorMessage := sut.WithinLimits("cpu",
schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"some-other-resource": resource.MustParse("1000")}},
rlFactory.FromJobResourceListIgnoreUnknown(map[string]resource.Quantity{"cpu": resource.MustParse("1000")}),
)
assert.True(t, withinLimits)
assert.Empty(t, errorMessage)
}

func TestWithinLimits_WhenResourceNotSpecifiedForAPool_ReturnsFalse(t *testing.T) {
sut := makeSut(t)
rlFactory := makeRlFactory()
sut := makeSut(t, rlFactory)
withinLimits, errorMessage := sut.WithinLimits("gpu",
schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"floating-resource-2": resource.MustParse("1")}},
rlFactory.FromJobResourceListIgnoreUnknown(map[string]resource.Quantity{"floating-resource-2": resource.MustParse("1")}),
)
assert.False(t, withinLimits)
assert.NotEmpty(t, errorMessage)
}

func TestWithinLimits_WhenPoolDoesNotExist_ReturnsFalse(t *testing.T) {
sut := makeSut(t)
rlFactory := makeRlFactory()
sut := makeSut(t, rlFactory)
withinLimits, errorMessage := sut.WithinLimits("some-other-pool",
schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"floating-resource-1": resource.MustParse("1")}},
rlFactory.FromJobResourceListIgnoreUnknown(map[string]resource.Quantity{"floating-resource-1": resource.MustParse("1")}),
)
assert.False(t, withinLimits)
assert.NotEmpty(t, errorMessage)
Expand Down Expand Up @@ -115,8 +126,18 @@ func testConfig() []configuration.FloatingResourceConfig {
}
}

func makeSut(t *testing.T) *FloatingResourceTypes {
sut, err := NewFloatingResourceTypes(testConfig())
func makeRlFactory() *internaltypes.ResourceListFactory {
rlFactory, err := internaltypes.NewResourceListFactory([]configuration.ResourceType{
{Name: "cpu"},
}, testConfig())
if err != nil {
panic(err)
}
return rlFactory
}

func makeSut(t *testing.T, rlFactory *internaltypes.ResourceListFactory) *FloatingResourceTypes {
sut, err := NewFloatingResourceTypes(testConfig(), rlFactory)
assert.Nil(t, err)
return sut
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ func TestMax(t *testing.T) {
factory := testFactory()
assert.Equal(t, 0.0, testResourceFractionList(factory, -0.1, 0.0, 0.0).Max())
assert.Equal(t, 0.0, testResourceFractionList(factory, 0.0, 0.0, 0.0).Max())
assert.Equal(t, 0.9, testResourceFractionList(factory, 0.1, 0.9, 0.7).Max())
assert.Equal(t, 0.9, testResourceFractionList(factory, 0.2, 0.9, 0.1).Max())
assert.Equal(t, 0.9, testResourceFractionList(factory, 0.9, 0.2, 0.1).Max())
}

func TestMax_HandlesEmptyCorrectly(t *testing.T) {
Expand Down
64 changes: 52 additions & 12 deletions internal/scheduler/internaltypes/resource_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internaltypes

import (
"fmt"
"math"

"golang.org/x/exp/slices"
k8sResource "k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -76,6 +77,19 @@ func (rl ResourceList) GetByNameZeroIfMissing(name string) int64 {
return rl.resources[index]
}

func (rl ResourceList) GetResourceByNameZeroIfMissing(name string) k8sResource.Quantity {
if rl.IsEmpty() {
return k8sResource.Quantity{}
}

index, ok := rl.factory.nameToIndex[name]
if !ok {
return k8sResource.Quantity{}
}

return *k8sResource.NewScaledQuantity(rl.resources[index], rl.factory.scales[index])
}

func (rl ResourceList) GetResources() []Resource {
if rl.IsEmpty() {
return []Resource{}
Expand Down Expand Up @@ -147,6 +161,15 @@ func (rl ResourceList) IsEmpty() bool {
return rl.factory == nil
}

func (rl ResourceList) Factory() *ResourceListFactory {
return rl.factory
}

func (rl ResourceList) Exceeds(other ResourceList) bool {
_, _, _, exceeds := rl.ExceedsAvailable(other)
return exceeds
}

// ExceedsAvailable
// - if any resource in this ResourceList is greater than the equivalent resource in param available, this function returns
// - the name of the relevant resource
Expand Down Expand Up @@ -195,6 +218,21 @@ func (rl ResourceList) OfType(t ResourceType) ResourceList {
return ResourceList{factory: rl.factory, resources: result}
}

func (rl ResourceList) Cap(cap ResourceList) ResourceList {
assertSameResourceListFactory(rl.factory, cap.factory)
if rl.IsEmpty() {
return ResourceList{}
}
if cap.IsEmpty() {
return rl
}
result := make([]int64, len(rl.resources))
for i, r := range rl.resources {
result[i] = min(r, cap.resources[i])
}
return ResourceList{factory: rl.factory, resources: result}
}

func (rl ResourceList) Add(other ResourceList) ResourceList {
assertSameResourceListFactory(rl.factory, other.factory)
if rl.IsEmpty() {
Expand Down Expand Up @@ -266,17 +304,6 @@ func (rl ResourceList) Negate() ResourceList {
return ResourceList{factory: rl.factory, resources: result}
}

func (rl ResourceList) Scale(factor float64) ResourceList {
if rl.IsEmpty() {
return rl
}
result := make([]int64, len(rl.resources))
for i, r := range rl.resources {
result[i] = multiplyResource(r, factor)
}
return ResourceList{resources: result, factory: rl.factory}
}

func (rl ResourceList) asQuantity(index int) *k8sResource.Quantity {
if rl.factory == nil {
return &k8sResource.Quantity{}
Expand All @@ -299,8 +326,21 @@ func assertSameResourceListFactory(a, b *ResourceListFactory) {

func multiplyResource(res int64, multiplier float64) int64 {
if multiplier == 1.0 {
// avoid rounding error in the simple case
// Avoid rounding error in the simple case.
return res
}

// Return max int64 if multiplier is infinity.
// If res is zero, we assume infinity trumps zero, and return int64 maxValue.
// This gives the right behavior when the result is used as a cap,
// as an infinity multiplier means "never apply cap".
if math.IsInf(multiplier, 0) {
if (multiplier < 0) == (res < 0) {
return math.MaxInt64
} else {
return math.MinInt64
}
}

return int64(float64(res) * multiplier)
}
59 changes: 59 additions & 0 deletions internal/scheduler/internaltypes/resource_list_map_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package internaltypes

import (
"strings"

"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

func RlMapToString(m map[string]ResourceList) string {
results := []string{}
for k, v := range m {
results = append(results, k+"="+v.String())
}
return strings.Join(results, " ")
}

func RlMapSumValues(m map[string]ResourceList) ResourceList {
result := ResourceList{}
for _, v := range m {
result = result.Add(v)
}
return result
}

func RlMapAllZero(m map[string]ResourceList) bool {
for _, v := range m {
if !v.AllZero() {
return false
}
}
return true
}

func RlMapHasNegativeValues(m map[string]ResourceList) bool {
for _, v := range m {
if v.HasNegativeValues() {
return true
}
}
return false
}

func RlMapFromJobSchedulerObjects(m schedulerobjects.QuantityByTAndResourceType[string], rlFactory *ResourceListFactory) map[string]ResourceList {
result := map[string]ResourceList{}
for k, v := range m {
result[k] = rlFactory.FromJobResourceListIgnoreUnknown(v.Resources)
}
return result
}

func RlMapRemoveZeros(m map[string]ResourceList) map[string]ResourceList {
result := map[string]ResourceList{}
for k, v := range m {
if !v.AllZero() {
result[k] = v
}
}
return result
}
Loading

0 comments on commit 43f8573

Please sign in to comment.