
[clusteragent/autoscaling] Impose 100 DatadogPodAutoscaler limit in cluster agent #28684

Merged: 33 commits, Oct 4, 2024. Changes shown from the first 20 commits.

Commits:
8ee3911
Add creation timestamp to DatadogPodAutoscalerInternal
jennchenn Aug 20, 2024
6cd039c
Fix minor typos
jennchenn Aug 21, 2024
46ece6f
Implement heap to track number of DPA objects
jennchenn Aug 22, 2024
cabcad1
Update heap names to reduce redundancy in calls
jennchenn Aug 23, 2024
e70833e
Add basic unit test for creation of heap
jennchenn Aug 23, 2024
843d36c
fixup! Update heap names to reduce redundancy in calls
jennchenn Aug 23, 2024
b200155
Make error message shown in status more explicit
jennchenn Aug 30, 2024
c6d6431
Reorder PAI params to have metadata fields first
jennchenn Aug 30, 2024
1863ce1
Prevent adding objects with 0 timestamp to heap
jennchenn Aug 30, 2024
9ecf501
Remove hashheap from generic controller definition and add delete obs…
jennchenn Aug 30, 2024
156ecff
Register observer on store to update heap
jennchenn Sep 6, 2024
90799f2
Merge remote-tracking branch 'origin/main' into jenn/CASCL-1_autoscal…
jennchenn Sep 6, 2024
b680558
Add check for 100 DPAs to validation logic
jennchenn Sep 6, 2024
ea7979d
Refactor max heap functions to use Exists helper
jennchenn Sep 6, 2024
91e6b9f
Update DCA target test with autoscaler creation timestamp
jennchenn Sep 6, 2024
0622fb4
Remove period from error message; remove redundant return
jennchenn Sep 6, 2024
7254318
Update local test to check behavior on delete
jennchenn Sep 25, 2024
03fd859
Add RWMutex to hash heap
jennchenn Sep 25, 2024
b29cfd0
Remove test namespace hardcoded value
jennchenn Sep 26, 2024
565a3ef
Update creation timestamp for remote owner
jennchenn Sep 26, 2024
b92ac07
Rename controller hashHeap to limitHeap for better description of usage
jennchenn Sep 27, 2024
3913a97
Remove redundant variable initialization
jennchenn Sep 27, 2024
6669594
Rename test to be more descriptive
jennchenn Sep 27, 2024
44589e5
Update creation timestamp only when spec changes
jennchenn Sep 27, 2024
4ae0ab3
Simplify validation logic and pass only one param
jennchenn Sep 27, 2024
3285d31
Skip rest of DCA validation logic if self pod name cannot be found
jennchenn Sep 27, 2024
aedc9f8
fixup! Rename controller hashHeap to limitHeap for better description…
jennchenn Sep 27, 2024
0429380
Use store directly in hash heap to avoid passing obj to observer
jennchenn Sep 27, 2024
653ab74
Test hash heap on remote owner DPA objects
jennchenn Oct 1, 2024
6cdaf91
Merge remote-tracking branch 'origin/main' into jenn/CASCL-1_autoscal…
jennchenn Oct 1, 2024
b9a47f3
Pass in store to hash heap
jennchenn Oct 2, 2024
96ef6dd
Remove duplication and clean up fake pod autoscaler helper function
jennchenn Oct 3, 2024
4a317d5
Remove requeue after creation timestamp update
jennchenn Oct 3, 2024
2 changes: 1 addition & 1 deletion pkg/clusteragent/autoscaling/controller.go
@@ -120,7 +120,7 @@ func (c *Controller) enqueue(obj interface{}) {
c.Workqueue.AddRateLimited(key)
}

-func (c *Controller) enqueueID(id, sender string) {
+func (c *Controller) enqueueID(id, sender string, _ any) {
// Do not enqueue our own updates (avoid infinite loops)
if sender != c.ID {
log.Tracef("Enqueueing from observer update id: %s from sender: %s", id, sender)
164 changes: 164 additions & 0 deletions pkg/clusteragent/autoscaling/max_heap.go
@@ -0,0 +1,164 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build kubeapiserver

package autoscaling

import (
"container/heap"
"sync"
"time"

"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload/model"
)

// TimestampKey is a struct that holds a timestamp and key for a `DatadogPodAutoscaler` object
type TimestampKey struct {
Timestamp time.Time
Key string
}

// MaxTimestampKeyHeap is a heap that sorts TimestampKey objects by timestamp in descending order
type MaxTimestampKeyHeap []TimestampKey

// Len returns the length of the heap
func (h MaxTimestampKeyHeap) Len() int { return len(h) }

// Less returns true if the timestamp at index i is after the timestamp at index j
func (h MaxTimestampKeyHeap) Less(i, j int) bool {
return LessThan(h[i], h[j])
}

// LessThan returns true if the timestamp of k1 is after the timestamp of k2
func LessThan(k1, k2 TimestampKey) bool {
return k1.Timestamp.After(k2.Timestamp) || (k1.Timestamp.Equal(k2.Timestamp) && k1.Key < k2.Key)
}

// Swap swaps the elements at indices i and j
func (h MaxTimestampKeyHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

// Push adds an element to the heap while preserving max heap ordering
func (h *MaxTimestampKeyHeap) Push(x interface{}) {
*h = append(*h, x.(TimestampKey))
}

// Pop removes the top element from the heap while preserving max heap ordering
func (h *MaxTimestampKeyHeap) Pop() any {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

// Peek returns the top element of the heap without removing it
func (h *MaxTimestampKeyHeap) Peek() TimestampKey {
return (*h)[0]
}

// NewMaxHeap returns a new MaxTimestampKeyHeap
func NewMaxHeap() *MaxTimestampKeyHeap {
h := &MaxTimestampKeyHeap{}
heap.Init(h)
return h
}

// FindIdx returns the index of the given key in the heap and a boolean indicating if the key was found
func (h *MaxTimestampKeyHeap) FindIdx(key string) (int, bool) {
for idx, k := range *h {
if k.Key == key {
return idx, true
}
}
return 0, false
}

// HashHeap is a struct that holds a MaxHeap and a set of keys that exist in the heap
type HashHeap struct {
MaxHeap MaxTimestampKeyHeap
Keys map[string]bool
maxSize int
mu sync.RWMutex
}

Review comment (Contributor): HashHeap is currently not thread safe, which could become necessary if we have multiple workers or if we move the store to object locking.

// NewHashHeap returns a new HashHeap with the given max size
func NewHashHeap(maxSize int) *HashHeap {
return &HashHeap{
MaxHeap: *NewMaxHeap(),
Keys: make(map[string]bool),
maxSize: maxSize,
mu: sync.RWMutex{},
}
}

// InsertIntoHeap adds the key to the max heap if it is not already tracked; when the heap is
// full, only keys older than the newest tracked entry are inserted, evicting that entry
// Used as an ObserverFunc; accepts sender and obj as parameters to match the ObserverFunc signature
func (h *HashHeap) InsertIntoHeap(key, _sender string, obj any) {
// Already in heap, do not try to insert
if h.Exists(key) {
return
}

// Cast object into PodAutoscalerInternal
pai, ok := obj.(model.PodAutoscalerInternal)
if !ok {
return
}

if pai.CreationTimestamp().IsZero() {
return
}

newTimestampKey := TimestampKey{
Timestamp: pai.CreationTimestamp(),
Key: key,
}

h.mu.Lock()
defer h.mu.Unlock()

if h.MaxHeap.Len() >= h.maxSize {
top := h.MaxHeap.Peek()
// If the new key sorts as newer than the current top (newest) key, with ties broken by key name, do not insert
if LessThan(newTimestampKey, top) {
return
}
delete(h.Keys, top.Key)
heap.Pop(&h.MaxHeap)
}

heap.Push(&h.MaxHeap, newTimestampKey)
h.Keys[key] = true
}

// DeleteFromHeap removes the given key from the max heap
// Used as an ObserverFunc; accepts sender and obj as parameters to match the ObserverFunc signature
func (h *HashHeap) DeleteFromHeap(key, _sender string, _obj any) {
// Hold the write lock across lookup and removal so the index cannot be
// invalidated by a concurrent insert or delete between the two steps
h.mu.Lock()
defer h.mu.Unlock()
idx, found := h.MaxHeap.FindIdx(key)
if !found {
return
}
heap.Remove(&h.MaxHeap, idx)
delete(h.Keys, key)
}

// Exists returns true if the given key exists in the heap
func (h *HashHeap) Exists(key string) bool {
h.mu.RLock()
defer h.mu.RUnlock()
_, ok := h.Keys[key]
return ok
}
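The new max_heap.go above gives the cluster agent a capped index: the heap tracks keys by creation timestamp, and once full it admits only keys older than its newest entry, evicting that entry. A minimal, self-contained sketch of that behavior (type and function names here are illustrative stand-ins, not the PR's identifiers, and the cap is shrunk from 100 to 2):

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// entry mirrors the PR's TimestampKey: an object key plus its creation timestamp.
type entry struct {
	ts  time.Time
	key string
}

// maxHeap orders entries newest-first, so the root is always the newest
// tracked key, i.e. the candidate to evict when an older key arrives.
type maxHeap []entry

func (h maxHeap) Len() int { return len(h) }
func (h maxHeap) Less(i, j int) bool {
	return h[i].ts.After(h[j].ts) || (h[i].ts.Equal(h[j].ts) && h[i].key < h[j].key)
}
func (h maxHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *maxHeap) Push(x any)   { *h = append(*h, x.(entry)) }
func (h *maxHeap) Pop() any {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// capped keeps at most maxSize of the oldest keys, mirroring InsertIntoHeap.
type capped struct {
	h       maxHeap
	keys    map[string]bool
	maxSize int
}

func (c *capped) insert(e entry) {
	if c.keys[e.key] {
		return // already tracked
	}
	if c.h.Len() >= c.maxSize {
		top := c.h[0]
		// Reject keys newer than the newest tracked key.
		if e.ts.After(top.ts) {
			return
		}
		delete(c.keys, top.key)
		heap.Pop(&c.h)
	}
	heap.Push(&c.h, e)
	c.keys[e.key] = true
}

func demo() map[string]bool {
	c := &capped{keys: map[string]bool{}, maxSize: 2}
	base := time.Unix(0, 0)
	c.insert(entry{base.Add(1 * time.Hour), "ns/a"}) // oldest
	c.insert(entry{base.Add(3 * time.Hour), "ns/c"}) // newest so far
	c.insert(entry{base.Add(2 * time.Hour), "ns/b"}) // older than ns/c, evicts it
	return c.keys
}

func main() {
	fmt.Println(demo()) // map[ns/a:true ns/b:true]
}
```

Running this shows the newest key (ns/c) being evicted in favor of the older ns/b, which mirrors how the PR keeps only the first autoscalers by creation time.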
23 changes: 12 additions & 11 deletions pkg/clusteragent/autoscaling/store.go
@@ -17,7 +17,7 @@ const (
)

// ObserverFunc represents observer functions of the store
-type ObserverFunc func(string, string)
+type ObserverFunc func(string, string, any)

// Observer allows to define functions to watch changes in Store
type Observer struct {
@@ -102,20 +102,20 @@ func (s *Store[T]) GetFiltered(filter func(T) bool) []T {
// Updator func is expected to return the new object and a boolean indicating if the object has changed.
// The object is updated only if boolean is true, observers are notified only for updated objects after all objects have been updated.
func (s *Store[T]) Update(updator func(T) (T, bool), sender string) {
-var changedIDs []string
+changedObjects := make(map[string]T)
s.lock.Lock()
for id, object := range s.store {
newObject, changed := updator(object)
if changed {
s.store[id] = newObject
-changedIDs = append(changedIDs, id)
+changedObjects[id] = newObject
}
}
s.lock.Unlock()

// Notifying must be done after releasing the lock
-for _, id := range changedIDs {
-s.notify(setOperation, id, sender)
+for id, newObject := range changedObjects {
+s.notify(setOperation, id, sender, newObject)
}
}

@@ -133,7 +133,7 @@ func (s *Store[T]) Set(id string, obj T, sender string) {
s.store[id] = obj
s.lock.Unlock()

-s.notify(setOperation, id, sender)
+s.notify(setOperation, id, sender, obj)
}

// Delete object corresponding to id if present
@@ -144,7 +144,7 @@ func (s *Store[T]) Delete(id, sender string) {
s.lock.Unlock()

if exists {
-s.notify(deleteOperation, id, sender)
+s.notify(deleteOperation, id, sender, nil)
}
}

@@ -174,7 +174,7 @@ func (s *Store[T]) UnlockSet(id string, obj T, sender string) {
s.store[id] = obj
s.lock.Unlock()

-s.notify(setOperation, id, sender)
+s.notify(setOperation, id, sender, obj)
}

// UnlockDelete deletes an object and releases the lock (previously acquired by `LockRead`)
@@ -185,16 +185,17 @@ func (s *Store[T]) UnlockDelete(id, sender string) {
s.lock.Unlock()

if exists {
-s.notify(deleteOperation, id, sender)
+s.notify(deleteOperation, id, sender, nil)
}
}

// It's a very simple implementation of a notify process, but it's enough in our case as we aim at only 1 or 2 observers
-func (s *Store[T]) notify(operationType storeOperation, key, sender string) {
-// TODO: if we want to subscribe on set, should we pass the object as well?
+func (s *Store[T]) notify(operationType storeOperation, key, sender string, obj any) {

Review comment (Contributor): It's not possible to pass obj in because it means you could modify an object silently, without holding the lock.

s.observersLock.RLock()
defer s.observersLock.RUnlock()

for _, observer := range s.observers[operationType] {
-observer(key, sender)
+observer(key, sender, obj)
}
}
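The ObserverFunc change above threads the updated object through to observers, so a set observer no longer has to re-read the store to see what changed (and delete observers receive nil). A stripped-down sketch of the pattern (the store here is a simplified stand-in, not the PR's Store[T]):

```go
package main

import (
	"fmt"
	"sync"
)

// ObserverFunc matches the PR's updated signature: observers receive the
// object that triggered the notification (nil on delete).
type ObserverFunc func(key, sender string, obj any)

// store is a minimal generic store with set observers.
type store[T any] struct {
	mu        sync.Mutex
	data      map[string]T
	observers []ObserverFunc
}

func newStore[T any]() *store[T] {
	return &store[T]{data: map[string]T{}}
}

func (s *store[T]) register(f ObserverFunc) {
	s.observers = append(s.observers, f)
}

// set writes under the lock, then notifies after releasing it, passing the
// object along so observers do not need to read the store again.
func (s *store[T]) set(key string, obj T, sender string) {
	s.mu.Lock()
	s.data[key] = obj
	s.mu.Unlock()
	for _, f := range s.observers {
		f(key, sender, obj)
	}
}

func demo() []string {
	s := newStore[int]()
	var seen []string
	s.register(func(key, sender string, obj any) {
		seen = append(seen, fmt.Sprintf("%s=%v from %s", key, obj, sender))
	})
	s.set("ns/dpa-1", 42, "controller")
	return seen
}

func main() {
	fmt.Println(demo()[0]) // ns/dpa-1=42 from controller
}
```

Note the design choice the PR also makes: notification happens after the lock is released, so an observer that calls back into the store cannot deadlock.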
30 changes: 23 additions & 7 deletions pkg/clusteragent/autoscaling/workload/controller.go
@@ -64,6 +64,8 @@ type Controller struct {
verticalController *verticalController

localSender sender.Sender

+hashHeap *autoscaling.HashHeap
}

Review comment (Contributor): nit: The variable name should match usage, in that probably something like limitHeap or countHeap. Also the variable is more linked to the store and perhaps should be in the block just below store.

// newController returns a new workload autoscaling controller
@@ -78,6 +80,7 @@ func newController(
store *store,
podWatcher podWatcher,
localSender sender.Sender,
hashHeap *autoscaling.HashHeap,
) (*Controller, error) {
c := &Controller{
clusterID: clusterID,
@@ -100,6 +103,11 @@
}

c.Controller = baseController
c.hashHeap = hashHeap
store.RegisterObserver(autoscaling.Observer{
SetFunc: c.hashHeap.InsertIntoHeap,
DeleteFunc: c.hashHeap.DeleteFromHeap,
})
c.store = store
c.podWatcher = podWatcher

@@ -177,7 +185,8 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
if podAutoscaler != nil {
// If we don't have an instance locally, we create it. Deletion is handled through setting the `Deleted` flag
log.Debugf("Creating internal PodAutoscaler: %s from Kubernetes object", key)
-c.store.UnlockSet(key, model.NewPodAutoscalerInternal(podAutoscaler), c.ID)
+podAutoscalerInternal := model.NewPodAutoscalerInternal(podAutoscaler)
+c.store.UnlockSet(key, podAutoscalerInternal, c.ID)

Review comment (Contributor): nit: Is this change necessary?
} else {
// If podAutoscaler == nil, both objects are nil, nothing to do
log.Debugf("Reconciling object: %s but object is not present in Kubernetes nor in internal store, nothing to do", key)
@@ -192,7 +201,7 @@
// If flagged for deletion, we just need to clear up our store (deletion complete)
// Also if object was not owned by remote config, we also need to delete it (deleted by user)
if podAutoscalerInternal.Deleted() || podAutoscalerInternal.Spec().Owner != datadoghq.DatadogPodAutoscalerRemoteOwner {
-log.Infof("Object %s not present in Kuberntes and flagged for deletion (remote) or owner == local, clearing internal store", key)
+log.Infof("Object %s not present in Kubernetes and flagged for deletion (remote) or owner == local, clearing internal store", key)
c.store.UnlockDelete(key, c.ID)
return autoscaling.NoRequeue, nil
}
@@ -226,17 +235,19 @@
}

// If the object is owned by remote config and newer, we need to update the spec in Kubernetes
-// If Kubernetes is newer, we wait for RC to update the object in our internal store.
+// If Kubernetes is newer, we update the creation timestamp and wait for RC to update the rest of the object in our internal store.
if podAutoscalerInternal.Spec().RemoteVersion != nil &&
podAutoscaler.Spec.RemoteVersion != nil &&
*podAutoscalerInternal.Spec().RemoteVersion > *podAutoscaler.Spec.RemoteVersion {
err := c.updatePodAutoscalerSpec(ctx, podAutoscalerInternal, podAutoscaler)

-// When doing an external update, we stop and reqeue the object to not have multiple changes at once.
+// When doing an external update, we stop and requeue the object to not have multiple changes at once.
c.store.Unlock(key)
return autoscaling.Requeue, err
}

+podAutoscalerInternal.UpdateCreationTimestamp(podAutoscaler.CreationTimestamp.Time)

Review comment (Contributor): I suspect that you put that inside `if podAutoscalerInternal.Generation() != podAutoscaler.Generation {`, as we're interested in getting the creation timestamp only once (it won't be only once, but when there's a .Spec change, which should not happen frequently).

// If Generation != podAutoscaler.Generation, we should compute `.Spec` hash
// and compare it with the one in the PodAutoscaler. If they differ, we should update the PodAutoscaler
// otherwise store the Generation
@@ -256,7 +267,7 @@
if localHash != remoteHash {
err := c.updatePodAutoscalerSpec(ctx, podAutoscalerInternal, podAutoscaler)

-// When doing an external update, we stop and reqeue the object to not have multiple changes at once.
+// When doing an external update, we stop and requeue the object to not have multiple changes at once.
c.store.Unlock(key)
return autoscaling.Requeue, err
}
@@ -276,7 +287,7 @@
podAutoscalerInternal.SetError(nil)

// Validate autoscaler requirements
-validationErr := c.validateAutoscaler(podAutoscaler)
+validationErr := c.validateAutoscaler(key, podAutoscaler)
if validationErr != nil {
podAutoscalerInternal.SetError(validationErr)
return autoscaling.NoRequeue, c.updateAutoscalerStatusAndUnlock(ctx, key, ns, name, validationErr, podAutoscalerInternal, podAutoscaler)
@@ -387,7 +398,12 @@
return nil
}

-func (c *Controller) validateAutoscaler(podAutoscaler *datadoghq.DatadogPodAutoscaler) error {
+func (c *Controller) validateAutoscaler(key string, podAutoscaler *datadoghq.DatadogPodAutoscaler) error {

Review comment (Contributor): podAutoscalerInternal should have everything you need in a single object.

// Check that we are within the limit of 100 DatadogPodAutoscalers
if !c.hashHeap.Exists(key) {
return fmt.Errorf("Autoscaler disabled as maximum number per cluster reached (%d)", maxDatadogPodAutoscalerObjects)
}

// Check that targetRef is not set to the cluster agent
clusterAgentPodName, err := common.GetSelfPodName()
Review comment (Contributor): Thinking about this (not exactly related to this PR), I don't think we want to Error all DPA if we cannot get self pod name, we should probably just skip.

if err != nil {