diff --git a/internal/scheduler/nodedb/encoding_test.go b/internal/scheduler/nodedb/encoding_test.go
index 3289a02d4bf..c6e3a53e7c9 100644
--- a/internal/scheduler/nodedb/encoding_test.go
+++ b/internal/scheduler/nodedb/encoding_test.go
@@ -138,12 +138,80 @@ func TestRoundQuantityToResolution(t *testing.T) {
 	}
 	for name, tc := range tests {
 		t.Run(name, func(t *testing.T) {
+			qc := tc.q.DeepCopy()
 			actual := roundQuantityToResolution(tc.q, tc.resolutionMillis)
+			assert.True(t, qc.Equal(tc.q))
 			assert.Truef(t, actual.Equal(tc.expected), "expected %s, but got %s", tc.expected.String(), actual.String())
+
+			qDec := tc.q.DeepCopy()
+			qDec.ToDec()
+			qDecCopy := qDec.DeepCopy()
+			actualDec := roundQuantityToResolution(qDec, tc.resolutionMillis)
+			assert.True(t, qDecCopy.Equal(qDec))
+			assert.Truef(t, actualDec.Equal(tc.expected), "expected %s, but got %s", tc.expected.String(), actualDec.String())
 		})
 	}
 }
 
+func TestNodeIndexKeyComparison(t *testing.T) {
+	v1 := resource.MustParse("1")
+	actualRoundedKey := RoundedNodeIndexKeyFromResourceList(
+		nil,
+		0,
+		[]string{
+			"cpu",
+			"memory",
+			"nvidia.com/gpu",
+			"nvidia.com/mig-1g.10gb",
+			"nvidia.com/mig-1g.20gb",
+			"nvidia.com/mig-1g.40gb",
+		},
+		[]int64{
+			v1.MilliValue(),
+			v1.MilliValue(),
+			v1.MilliValue(),
+			v1.MilliValue(),
+			v1.MilliValue(),
+			v1.MilliValue(),
+		},
+		schedulerobjects.ResourceList{
+			Resources: map[string]resource.Quantity{
+				"cpu":                    *resource.NewScaledQuantity(999958006, -9),
+				"memory":                 *resource.NewScaledQuantity(11823681536, 0),
+				"nvidia.com/gpu":         *resource.NewScaledQuantity(0, 0),
+				"nvidia.com/mig-1g.10gb": *resource.NewScaledQuantity(0, 0),
+				"nvidia.com/mig-1g.20gb": *resource.NewScaledQuantity(0, 0),
+				"nvidia.com/mig-1g.40gb": *resource.NewScaledQuantity(0, 0),
+			},
+		},
+		0,
+	)
+	actualKey := NodeIndexKey(
+		nil,
+		0,
+		[]resource.Quantity{
+			*resource.NewScaledQuantity(999958006, -9),
+			*resource.NewScaledQuantity(11823681536, 0),
+			*resource.NewScaledQuantity(0, 0),
+			*resource.NewScaledQuantity(0, 0),
+			*resource.NewScaledQuantity(0, 0),
+			*resource.NewScaledQuantity(0, 0),
+		},
+	)
+	expected := []byte{
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // nodeTypeId
+		0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xe8, // cpu
+		0x80, 0x00, 0x0a, 0xc0, 0xea, 0x56, 0x80, 0x00, // memory
+		0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // nvidia.com/gpu
+		0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // nvidia.com/mig-1g.10gb
+		0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // nvidia.com/mig-1g.20gb
+		0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // nvidia.com/mig-1g.40gb
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // nodeIndex
+	}
+	assert.Equal(t, expected, actualRoundedKey)
+	assert.Equal(t, expected, actualKey)
+}
+
 func TestNodeIndexKey(t *testing.T) {
 	type nodeIndexKeyValues struct {
 		nodeTypeId uint64
diff --git a/internal/scheduler/nodedb/nodedb.go b/internal/scheduler/nodedb/nodedb.go
index 3c30bf21e53..4d737331909 100644
--- a/internal/scheduler/nodedb/nodedb.go
+++ b/internal/scheduler/nodedb/nodedb.go
@@ -154,7 +154,7 @@ func (nodeDb *NodeDb) create(node *schedulerobjects.Node) (*Node, error) {
 	nodeDb.nodeTypes[nodeType.Id] = nodeType
 	nodeDb.mu.Unlock()
 
-	entry := &Node{
+	return &Node{
 		Id:    node.Id,
 		Index: index,
 
@@ -174,8 +174,7 @@ func (nodeDb *NodeDb) create(node *schedulerobjects.Node) (*Node, error) {
 		AllocatedByQueue:      allocatedByQueue,
 		AllocatedByJobId:      allocatedByJobId,
 		EvictedJobRunIds:      evictedJobRunIds,
-	}
-	return entry, nil
+	}, nil
 }
 
 func (nodeDb *NodeDb) CreateAndInsertWithApiJobsWithTxn(txn *memdb.Txn, jobs []*api.Job, node *schedulerobjects.Node) error {
diff --git a/internal/scheduler/nodedb/nodedb_test.go b/internal/scheduler/nodedb/nodedb_test.go
index 02cf2b87f1f..4fda0e6814e 100644
--- a/internal/scheduler/nodedb/nodedb_test.go
+++ b/internal/scheduler/nodedb/nodedb_test.go
@@ -27,6 +27,79 @@ func TestNodeDbSchema(t *testing.T) {
 	assert.NoError(t, schema.Validate())
 }
 
+func TestNodeUnsafeCopy(t *testing.T) {
+	node := &Node{
+		Id:       "id",
+		Index:    1,
+		Executor: "executor",
+		Name:     "name",
+		Taints: []v1.Taint{
+			{
+				Key:   "foo",
+				Value: "bar",
+			},
+		},
+		Labels: map[string]string{
+			"key": "value",
+		},
+		TotalResources: schedulerobjects.ResourceList{
+			Resources: map[string]resource.Quantity{
+				"cpu":    resource.MustParse("16"),
+				"memory": resource.MustParse("32Gi"),
+			},
+		},
+		Keys: [][]byte{
+			{
+				0, 1, 255,
+			},
+		},
+		NodeTypeId: 123,
+		AllocatableByPriority: schedulerobjects.AllocatableByPriorityAndResourceType{
+			1: {
+				Resources: map[string]resource.Quantity{
+					"cpu":    resource.MustParse("0"),
+					"memory": resource.MustParse("0Gi"),
+				},
+			},
+			2: {
+				Resources: map[string]resource.Quantity{
+					"cpu":    resource.MustParse("8"),
+					"memory": resource.MustParse("16Gi"),
+				},
+			},
+			3: {
+				Resources: map[string]resource.Quantity{
+					"cpu":    resource.MustParse("16"),
+					"memory": resource.MustParse("32Gi"),
+				},
+			},
+		},
+		AllocatedByQueue: map[string]schedulerobjects.ResourceList{
+			"queue": {
+				Resources: map[string]resource.Quantity{
+					"cpu":    resource.MustParse("8"),
+					"memory": resource.MustParse("16Gi"),
+				},
+			},
+		},
+		AllocatedByJobId: map[string]schedulerobjects.ResourceList{
+			"jobId": {
+				Resources: map[string]resource.Quantity{
+					"cpu":    resource.MustParse("8"),
+					"memory": resource.MustParse("16Gi"),
+				},
+			},
+		},
+		EvictedJobRunIds: map[string]bool{
+			"jobId":        false,
+			"evictedJobId": true,
+		},
+	}
+	nodeCopy := node.UnsafeCopy()
+	// TODO(albin): Add more tests here.
+	assert.Equal(t, node.Id, nodeCopy.Id)
+}
+
 // Test the accounting of total resources across all nodes.
 func TestTotalResources(t *testing.T) {
 	nodeDb, err := newNodeDbWithNodes([]*schedulerobjects.Node{})
diff --git a/internal/scheduler/nodedb/nodeiteration.go b/internal/scheduler/nodedb/nodeiteration.go
index 4a5f9d2b8d5..304dc27a28e 100644
--- a/internal/scheduler/nodedb/nodeiteration.go
+++ b/internal/scheduler/nodedb/nodeiteration.go
@@ -6,6 +6,7 @@ import (
 
 	"github.com/hashicorp/go-memdb"
 	"github.com/pkg/errors"
+	log "github.com/sirupsen/logrus"
 	"golang.org/x/exp/slices"
 	"k8s.io/apimachinery/pkg/api/resource"
 )
@@ -308,9 +309,13 @@ type NodeTypeIterator struct {
 	// Current lower bound on node allocatable resources looked for.
 	// Updated in-place as the iterator makes progress.
 	lowerBound []resource.Quantity
+	// Tentative lower-bound.
+	newLowerBound []resource.Quantity
 	// memdb key computed from nodeTypeId and lowerBound.
 	// Stored here to avoid dynamic allocs.
 	key []byte
+	// Key for newLowerBound.
+	newKey []byte
 	// Current iterator into the underlying memdb.
 	// Updated in-place whenever lowerBound changes.
 	memdbIterator memdb.ResultIterator
@@ -348,6 +353,7 @@ func NewNodeTypeIterator(
 		indexedResourceRequests:         indexedResourceRequests,
 		indexedResourceResolutionMillis: indexedResourceResolutionMillis,
 		lowerBound:                      slices.Clone(indexedResourceRequests),
+		newLowerBound:                   slices.Clone(indexedResourceRequests),
 	}
 	memdbIt, err := it.newNodeTypeIterator()
 	if err != nil {
@@ -358,6 +364,7 @@
 }
 
 func (it *NodeTypeIterator) newNodeTypeIterator() (memdb.ResultIterator, error) {
+	// TODO(albin): We're re-computing the key unnecessarily here.
 	it.key = NodeIndexKey(it.key[0:0], it.nodeTypeId, it.lowerBound)
 	memdbIt, err := it.txn.LowerBound(
 		"nodes",
@@ -410,16 +417,31 @@ func (it *NodeTypeIterator) NextNode() (*Node, error) {
 			return nil, errors.Errorf("node %s has no resources registered at priority %d: %v", node.Id, it.priority, node.AllocatableByPriority)
 		}
 		for i, t := range it.indexedResources {
-			nodeQuantity := allocatableByPriority.Get(t)
-			requestQuantity := it.indexedResourceRequests[i]
-			it.lowerBound[i] = roundQuantityToResolution(nodeQuantity, it.indexedResourceResolutionMillis[i])
+			nodeQuantity := allocatableByPriority.Get(t).DeepCopy()
+			requestQuantity := it.indexedResourceRequests[i].DeepCopy()
+			it.newLowerBound[i] = roundQuantityToResolution(nodeQuantity, it.indexedResourceResolutionMillis[i])
 
 			// If nodeQuantity < requestQuantity, replace the iterator using the lowerBound.
 			// If nodeQuantity >= requestQuantity for all resources, return the node.
 			if nodeQuantity.Cmp(requestQuantity) == -1 {
 				for j := i; j < len(it.indexedResources); j++ {
-					it.lowerBound[j] = it.indexedResourceRequests[j]
+					it.newLowerBound[j] = it.indexedResourceRequests[j]
 				}
+
+				it.newKey = NodeIndexKey(it.newKey[0:0], it.nodeTypeId, it.newLowerBound)
+				if bytes.Compare(it.key, it.newKey) == -1 {
+					// TODO(albin): Temporary workaround. Shouldn't be necessary.
+					lowerBound := it.lowerBound
+					it.lowerBound = it.newLowerBound
+					it.newLowerBound = lowerBound
+				} else {
+					log.Warnf(
+						"new lower-bound %x is not greater than current bound %x",
+						it.newKey, it.key,
+					)
+					break
+				}
+
 				memdbIterator, err := it.newNodeTypeIterator()
 				if err != nil {
 					return nil, err