Skip to content

Commit

Permalink
Workaround to avoid node iteration cycles (#3436)
Browse files Browse the repository at this point in the history
* Workaround to avoid node iteration loops

* Fix printout
  • Loading branch information
severinson committed Mar 1, 2024
1 parent 18c0b68 commit ed0fd46
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 7 deletions.
68 changes: 68 additions & 0 deletions internal/scheduler/nodedb/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,80 @@ func TestRoundQuantityToResolution(t *testing.T) {
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
qc := tc.q.DeepCopy()
actual := roundQuantityToResolution(tc.q, tc.resolutionMillis)
assert.True(t, qc.Equal(tc.q))
assert.Truef(t, actual.Equal(tc.expected), "expected %s, but got %s", tc.expected.String(), actual.String())

qDec := tc.q.DeepCopy()
qDec.ToDec()
qDecCopy := qDec.DeepCopy()
actualDec := roundQuantityToResolution(qDec, tc.resolutionMillis)
assert.True(t, qDecCopy.Equal(qDec))
assert.Truef(t, actualDec.Equal(tc.expected), "expected %s, but got %s", tc.expected.String(), actual.String())
})
}
}

func TestNodeIndexKeyComparison(t *testing.T) {
v1 := resource.MustParse("1")
actualRoundedKey := RoundedNodeIndexKeyFromResourceList(
nil,
0,
[]string{
"cpu",
"memory",
"nvidia.com/gpu",
"nvidia.com/mig-1g.10gb",
"nvidia.com/mig-1g.20gb",
"nvidia.com/mig-1g.40gb",
},
[]int64{
v1.MilliValue(),
v1.MilliValue(),
v1.MilliValue(),
v1.MilliValue(),
v1.MilliValue(),
v1.MilliValue(),
},
schedulerobjects.ResourceList{
Resources: map[string]resource.Quantity{
"cpu": *resource.NewScaledQuantity(999958006, -9),
"memory": *resource.NewScaledQuantity(11823681536, 0),
"nvidia.com/gpu": *resource.NewScaledQuantity(0, 0),
"nvidia.com/mig-1g.10gb": *resource.NewScaledQuantity(0, 0),
"nvidia.com/mig-1g.20gb": *resource.NewScaledQuantity(0, 0),
"nvidia.com/mig-1g.40gb": *resource.NewScaledQuantity(0, 0),
},
},
0,
)
actualKey := NodeIndexKey(
nil,
0,
[]resource.Quantity{
*resource.NewScaledQuantity(999958006, -9),
*resource.NewScaledQuantity(11823681536, 0),
*resource.NewScaledQuantity(0, 0),
*resource.NewScaledQuantity(0, 0),
*resource.NewScaledQuantity(0, 0),
*resource.NewScaledQuantity(0, 0),
},
)
expected := []byte{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // nodeTypeId
0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xe8, // cpu
0x80, 0x00, 0x0a, 0xc0, 0xea, 0x56, 0x80, 0x00, // memory
0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // nvidia.com.gpu
0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // nvidia.com/mig-1g.10gb
0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // nvidia.com/mig-1g.20gb
0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // nvidia.com/mig-1g.40gb
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // nodeIndex
}
assert.Equal(t, expected, actualRoundedKey)
assert.Equal(t, expected, actualKey)
}

func TestNodeIndexKey(t *testing.T) {
type nodeIndexKeyValues struct {
nodeTypeId uint64
Expand Down
5 changes: 2 additions & 3 deletions internal/scheduler/nodedb/nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (nodeDb *NodeDb) create(node *schedulerobjects.Node) (*Node, error) {
nodeDb.nodeTypes[nodeType.Id] = nodeType
nodeDb.mu.Unlock()

entry := &Node{
return &Node{
Id: node.Id,
Index: index,

Expand All @@ -174,8 +174,7 @@ func (nodeDb *NodeDb) create(node *schedulerobjects.Node) (*Node, error) {
AllocatedByQueue: allocatedByQueue,
AllocatedByJobId: allocatedByJobId,
EvictedJobRunIds: evictedJobRunIds,
}
return entry, nil
}, nil
}

func (nodeDb *NodeDb) CreateAndInsertWithApiJobsWithTxn(txn *memdb.Txn, jobs []*api.Job, node *schedulerobjects.Node) error {
Expand Down
73 changes: 73 additions & 0 deletions internal/scheduler/nodedb/nodedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,79 @@ func TestNodeDbSchema(t *testing.T) {
assert.NoError(t, schema.Validate())
}

func TestNodeUnsafeCopy(t *testing.T) {
node := &Node{
Id: "id",
Index: 1,
Executor: "executor",
Name: "name",
Taints: []v1.Taint{
{
Key: "foo",
Value: "bar",
},
},
Labels: map[string]string{
"key": "value",
},
TotalResources: schedulerobjects.ResourceList{
Resources: map[string]resource.Quantity{
"cpu": resource.MustParse("16"),
"memory": resource.MustParse("32Gi"),
},
},
Keys: [][]byte{
{
0, 1, 255,
},
},
NodeTypeId: 123,
AllocatableByPriority: schedulerobjects.AllocatableByPriorityAndResourceType{
1: {
Resources: map[string]resource.Quantity{
"cpu": resource.MustParse("0"),
"memory": resource.MustParse("0Gi"),
},
},
2: {
Resources: map[string]resource.Quantity{
"cpu": resource.MustParse("8"),
"memory": resource.MustParse("16Gi"),
},
},
3: {
Resources: map[string]resource.Quantity{
"cpu": resource.MustParse("16"),
"memory": resource.MustParse("32Gi"),
},
},
},
AllocatedByQueue: map[string]schedulerobjects.ResourceList{
"queue": {
Resources: map[string]resource.Quantity{
"cpu": resource.MustParse("8"),
"memory": resource.MustParse("16Gi"),
},
},
},
AllocatedByJobId: map[string]schedulerobjects.ResourceList{
"jobId": {
Resources: map[string]resource.Quantity{
"cpu": resource.MustParse("8"),
"memory": resource.MustParse("16Gi"),
},
},
},
EvictedJobRunIds: map[string]bool{
"jobId": false,
"evictedJobId": true,
},
}
nodeCopy := node.UnsafeCopy()
// TODO(albin): Add more tests here.
assert.Equal(t, node.Id, nodeCopy.Id)
}

// Test the accounting of total resources across all nodes.
func TestTotalResources(t *testing.T) {
nodeDb, err := newNodeDbWithNodes([]*schedulerobjects.Node{})
Expand Down
30 changes: 26 additions & 4 deletions internal/scheduler/nodedb/nodeiteration.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/hashicorp/go-memdb"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"golang.org/x/exp/slices"
"k8s.io/apimachinery/pkg/api/resource"
)
Expand Down Expand Up @@ -308,9 +309,13 @@ type NodeTypeIterator struct {
// Current lower bound on node allocatable resources looked for.
// Updated in-place as the iterator makes progress.
lowerBound []resource.Quantity
// Tentative lower-bound.
newLowerBound []resource.Quantity
// memdb key computed from nodeTypeId and lowerBound.
// Stored here to avoid dynamic allocs.
key []byte
// Key for newLowerBound.
newKey []byte
// Current iterator into the underlying memdb.
// Updated in-place whenever lowerBound changes.
memdbIterator memdb.ResultIterator
Expand Down Expand Up @@ -348,6 +353,7 @@ func NewNodeTypeIterator(
indexedResourceRequests: indexedResourceRequests,
indexedResourceResolutionMillis: indexedResourceResolutionMillis,
lowerBound: slices.Clone(indexedResourceRequests),
newLowerBound: slices.Clone(indexedResourceRequests),
}
memdbIt, err := it.newNodeTypeIterator()
if err != nil {
Expand All @@ -358,6 +364,7 @@ func NewNodeTypeIterator(
}

func (it *NodeTypeIterator) newNodeTypeIterator() (memdb.ResultIterator, error) {
// TODO(albin): We're re-computing the key unnecessarily here.
it.key = NodeIndexKey(it.key[0:0], it.nodeTypeId, it.lowerBound)
memdbIt, err := it.txn.LowerBound(
"nodes",
Expand Down Expand Up @@ -410,16 +417,31 @@ func (it *NodeTypeIterator) NextNode() (*Node, error) {
return nil, errors.Errorf("node %s has no resources registered at priority %d: %v", node.Id, it.priority, node.AllocatableByPriority)
}
for i, t := range it.indexedResources {
nodeQuantity := allocatableByPriority.Get(t)
requestQuantity := it.indexedResourceRequests[i]
it.lowerBound[i] = roundQuantityToResolution(nodeQuantity, it.indexedResourceResolutionMillis[i])
nodeQuantity := allocatableByPriority.Get(t).DeepCopy()
requestQuantity := it.indexedResourceRequests[i].DeepCopy()
it.newLowerBound[i] = roundQuantityToResolution(nodeQuantity, it.indexedResourceResolutionMillis[i])

// If nodeQuantity < requestQuantity, replace the iterator using the lowerBound.
// If nodeQuantity >= requestQuantity for all resources, return the node.
if nodeQuantity.Cmp(requestQuantity) == -1 {
for j := i; j < len(it.indexedResources); j++ {
it.lowerBound[j] = it.indexedResourceRequests[j]
it.newLowerBound[j] = it.indexedResourceRequests[j]
}

it.newKey = NodeIndexKey(it.newKey[0:0], it.nodeTypeId, it.newLowerBound)
if bytes.Compare(it.key, it.newKey) == -1 {
// TODO(albin): Temporary workaround. Shouldn't be necessary.
lowerBound := it.lowerBound
it.lowerBound = it.newLowerBound
it.newLowerBound = lowerBound
} else {
log.Warnf(
"new lower-bound %x is not greater than current bound %x",
it.newKey, it.key,
)
break
}

memdbIterator, err := it.newNodeTypeIterator()
if err != nil {
return nil, err
Expand Down

0 comments on commit ed0fd46

Please sign in to comment.