[clusteragent/autoscaling] Impose 100 DatadogPodAutoscaler limit in cluster agent #28684
@@ -0,0 +1,168 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build kubeapiserver

package autoscaling

import (
    "container/heap"
    "sync"
    "time"

    "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload/model"
)

type store = Store[model.PodAutoscalerInternal]

// TimestampKey is a struct that holds a timestamp and key for a `DatadogPodAutoscaler` object
type TimestampKey struct {
    Timestamp time.Time
    Key       string
}

// MaxTimestampKeyHeap is a heap that sorts TimestampKey objects by timestamp in descending order
type MaxTimestampKeyHeap []TimestampKey

// Len returns the length of the heap
func (h MaxTimestampKeyHeap) Len() int { return len(h) }

// Less returns true if the timestamp at index i is after the timestamp at index j
func (h MaxTimestampKeyHeap) Less(i, j int) bool {
    return LessThan(h[i], h[j])
}

// LessThan returns true if the timestamp of k1 is after the timestamp of k2
func LessThan(k1, k2 TimestampKey) bool {
    return k1.Timestamp.After(k2.Timestamp) || (k1.Timestamp.Equal(k2.Timestamp) && k1.Key < k2.Key)
}

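Since Less delegates to LessThan and container/heap places the Less-smallest element at the root, the root here is always the most recently created key (ties broken by key name). Below is a minimal sketch of that ordering, written as a Go example that could sit alongside this package; the key strings and timestamps are invented for illustration, and it assumes the same imports as the file above plus "fmt".

```go
// Illustrative example of the heap ordering; not part of this PR.
func Example_maxTimestampKeyHeapOrdering() {
    h := NewMaxHeap()
    base := time.Date(2024, 7, 1, 0, 0, 0, 0, time.UTC)

    heap.Push(h, TimestampKey{Timestamp: base, Key: "ns/dpa-old"})
    heap.Push(h, TimestampKey{Timestamp: base.Add(2 * time.Hour), Key: "ns/dpa-new"})
    heap.Push(h, TimestampKey{Timestamp: base.Add(time.Hour), Key: "ns/dpa-mid"})

    // The root is the most recently created key.
    fmt.Println(h.Peek().Key)
    // Output: ns/dpa-new
}
```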
// Swap swaps the elements at indices i and j
func (h MaxTimestampKeyHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

// Push adds an element to the heap while preserving max heap ordering
func (h *MaxTimestampKeyHeap) Push(x interface{}) {
    *h = append(*h, x.(TimestampKey))
}

// Pop removes the top element from the heap while preserving max heap ordering
func (h *MaxTimestampKeyHeap) Pop() any {
    old := *h
    n := len(old)
    x := old[n-1]
    *h = old[0 : n-1]
    return x
}

// Peek returns the top element of the heap without removing it
func (h *MaxTimestampKeyHeap) Peek() TimestampKey {
    return (*h)[0]
}

// NewMaxHeap returns a new MaxTimestampKeyHeap
func NewMaxHeap() *MaxTimestampKeyHeap {
    h := &MaxTimestampKeyHeap{}
    heap.Init(h)
    return h
}

// FindIdx returns the index of the given key in the heap and a boolean indicating if the key was found
func (h *MaxTimestampKeyHeap) FindIdx(key string) (int, bool) {
    for idx, k := range *h {
        if k.Key == key {
            return idx, true
        }
    }
    return 0, false
}

// HashHeap is a struct that holds a MaxHeap and a set of keys that exist in the heap
type HashHeap struct {
    MaxHeap MaxTimestampKeyHeap
    Keys    map[string]bool
    maxSize int
    mu      sync.RWMutex
    store   *store
}

// NewHashHeap returns a new HashHeap with the given max size
func NewHashHeap(maxSize int, store *store) *HashHeap {
    return &HashHeap{
        MaxHeap: *NewMaxHeap(),
        Keys:    make(map[string]bool),
        maxSize: maxSize,
        mu:      sync.RWMutex{},
        store:   store,
    }
}

// InsertIntoHeap inserts the given key into the max heap, evicting the newest entry if the heap is at capacity.
// Used as an ObserverFunc; it accepts a sender parameter to match the ObserverFunc signature.
func (h *HashHeap) InsertIntoHeap(key, _sender string) {
    // Already in heap, do not try to insert
    if h.Exists(key) {
        return
    }

    // Get object from store
    podAutoscalerInternal, podAutoscalerInternalFound := h.store.Get(key)
    if !podAutoscalerInternalFound {
        return
    }

    if podAutoscalerInternal.CreationTimestamp().IsZero() {
        return
    }

    newTimestampKey := TimestampKey{
        Timestamp: podAutoscalerInternal.CreationTimestamp(),
        Key:       key,
    }

    h.mu.Lock()
    defer h.mu.Unlock()

    if h.MaxHeap.Len() >= h.maxSize {
        top := h.MaxHeap.Peek()
        // If the new key is newer than or equal to the top key, do not insert
        if LessThan(newTimestampKey, top) {
            return
        }
        delete(h.Keys, top.Key)
        heap.Pop(&h.MaxHeap)
    }

    heap.Push(&h.MaxHeap, newTimestampKey)
    h.Keys[key] = true
}

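The capacity branch above is what enforces the limit: once maxSize keys are tracked, a new key is admitted only if it is older than the current root, and the root (the newest tracked key) is evicted to make room, so the heap converges on the oldest keys. Below is a worked sketch of that rule with the store lookup and locking stripped away; the insert closure, key names, and timestamps are invented for illustration, and it assumes the same imports as the file above plus "fmt".

```go
// Illustrative only; not part of this PR. With maxSize = 2, the two oldest keys
// survive and the newest key is rejected or evicted.
func Example_limitEviction() {
    const maxSize = 2
    h := NewMaxHeap()
    keys := map[string]bool{}
    base := time.Date(2024, 7, 1, 0, 0, 0, 0, time.UTC)

    insert := func(k TimestampKey) {
        if h.Len() >= maxSize {
            top := h.Peek()
            if LessThan(k, top) {
                // Candidate is newer than the newest tracked key: reject it.
                return
            }
            // Evict the newest tracked key to make room for the older candidate.
            delete(keys, top.Key)
            heap.Pop(h)
        }
        heap.Push(h, k)
        keys[k.Key] = true
    }

    insert(TimestampKey{Timestamp: base.Add(2 * time.Hour), Key: "ns/dpa-3"}) // newest
    insert(TimestampKey{Timestamp: base, Key: "ns/dpa-1"})                    // oldest
    insert(TimestampKey{Timestamp: base.Add(time.Hour), Key: "ns/dpa-2"})     // evicts ns/dpa-3

    fmt.Println(keys["ns/dpa-1"], keys["ns/dpa-2"], keys["ns/dpa-3"])
    // Output: true true false
}
```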
// DeleteFromHeap removes the given key from the max heap
// Used as an ObserverFunc; it accepts a sender parameter to match the ObserverFunc signature.
func (h *HashHeap) DeleteFromHeap(key, _sender string) {
    // Key did not exist in heap, return early
    if !h.Exists(key) {
        return
    }
    h.mu.RLock()
    idx, found := h.MaxHeap.FindIdx(key)
    h.mu.RUnlock()

    if !found {
        return
    }

    h.mu.Lock()
    defer h.mu.Unlock()
    heap.Remove(&h.MaxHeap, idx)
    delete(h.Keys, key)
}

// Exists returns true if the given key exists in the heap
func (h *HashHeap) Exists(key string) bool {
    h.mu.RLock()
    defer h.mu.RUnlock()
    _, ok := h.Keys[key]
    return ok
}
@@ -60,6 +60,8 @@ type Controller struct {
 	eventRecorder record.EventRecorder
 	store         *store
 
+	limitHeap *autoscaling.HashHeap
+
 	podWatcher           podWatcher
 	horizontalController *horizontalController
 	verticalController   *verticalController
@@ -79,6 +81,7 @@ func newController(
 	store *store,
 	podWatcher podWatcher,
 	localSender sender.Sender,
+	limitHeap *autoscaling.HashHeap,
 ) (*Controller, error) {
 	c := &Controller{
 		clusterID: clusterID,
@@ -101,6 +104,11 @@ func newController(
 	}
 
 	c.Controller = baseController
+	c.limitHeap = limitHeap
+	store.RegisterObserver(autoscaling.Observer{
+		SetFunc:    c.limitHeap.InsertIntoHeap,
+		DeleteFunc: c.limitHeap.DeleteFromHeap,
+	})
 	c.store = store
 	c.podWatcher = podWatcher
 
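Both heap methods are wired in through the store's observer mechanism, which is why they take a sender argument they ignore. As a rough sketch of that mechanism, assuming ObserverFunc is simply a func(key, sender string) as the signatures above suggest, a hypothetical debug observer could be registered the same way (registerLogObserver and its log lines are not part of this PR):

```go
// Hypothetical illustration (not part of this diff): registering a debug observer
// next to the limit heap, using the same Observer fields shown above.
func registerLogObserver(s *store) {
	logChange := func(key, sender string) {
		log.Debugf("store changed: key=%s sender=%s", key, sender)
	}
	s.RegisterObserver(autoscaling.Observer{
		SetFunc:    logChange,
		DeleteFunc: logChange,
	})
}
```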
@@ -193,7 +201,7 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
 	// If flagged for deletion, we just need to clear up our store (deletion complete)
 	// Also if object was not owned by remote config, we also need to delete it (deleted by user)
 	if podAutoscalerInternal.Deleted() || podAutoscalerInternal.Spec().Owner != datadoghq.DatadogPodAutoscalerRemoteOwner {
-		log.Infof("Object %s not present in Kuberntes and flagged for deletion (remote) or owner == local, clearing internal store", key)
+		log.Infof("Object %s not present in Kubernetes and flagged for deletion (remote) or owner == local, clearing internal store", key)
 		c.store.UnlockDelete(key, c.ID)
 		return autoscaling.NoRequeue, nil
 	}
@@ -233,7 +241,7 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
 		*podAutoscalerInternal.Spec().RemoteVersion > *podAutoscaler.Spec.RemoteVersion {
 		err := c.updatePodAutoscalerSpec(ctx, podAutoscalerInternal, podAutoscaler)
 
-		// When doing an external update, we stop and reqeue the object to not have multiple changes at once.
+		// When doing an external update, we stop and requeue the object to not have multiple changes at once.
 		c.store.Unlock(key)
 		return autoscaling.Requeue, err
 	}
@@ -242,6 +250,11 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
 	// and compare it with the one in the PodAutoscaler. If they differ, we should update the PodAutoscaler
 	// otherwise store the Generation
 	if podAutoscalerInternal.Generation() != podAutoscaler.Generation {
+		if podAutoscalerInternal.CreationTimestamp().IsZero() {
+			podAutoscalerInternal.UpdateCreationTimestamp(podAutoscaler.CreationTimestamp.Time)
+			return autoscaling.Requeue, c.updateAutoscalerStatusAndUnlock(ctx, key, ns, name, nil, podAutoscalerInternal, podAutoscaler)

Review comment: Is there any reason why we requeue here? I'd not expect it to be necessary (similar to the …).
Reply: Originally set it to requeue to trigger a store update earlier and to minimize the amount of time that a status may incorrectly show an error state (i.e. when the limit is not exceeded but the key has not yet been added to the heap because it had a zero creation timestamp). Moved this and removed the requeue for now as well!

+		}
+
 		localHash, err := autoscaling.ObjectHash(podAutoscalerInternal.Spec())

Review comment: Good find! We need to check all usage of …

 		if err != nil {
 			c.store.Unlock(key)
@@ -257,7 +270,7 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
 		if localHash != remoteHash {
 			err := c.updatePodAutoscalerSpec(ctx, podAutoscalerInternal, podAutoscaler)
 
-			// When doing an external update, we stop and reqeue the object to not have multiple changes at once.
+			// When doing an external update, we stop and requeue the object to not have multiple changes at once.
 			c.store.Unlock(key)
 			return autoscaling.Requeue, err
 		}
@@ -277,7 +290,7 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
 	podAutoscalerInternal.SetError(nil)
 
 	// Validate autoscaler requirements
-	validationErr := c.validateAutoscaler(podAutoscaler)
+	validationErr := c.validateAutoscaler(podAutoscalerInternal)
 	if validationErr != nil {
 		podAutoscalerInternal.SetError(validationErr)
 		return autoscaling.NoRequeue, c.updateAutoscalerStatusAndUnlock(ctx, key, ns, name, validationErr, podAutoscalerInternal, podAutoscaler)
@@ -407,15 +420,22 @@ func (c *Controller) deletePodAutoscaler(ns, name string) error {
 	return nil
 }
 
-func (c *Controller) validateAutoscaler(podAutoscaler *datadoghq.DatadogPodAutoscaler) error {
+func (c *Controller) validateAutoscaler(podAutoscalerInternal model.PodAutoscalerInternal) error {
+	// Check that we are within the limit of 100 DatadogPodAutoscalers
+	key := podAutoscalerInternal.ID()
+	if !podAutoscalerInternal.CreationTimestamp().IsZero() && !c.limitHeap.Exists(key) {

Review comment: Is it possible to reach this point while having a zero …?
Reply: Good point! I can update it to check if the value in the store is 0 if necessary, but removed it for now.

+		return fmt.Errorf("Autoscaler disabled as maximum number per cluster reached (%d)", maxDatadogPodAutoscalerObjects)
+	}
+
 	// Check that targetRef is not set to the cluster agent
 	clusterAgentPodName, err := common.GetSelfPodName()

Review comment: Thinking about this (not exactly related to this PR), I don't think we want to …

+	// If we cannot get cluster agent pod name, just skip the validation logic
 	if err != nil {
-		return fmt.Errorf("Unable to get the cluster agent pod name: %w", err)
+		return nil
 	}
 
 	var resourceName string
-	switch owner := podAutoscaler.Spec.TargetRef.Kind; owner {
+	switch owner := podAutoscalerInternal.Spec().TargetRef.Kind; owner {
 	case "Deployment":
 		resourceName = kubernetes.ParseDeploymentForPodName(clusterAgentPodName)
 	case "ReplicaSet":
@@ -424,7 +444,7 @@ func (c *Controller) validateAutoscaler(podAutoscaler *datadoghq.DatadogPodAutos
 
 	clusterAgentNs := common.GetMyNamespace()
 
-	if podAutoscaler.Namespace == clusterAgentNs && podAutoscaler.Spec.TargetRef.Name == resourceName {
+	if podAutoscalerInternal.Namespace() == clusterAgentNs && podAutoscalerInternal.Spec().TargetRef.Name == resourceName {
 		return fmt.Errorf("Autoscaling target cannot be set to the cluster agent")
 	}
 	return nil
Review comment: HashHeap is currently not thread safe; thread safety could become necessary if we have multiple workers or if we move the store to object locking.