[clusteragent/autoscaling] Impose 100 DatadogPodAutoscaler limit in cluster agent #28684

Merged · 33 commits · Oct 4, 2024

Changes from 6 commits

Commits (33)
8ee3911
Add creation timestamp to DatadogPodAutoscalerInternal
jennchenn Aug 20, 2024
6cd039c
Fix minor typos
jennchenn Aug 21, 2024
46ece6f
Implement heap to track number of DPA objects
jennchenn Aug 22, 2024
cabcad1
Update heap names to reduce redundancy in calls
jennchenn Aug 23, 2024
e70833e
Add basic unit test for creation of heap
jennchenn Aug 23, 2024
843d36c
fixup! Update heap names to reduce redundancy in calls
jennchenn Aug 23, 2024
b200155
Make error message shown in status more explicit
jennchenn Aug 30, 2024
c6d6431
Reorder PAI params to have metadata fields first
jennchenn Aug 30, 2024
1863ce1
Prevent adding objects with 0 timestamp to heap
jennchenn Aug 30, 2024
9ecf501
Remove hashheap from generic controller definition and add delete obs…
jennchenn Aug 30, 2024
156ecff
Register observer on store to update heap
jennchenn Sep 6, 2024
90799f2
Merge remote-tracking branch 'origin/main' into jenn/CASCL-1_autoscal…
jennchenn Sep 6, 2024
b680558
Add check for 100 DPAs to validation logic
jennchenn Sep 6, 2024
ea7979d
Refactor max heap functions to use Exists helper
jennchenn Sep 6, 2024
91e6b9f
Update DCA target test with autoscaler creation timestamp
jennchenn Sep 6, 2024
0622fb4
Remove period from error message; remove redundant return
jennchenn Sep 6, 2024
7254318
Update local test to check behavior on delete
jennchenn Sep 25, 2024
03fd859
Add RWMutex to hash heap
jennchenn Sep 25, 2024
b29cfd0
Remove test namespace hardcoded value
jennchenn Sep 26, 2024
565a3ef
Update creation timestamp for remote owner
jennchenn Sep 26, 2024
b92ac07
Rename controller hashHeap to limitHeap for better description of usage
jennchenn Sep 27, 2024
3913a97
Remove redundant variable initialization
jennchenn Sep 27, 2024
6669594
Rename test to be more descriptive
jennchenn Sep 27, 2024
44589e5
Update creation timestamp only when spec changes
jennchenn Sep 27, 2024
4ae0ab3
Simplify validation logic and pass only one param
jennchenn Sep 27, 2024
3285d31
Skip rest of DCA validation logic if self pod name cannot be found
jennchenn Sep 27, 2024
aedc9f8
fixup! Rename controller hashHeap to limitHeap for better description…
jennchenn Sep 27, 2024
0429380
Use store directly in hash heap to avoid passing obj to observer
jennchenn Sep 27, 2024
653ab74
Test hash heap on remote owner DPA objects
jennchenn Oct 1, 2024
6cdaf91
Merge remote-tracking branch 'origin/main' into jenn/CASCL-1_autoscal…
jennchenn Oct 1, 2024
b9a47f3
Pass in store to hash heap
jennchenn Oct 2, 2024
96ef6dd
Remove duplication and clean up fake pod autoscaler helper function
jennchenn Oct 3, 2024
4a317d5
Remove requeue after creation timestamp update
jennchenn Oct 3, 2024
27 changes: 15 additions & 12 deletions pkg/clusteragent/autoscaling/controller.go
@@ -30,11 +30,12 @@ type Controller struct {
context context.Context

// Fields available to child controllers
ID string
Client dynamic.Interface
Lister cache.GenericLister
Workqueue workqueue.RateLimitingInterface
IsLeader func() bool
ID string
Client dynamic.Interface
Lister cache.GenericLister
Workqueue workqueue.RateLimitingInterface
AutoscalerHeap *HashHeap
Contributor: That's not something generic to any controller; it should not be part of the generic Controller. Generally speaking, it's an internal part of the controller and there's probably no reason to take it as a creation parameter.

IsLeader func() bool
}

// NewController returns a new workload autoscaling controller
@@ -47,16 +48,18 @@ func NewController(
isLeader func() bool,
observable Observable,
workqueue workqueue.RateLimitingInterface,
autoscalerHeap *HashHeap,
) (*Controller, error) {
mainInformer := informer.ForResource(gvr)
c := &Controller{
processor: processor,
ID: controllerID,
Client: client,
Lister: mainInformer.Lister(),
synced: mainInformer.Informer().HasSynced,
Workqueue: workqueue,
IsLeader: isLeader,
processor: processor,
ID: controllerID,
Client: client,
Lister: mainInformer.Lister(),
synced: mainInformer.Informer().HasSynced,
Workqueue: workqueue,
IsLeader: isLeader,
AutoscalerHeap: autoscalerHeap,
}

if _, err := mainInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
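The decoupling the reviewer asks for above was taken up in later commits of this PR ("Remove hashheap from generic controller definition and add delete obs…", "Register observer on store to update heap"). A minimal sketch of what observer-based wiring could look like — the store's real observer API is not shown in this diff, so the interface and the timestamp lookup below are assumptions:

```go
//go:build kubeapiserver

package autoscaling

import "time"

// StoreObserver is an assumed shape for store notifications; the actual
// interface used by the store is not part of this diff.
type StoreObserver interface {
	Add(key, sender string)    // invoked after a key is set in the store
	Delete(key, sender string) // invoked after a key is deleted from the store
}

// heapObserver keeps a HashHeap (defined in max_heap.go below) in sync with
// store events instead of threading the heap through NewController.
type heapObserver struct {
	heap      *HashHeap
	timestamp func(key string) (time.Time, bool) // assumed lookup, e.g. backed by the store
}

func (o *heapObserver) Add(key, _ string) {
	if ts, ok := o.timestamp(key); ok {
		o.heap.InsertIntoHeap(TimestampKey{Timestamp: ts, Key: key})
	}
}

func (o *heapObserver) Delete(key, _ string) {
	o.heap.DeleteFromHeap(key)
}
```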
119 changes: 119 additions & 0 deletions pkg/clusteragent/autoscaling/max_heap.go
@@ -0,0 +1,119 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build kubeapiserver

package autoscaling

import (
"container/heap"
"time"
)

// TimestampKey is a struct that holds a timestamp and key for a `DatadogPodAutoscaler` object
type TimestampKey struct {
Timestamp time.Time
Key string
}

// MaxTimestampKeyHeap is a heap that sorts TimestampKey objects by timestamp in descending order
type MaxTimestampKeyHeap []TimestampKey

// Len returns the length of the heap
func (h MaxTimestampKeyHeap) Len() int { return len(h) }

// Less returns true if the timestamp at index i is after the timestamp at index j
func (h MaxTimestampKeyHeap) Less(i, j int) bool { return h[i].Timestamp.After(h[j].Timestamp) }

// Swap swaps the elements at indices i and j
func (h MaxTimestampKeyHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

// Push adds an element to the heap while preserving max heap ordering
func (h *MaxTimestampKeyHeap) Push(x interface{}) {
*h = append(*h, x.(TimestampKey))
}

// Pop removes the top element from the heap while preserving max heap ordering
func (h *MaxTimestampKeyHeap) Pop() any {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

// Peek returns the top element of the heap without removing it
func (h *MaxTimestampKeyHeap) Peek() TimestampKey {
return (*h)[0]
}

// NewMaxHeap returns a new MaxTimestampKeyHeap
func NewMaxHeap() *MaxTimestampKeyHeap {
h := &MaxTimestampKeyHeap{}
heap.Init(h)
return h
}

// FindIdx returns the index of the given key in the heap and a boolean indicating if the key was found
func (h *MaxTimestampKeyHeap) FindIdx(key string) (int, bool) {
for idx, k := range *h {
if k.Key == key {
return idx, true
}
}
return 0, false
}

// HashHeap is a struct that holds a MaxHeap and a set of keys that exist in the heap
type HashHeap struct {
Contributor: HashHeap is currently not thread-safe, which could become necessary if we have multiple workers or if we move the store to object locking.

MaxHeap MaxTimestampKeyHeap
Keys map[string]bool
maxSize int
}

// NewHashHeap returns a new MaxHeap with the given max size
func NewHashHeap(maxSize int) *HashHeap {
return &HashHeap{
MaxHeap: *NewMaxHeap(),
Keys: make(map[string]bool),
maxSize: maxSize,
}
}

// InsertIntoHeap returns true if the key already exists in the max heap or was inserted correctly
func (h *HashHeap) InsertIntoHeap(k TimestampKey) bool {
Contributor: What happens when you pass a zero time value? I believe it should not be inserted.

// Already in heap, do not try to insert
if _, ok := h.Keys[k.Key]; ok {
return true
}

if h.MaxHeap.Len() >= h.maxSize {
top := h.MaxHeap.Peek()
// If the new key is newer than or equal to the top key, do not insert
if top.Timestamp.Before(k.Timestamp) || top.Timestamp.Equal(k.Timestamp) {
return false
}
delete(h.Keys, top.Key)
heap.Pop(&h.MaxHeap)
}

heap.Push(&h.MaxHeap, k)
h.Keys[k.Key] = true
return true
}
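Both review threads above (zero timestamps, thread safety) are addressed by later commits in this PR ("Prevent adding objects with 0 timestamp to heap", "Add RWMutex to hash heap"). A sketch of what the guarded insert could look like, assuming the HashHeap struct gains a `mu sync.RWMutex` field — illustrative, not the exact code that landed:

```go
// Sketch only: assumes HashHeap gains a `mu sync.RWMutex` field and that
// "sync" is added to the imports above.
func (h *HashHeap) insertIntoHeapGuarded(k TimestampKey) bool {
	// Never admit an object without a real creation timestamp.
	if k.Timestamp.IsZero() {
		return false
	}

	h.mu.Lock()
	defer h.mu.Unlock()

	// Already in heap, do not try to insert.
	if _, ok := h.Keys[k.Key]; ok {
		return true
	}
	if h.MaxHeap.Len() >= h.maxSize {
		top := h.MaxHeap.Peek()
		// !After is equivalent to Before || Equal: ties go against the newcomer.
		if !top.Timestamp.After(k.Timestamp) {
			return false
		}
		delete(h.Keys, top.Key)
		heap.Pop(&h.MaxHeap)
	}
	heap.Push(&h.MaxHeap, k)
	h.Keys[k.Key] = true
	return true
}
```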

// DeleteFromHeap removes the given key from the max heap
func (h *HashHeap) DeleteFromHeap(key string) {
// Key did not exist in heap, return early
if _, ok := h.Keys[key]; !ok {
return
}
idx, found := h.MaxHeap.FindIdx(key)
if !found {
return
}
heap.Remove(&h.MaxHeap, idx)
delete(h.Keys, key)
}
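To make the eviction rule concrete, here is a small usage sketch of the heap above with maxSize = 2: the two oldest keys are admitted, a newer third key is rejected at capacity, and deleting an admitted key frees a slot. The import path is assumed from the file location, and building requires the `kubeapiserver` tag:

```go
package main

import (
	"fmt"
	"time"

	// Import path assumed from the file location in this PR.
	"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling"
)

func main() {
	h := autoscaling.NewHashHeap(2) // limit of 2 for illustration; the PR uses 100
	now := time.Now()

	// The two oldest autoscalers are admitted.
	fmt.Println(h.InsertIntoHeap(autoscaling.TimestampKey{Timestamp: now.Add(-2 * time.Hour), Key: "default/dpa-0"})) // true
	fmt.Println(h.InsertIntoHeap(autoscaling.TimestampKey{Timestamp: now.Add(-1 * time.Hour), Key: "default/dpa-1"})) // true

	// At capacity, a newer object is rejected (the newest key loses).
	fmt.Println(h.InsertIntoHeap(autoscaling.TimestampKey{Timestamp: now, Key: "default/dpa-2"})) // false

	// Deleting an admitted key frees a slot for the previously rejected one.
	h.DeleteFromHeap("default/dpa-0")
	fmt.Println(h.InsertIntoHeap(autoscaling.TimestampKey{Timestamp: now, Key: "default/dpa-2"})) // true
}
```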
34 changes: 27 additions & 7 deletions pkg/clusteragent/autoscaling/workload/controller.go
@@ -75,6 +75,7 @@ func newController(
store *store,
podWatcher podWatcher,
localSender sender.Sender,
hashHeap *autoscaling.HashHeap,
) (*Controller, error) {
c := &Controller{
clusterID: clusterID,
@@ -91,7 +92,7 @@
},
)

baseController, err := autoscaling.NewController(controllerID, c, dynamicClient, dynamicInformer, podAutoscalerGVR, isLeader, store, autoscalingWorkqueue)
baseController, err := autoscaling.NewController(controllerID, c, dynamicClient, dynamicInformer, podAutoscalerGVR, isLeader, store, autoscalingWorkqueue, hashHeap)
if err != nil {
return nil, err
}
@@ -174,7 +175,12 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
if podAutoscaler != nil {
// If we don't have an instance locally, we create it. Deletion is handled through setting the `Deleted` flag
log.Debugf("Creating internal PodAutoscaler: %s from Kubernetes object", key)
c.store.UnlockSet(key, model.NewPodAutoscalerInternal(podAutoscaler), c.ID)
podAutoscalerInternal := model.NewPodAutoscalerInternal(podAutoscaler)
Contributor: nit: Is this change necessary?

c.store.UnlockSet(key, podAutoscalerInternal, c.ID)
c.AutoscalerHeap.InsertIntoHeap(autoscaling.TimestampKey{
Contributor: Instead of having to track changes to the Store, could it be registered as an observer?

Timestamp: podAutoscalerInternal.CreationTimestamp(),
Key: key,
})
} else {
// If podAutoscaler == nil, both objects are nil, nothing to do
log.Debugf("Reconciling object: %s but object is not present in Kubernetes nor in internal store, nothing to do", key)
@@ -189,8 +195,9 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
// If flagged for deletion, we just need to clear up our store (deletion complete)
// Also if object was not owned by remote config, we also need to delete it (deleted by user)
if podAutoscalerInternal.Deleted() || podAutoscalerInternal.Spec().Owner != datadoghq.DatadogPodAutoscalerRemoteOwner {
log.Infof("Object %s not present in Kuberntes and flagged for deletion (remote) or owner == local, clearing internal store", key)
log.Infof("Object %s not present in Kubernetes and flagged for deletion (remote) or owner == local, clearing internal store", key)
c.store.UnlockDelete(key, c.ID)
c.AutoscalerHeap.DeleteFromHeap(key)
return autoscaling.NoRequeue, nil
}

@@ -214,6 +221,7 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
if err != nil && errors.IsNotFound(err) {
log.Debugf("Object %s not found in Kubernetes during deletion, clearing internal store", key)
c.store.UnlockDelete(key, c.ID)
c.AutoscalerHeap.DeleteFromHeap(key)
Contributor: There's code missing to handle updating the internal PodAutoscaler with the creation timestamp from Kubernetes once the object has been created in Kubernetes.

return autoscaling.NoRequeue, nil
}

@@ -229,7 +237,7 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
*podAutoscalerInternal.Spec().RemoteVersion > *podAutoscaler.Spec.RemoteVersion {
err := c.updatePodAutoscalerSpec(ctx, podAutoscalerInternal, podAutoscaler)

// When doing an external update, we stop and reqeue the object to not have multiple changes at once.
// When doing an external update, we stop and requeue the object to not have multiple changes at once.
c.store.Unlock(key)
return autoscaling.Requeue, err
}
@@ -253,7 +261,7 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
if localHash != remoteHash {
err := c.updatePodAutoscalerSpec(ctx, podAutoscalerInternal, podAutoscaler)

// When doing an external update, we stop and reqeue the object to not have multiple changes at once.
// When doing an external update, we stop and requeue the object to not have multiple changes at once.
c.store.Unlock(key)
return autoscaling.Requeue, err
}
@@ -272,8 +280,20 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
// Reaching this point, we had an error in processing, clearing up global error
podAutoscalerInternal.SetError(nil)

// Now that everything is synced, we can perform the actual processing
result, err := c.handleScaling(ctx, podAutoscaler, &podAutoscalerInternal)
isAdded := c.AutoscalerHeap.InsertIntoHeap(autoscaling.TimestampKey{
Timestamp: podAutoscalerInternal.CreationTimestamp(),
Key: key,
})

result := autoscaling.NoRequeue
var err error
if !isAdded {
// Number of DatadogPodAutoscaler objects exceeds the limit
podAutoscalerInternal.SetError(fmt.Errorf("Too many DatadogPodAutoscaler objects created, ignoring this one %s", key))
Contributor: I think we need to be a bit more explicit. You also don't need to put the key in the error message, as it's already attached to the object itself. Something like: "Autoscaler disabled as maximum number per cluster reached (100)".

} else {
// Now that everything is synced, we can perform the actual processing
result, err = c.handleScaling(ctx, podAutoscaler, &podAutoscalerInternal)
Contributor: It'd be nice to not have the main logic in an else, though it would require changing the code structure a bit.

Member (author): Makes sense, I can wait for the changes from #28723 to be merged so I can make a status update and return early here:

```go
func (c *Controller) updateAutoscalerStatusAndUnlock(ctx context.Context, key, ns, name string, err error, podAutoscalerInternal model.PodAutoscalerInternal, podAutoscaler *datadoghq.DatadogPodAutoscaler) error {
	// Update status based on latest state
	statusErr := c.updatePodAutoscalerStatus(ctx, podAutoscalerInternal, podAutoscaler)
	if statusErr != nil {
		log.Errorf("Failed to update status for PodAutoscaler: %s/%s, err: %v", ns, name, statusErr)
		// We want to return the status error if none to count in the requeue retries.
		if err == nil {
			err = statusErr
		}
	}
	c.store.UnlockSet(key, podAutoscalerInternal, c.ID)
	return err
}
```

}

// Update status based on latest state
statusErr := c.updatePodAutoscalerStatus(ctx, podAutoscalerInternal, podAutoscaler)
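Tying the two threads above together: once the updateAutoscalerStatusAndUnlock helper quoted in the author's reply merges, the limit check can set the status and return early instead of wrapping the main processing in an else. A sketch of that shape, assuming the helper lands as quoted and using the reviewer's suggested message; the maxAutoscalerObjects constant is an assumption:

```go
// Sketch only: assumes updateAutoscalerStatusAndUnlock from #28723 and a
// package-level maxAutoscalerObjects constant (e.g. 100).
if !c.AutoscalerHeap.InsertIntoHeap(autoscaling.TimestampKey{
	Timestamp: podAutoscalerInternal.CreationTimestamp(),
	Key:       key,
}) {
	// Reviewer-suggested wording; the key is omitted because it is already
	// attached to the object itself.
	podAutoscalerInternal.SetError(fmt.Errorf("Autoscaler disabled as maximum number per cluster reached (%d)", maxAutoscalerObjects))
	return autoscaling.NoRequeue, c.updateAutoscalerStatusAndUnlock(ctx, key, ns, name, nil, podAutoscalerInternal, podAutoscaler)
}

// Now that everything is synced, perform the actual processing.
result, err := c.handleScaling(ctx, podAutoscaler, &podAutoscalerInternal)
```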
81 changes: 68 additions & 13 deletions pkg/clusteragent/autoscaling/workload/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,24 @@ import (
type fixture struct {
*autoscaling.ControllerFixture

clock *clock.FakeClock
recorder *record.FakeRecorder
store *store
clock *clock.FakeClock
recorder *record.FakeRecorder
store *store
autoscalingHeap *autoscaling.HashHeap
}

const testMaxAutoscalerObjects int = 2

func newFixture(t *testing.T, testTime time.Time) *fixture {
store := autoscaling.NewStore[model.PodAutoscalerInternal]()
clock := clock.NewFakeClock(testTime)
recorder := record.NewFakeRecorder(100)
hashHeap := autoscaling.NewHashHeap(testMaxAutoscalerObjects)
return &fixture{
ControllerFixture: autoscaling.NewFixture(
t, podAutoscalerGVR,
func(fakeClient *fake.FakeDynamicClient, informer dynamicinformer.DynamicSharedInformerFactory, isLeader func() bool) (*autoscaling.Controller, error) {
c, err := newController("cluster-id1", recorder, nil, nil, fakeClient, informer, isLeader, store, nil, nil)
c, err := newController("cluster-id1", recorder, nil, nil, fakeClient, informer, isLeader, store, nil, nil, hashHeap)
if err != nil {
return nil, err
}
@@ -56,20 +60,22 @@ func newFixture(t *testing.T, testTime time.Time) *fixture {
return c.Controller, err
},
),
clock: clock,
recorder: recorder,
store: store,
clock: clock,
recorder: recorder,
store: store,
autoscalingHeap: hashHeap,
}
}

func newFakePodAutoscaler(ns, name string, gen int64, spec datadoghq.DatadogPodAutoscalerSpec, status datadoghq.DatadogPodAutoscalerStatus) (obj *unstructured.Unstructured, dpa *datadoghq.DatadogPodAutoscaler) {
func newFakePodAutoscaler(ns, name string, gen int64, creationTimestamp time.Time, spec datadoghq.DatadogPodAutoscalerSpec, status datadoghq.DatadogPodAutoscalerStatus) (obj *unstructured.Unstructured, dpa *datadoghq.DatadogPodAutoscaler) {
dpa = &datadoghq.DatadogPodAutoscaler{
TypeMeta: podAutoscalerMeta,
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
Generation: gen,
UID: uuid.NewUUID(),
Name: name,
Namespace: ns,
Generation: gen,
UID: uuid.NewUUID(),
CreationTimestamp: metav1.NewTime(creationTimestamp),
},
Spec: spec,
Status: status,
Expand Down Expand Up @@ -97,8 +103,9 @@ func TestLeaderCreateDeleteLocal(t *testing.T) {
Owner: datadoghq.DatadogPodAutoscalerLocalOwner,
}

defaultCreationTime, _ := time.Parse("YYYY-MM-DD 00:00:00 +0000 UTC", "0001-01-01 00:00:00 +0000 UTC")
Contributor: time.Time{} is enough for that.

// Read newly created DPA
dpa, dpaTyped := newFakePodAutoscaler("default", "dpa-0", 1, dpaSpec, datadoghq.DatadogPodAutoscalerStatus{})
dpa, dpaTyped := newFakePodAutoscaler("default", "dpa-0", 1, defaultCreationTime, dpaSpec, datadoghq.DatadogPodAutoscalerStatus{})

f.InformerObjects = append(f.InformerObjects, dpa)
f.Objects = append(f.Objects, dpaTyped)
@@ -228,3 +235,51 @@ func TestLeaderCreateDeleteRemote(t *testing.T) {
f.RunControllerSync(true, "default/dpa-0")
assert.Len(t, f.store.GetAll(), 0)
}

func TestLeaderCreateDeleteLocalHeap(t *testing.T) {
Contributor: nit: It's more something like TestPodAutoscalerObjectsLimit than testing the heap itself.

testTime := time.Now()
f := newFixture(t, testTime)

dpaSpec := datadoghq.DatadogPodAutoscalerSpec{
TargetRef: autoscalingv2.CrossVersionObjectReference{
Kind: "Deployment",
Name: "app-0",
APIVersion: "apps/v1",
},
// Local owner means .Spec source of truth is K8S
Owner: datadoghq.DatadogPodAutoscalerLocalOwner,
Contributor: You also need a test on a remote owner, which should have a zero CreationTimestamp value first, then be created in Kubernetes, and then get updated with the Kubernetes creation timestamp.

}

// Read newly created DPA
dpa, dpaTyped := newFakePodAutoscaler("default", "dpa-0", 1, time.Now().Add(-1*time.Hour), dpaSpec, datadoghq.DatadogPodAutoscalerStatus{})
dpa1, dpaTyped1 := newFakePodAutoscaler("default", "dpa-1", 1, time.Now(), dpaSpec, datadoghq.DatadogPodAutoscalerStatus{})
dpa2, dpaTyped2 := newFakePodAutoscaler("default", "dpa-2", 1, time.Now().Add(1*time.Hour), dpaSpec, datadoghq.DatadogPodAutoscalerStatus{})

f.InformerObjects = append(f.InformerObjects, dpa)
Contributor: You can add multiple objects with a single append:

```go
f.InformerObjects = append(f.InformerObjects, dpa, dpa1)
f.Objects = append(f.Objects, dpaTyped, dpaTyped1)
```

f.Objects = append(f.Objects, dpaTyped)

f.InformerObjects = append(f.InformerObjects, dpa1)
f.Objects = append(f.Objects, dpaTyped1)
f.RunControllerSync(true, "default/dpa-1")
// Check that DatadogPodAutoscaler object is inserted into heap
assert.Equal(t, 1, f.autoscalingHeap.MaxHeap.Len())
assert.Equal(t, "default/dpa-1", f.autoscalingHeap.MaxHeap.Peek().Key)
assert.Truef(t, f.autoscalingHeap.Keys["default/dpa-1"], "Expected dpa-1 to be in heap")

f.InformerObjects = append(f.InformerObjects, dpa2)
f.Objects = append(f.Objects, dpaTyped2)
// Check that multiple objects can be inserted with ordering preserved
f.RunControllerSync(true, "default/dpa-2")
assert.Equal(t, 2, f.autoscalingHeap.MaxHeap.Len())
assert.Equal(t, "default/dpa-2", f.autoscalingHeap.MaxHeap.Peek().Key)
assert.Truef(t, f.autoscalingHeap.Keys["default/dpa-1"], "Expected dpa-1 to be in heap")
assert.Truef(t, f.autoscalingHeap.Keys["default/dpa-2"], "Expected dpa-2 to be in heap")

f.RunControllerSync(true, "default/dpa-0")
Contributor: The test is named CreateDelete, but I don't see a test of the delete part? For instance, removing dpa-1 should make dpa-2 eligible.

// Check that heap ordering is preserved and limit is not exceeded
assert.Equal(t, 2, f.autoscalingHeap.MaxHeap.Len())
assert.Equal(t, "default/dpa-1", f.autoscalingHeap.MaxHeap.Peek().Key)
assert.Truef(t, f.autoscalingHeap.Keys["default/dpa-0"], "Expected dpa-0 to be in heap")
assert.Truef(t, f.autoscalingHeap.Keys["default/dpa-1"], "Expected dpa-1 to be in heap")
assert.Falsef(t, f.autoscalingHeap.Keys["default/dpa-2"], "Expected dpa-2 to not be in heap")
}