Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[clusteragent/autoscaling] Impose 100 DatadogPodAutoscaler limit in cluster agent #28684

Merged
merged 33 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
8ee3911
Add creation timestamp to DatadogPodAutoscalerInternal
jennchenn Aug 20, 2024
6cd039c
Fix minor typos
jennchenn Aug 21, 2024
46ece6f
Implement heap to track number of DPA objects
jennchenn Aug 22, 2024
cabcad1
Update heap names to reduce redundancy in calls
jennchenn Aug 23, 2024
e70833e
Add basic unit test for creation of heap
jennchenn Aug 23, 2024
843d36c
fixup! Update heap names to reduce redundancy in calls
jennchenn Aug 23, 2024
b200155
Make error message shown in status more explicit
jennchenn Aug 30, 2024
c6d6431
Reorder PAI params to have metadata fields first
jennchenn Aug 30, 2024
1863ce1
Prevent adding objects with 0 timestamp to heap
jennchenn Aug 30, 2024
9ecf501
Remove hashheap from generic controller definition and add delete obs…
jennchenn Aug 30, 2024
156ecff
Register observer on store to update heap
jennchenn Sep 6, 2024
90799f2
Merge remote-tracking branch 'origin/main' into jenn/CASCL-1_autoscal…
jennchenn Sep 6, 2024
b680558
Add check for 100 DPAs to validation logic
jennchenn Sep 6, 2024
ea7979d
Refactor max heap functions to use Exists helper
jennchenn Sep 6, 2024
91e6b9f
Update DCA target test with autoscaler creation timestamp
jennchenn Sep 6, 2024
0622fb4
Remove period from error message; remove redundant return
jennchenn Sep 6, 2024
7254318
Update local test to check behavior on delete
jennchenn Sep 25, 2024
03fd859
Add RWMutex to hash heap
jennchenn Sep 25, 2024
b29cfd0
Remove test namespace hardcoded value
jennchenn Sep 26, 2024
565a3ef
Update creation timestamp for remote owner
jennchenn Sep 26, 2024
b92ac07
Rename controller hashHeap to limitHeap for better description of usage
jennchenn Sep 27, 2024
3913a97
Remove redundant variable initialization
jennchenn Sep 27, 2024
6669594
Rename test to be more descriptive
jennchenn Sep 27, 2024
44589e5
Update creation timestamp only when spec changes
jennchenn Sep 27, 2024
4ae0ab3
Simplify validation logic and pass only one param
jennchenn Sep 27, 2024
3285d31
Skip rest of DCA validation logic if self pod name cannot be found
jennchenn Sep 27, 2024
aedc9f8
fixup! Rename controller hashHeap to limitHeap for better description…
jennchenn Sep 27, 2024
0429380
Use store directly in hash heap to avoid passing obj to observer
jennchenn Sep 27, 2024
653ab74
Test hash heap on remote owner DPA objects
jennchenn Oct 1, 2024
6cdaf91
Merge remote-tracking branch 'origin/main' into jenn/CASCL-1_autoscal…
jennchenn Oct 1, 2024
b9a47f3
Pass in store to hash heap
jennchenn Oct 2, 2024
96ef6dd
Remove duplication and clean up fake pod autoscaler helper function
jennchenn Oct 3, 2024
4a317d5
Remove requeue after creation timestamp update
jennchenn Oct 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 168 additions & 0 deletions pkg/clusteragent/autoscaling/max_heap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build kubeapiserver

package autoscaling

import (
"container/heap"
"sync"
"time"

"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload/model"
)

type store = Store[model.PodAutoscalerInternal]

// TimestampKey is a struct that holds a timestamp and key for a `DatadogPodAutoscaler` object
type TimestampKey struct {
Timestamp time.Time
Key string
}

// MaxTimestampKeyHeap is a heap that sorts TimestampKey objects by timestamp in descending order
type MaxTimestampKeyHeap []TimestampKey

// Len returns the length of the heap
func (h MaxTimestampKeyHeap) Len() int { return len(h) }

// Less returns true if the timestamp at index i is after the timestamp at index j
func (h MaxTimestampKeyHeap) Less(i, j int) bool {
return LessThan(h[i], h[j])
}

// LessThan returns true if the timestamp of k1 is after the timestamp of k2
func LessThan(k1, k2 TimestampKey) bool {
return k1.Timestamp.After(k2.Timestamp) || (k1.Timestamp.Equal(k2.Timestamp) && k1.Key < k2.Key)
}

// Swap swaps the elements at indices i and j
func (h MaxTimestampKeyHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

// Push adds an element to the heap while preserving max heap ordering
func (h *MaxTimestampKeyHeap) Push(x interface{}) {
*h = append(*h, x.(TimestampKey))
}

// Pop removes the top element from the heap while preserving max heap ordering
func (h *MaxTimestampKeyHeap) Pop() any {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

// Peek returns the top element of the heap without removing it
func (h *MaxTimestampKeyHeap) Peek() TimestampKey {
return (*h)[0]
}

// NewMaxHeap returns a new MaxTimestampKeyHeap
func NewMaxHeap() *MaxTimestampKeyHeap {
h := &MaxTimestampKeyHeap{}
heap.Init(h)
return h
}

// FindIdx returns the index of the given key in the heap and a boolean indicating if the key was found
func (h *MaxTimestampKeyHeap) FindIdx(key string) (int, bool) {
for idx, k := range *h {
if k.Key == key {
return idx, true
}
}
return 0, false
}

// HashHeap is a struct that holds a MaxHeap and a set of keys that exist in the heap
type HashHeap struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HashHeap is currently not thread safe, which could become necessary if we have multiple workers or if we move the store to object locking.

MaxHeap MaxTimestampKeyHeap
Keys map[string]bool
maxSize int
mu sync.RWMutex
store *store
}

// NewHashHeap returns a new MaxHeap with the given max size
func NewHashHeap(maxSize int, store *store) *HashHeap {
return &HashHeap{
MaxHeap: *NewMaxHeap(),
Keys: make(map[string]bool),
maxSize: maxSize,
mu: sync.RWMutex{},
store: store,
}
}

// InsertIntoHeap returns true if the key already exists in the max heap or was inserted correctly
// Used as an ObserverFunc; accept sender as parameter to match ObserverFunc signature
func (h *HashHeap) InsertIntoHeap(key, _sender string) {
// Already in heap, do not try to insert
if h.Exists(key) {
return
}

// Get object from store
podAutoscalerInternal, podAutoscalerInternalFound := h.store.Get(key)
if !podAutoscalerInternalFound {
return
}

if podAutoscalerInternal.CreationTimestamp().IsZero() {
return
}

newTimestampKey := TimestampKey{
Timestamp: podAutoscalerInternal.CreationTimestamp(),
Key: key,
}

h.mu.Lock()
defer h.mu.Unlock()

if h.MaxHeap.Len() >= h.maxSize {
top := h.MaxHeap.Peek()
// If the new key is newer than or equal to the top key, do not insert
if LessThan(newTimestampKey, top) {
return
}
delete(h.Keys, top.Key)
heap.Pop(&h.MaxHeap)
}

heap.Push(&h.MaxHeap, newTimestampKey)
h.Keys[key] = true
}

// DeleteFromHeap removes the given key from the max heap
// Used as an ObserverFunc; accept sender as parameter to match ObserverFunc signature
func (h *HashHeap) DeleteFromHeap(key, _sender string) {
// Key did not exist in heap, return early
if !h.Exists(key) {
return
}
h.mu.RLock()
idx, found := h.MaxHeap.FindIdx(key)
h.mu.RUnlock()

if !found {
return
}

h.mu.Lock()
defer h.mu.Unlock()
heap.Remove(&h.MaxHeap, idx)
delete(h.Keys, key)
}

// Exists returns true if the given key exists in the heap
func (h *HashHeap) Exists(key string) bool {
h.mu.RLock()
defer h.mu.RUnlock()
_, ok := h.Keys[key]
return ok
}
34 changes: 26 additions & 8 deletions pkg/clusteragent/autoscaling/workload/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type Controller struct {
eventRecorder record.EventRecorder
store *store

limitHeap *autoscaling.HashHeap

podWatcher podWatcher
horizontalController *horizontalController
verticalController *verticalController
Expand All @@ -79,6 +81,7 @@ func newController(
store *store,
podWatcher podWatcher,
localSender sender.Sender,
limitHeap *autoscaling.HashHeap,
) (*Controller, error) {
c := &Controller{
clusterID: clusterID,
Expand All @@ -101,6 +104,11 @@ func newController(
}

c.Controller = baseController
c.limitHeap = limitHeap
store.RegisterObserver(autoscaling.Observer{
SetFunc: c.limitHeap.InsertIntoHeap,
DeleteFunc: c.limitHeap.DeleteFromHeap,
})
c.store = store
c.podWatcher = podWatcher

Expand Down Expand Up @@ -193,7 +201,7 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
// If flagged for deletion, we just need to clear up our store (deletion complete)
// Also if object was not owned by remote config, we also need to delete it (deleted by user)
if podAutoscalerInternal.Deleted() || podAutoscalerInternal.Spec().Owner != datadoghq.DatadogPodAutoscalerRemoteOwner {
log.Infof("Object %s not present in Kuberntes and flagged for deletion (remote) or owner == local, clearing internal store", key)
log.Infof("Object %s not present in Kubernetes and flagged for deletion (remote) or owner == local, clearing internal store", key)
c.store.UnlockDelete(key, c.ID)
return autoscaling.NoRequeue, nil
}
Expand Down Expand Up @@ -233,7 +241,7 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
*podAutoscalerInternal.Spec().RemoteVersion > *podAutoscaler.Spec.RemoteVersion {
err := c.updatePodAutoscalerSpec(ctx, podAutoscalerInternal, podAutoscaler)

// When doing an external update, we stop and reqeue the object to not have multiple changes at once.
// When doing an external update, we stop and requeue the object to not have multiple changes at once.
c.store.Unlock(key)
return autoscaling.Requeue, err
}
Expand All @@ -257,12 +265,15 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
if localHash != remoteHash {
err := c.updatePodAutoscalerSpec(ctx, podAutoscalerInternal, podAutoscaler)

// When doing an external update, we stop and reqeue the object to not have multiple changes at once.
// When doing an external update, we stop and requeue the object to not have multiple changes at once.
c.store.Unlock(key)
return autoscaling.Requeue, err
}

podAutoscalerInternal.SetGeneration(podAutoscaler.Generation)
if podAutoscalerInternal.CreationTimestamp().IsZero() {
podAutoscalerInternal.UpdateCreationTimestamp(podAutoscaler.CreationTimestamp.Time)
}
}
}

Expand All @@ -277,7 +288,7 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
podAutoscalerInternal.SetError(nil)

// Validate autoscaler requirements
validationErr := c.validateAutoscaler(podAutoscaler)
validationErr := c.validateAutoscaler(podAutoscalerInternal)
if validationErr != nil {
podAutoscalerInternal.SetError(validationErr)
return autoscaling.NoRequeue, c.updateAutoscalerStatusAndUnlock(ctx, key, ns, name, validationErr, podAutoscalerInternal, podAutoscaler)
Expand Down Expand Up @@ -407,15 +418,22 @@ func (c *Controller) deletePodAutoscaler(ns, name string) error {
return nil
}

func (c *Controller) validateAutoscaler(podAutoscaler *datadoghq.DatadogPodAutoscaler) error {
func (c *Controller) validateAutoscaler(podAutoscalerInternal model.PodAutoscalerInternal) error {
// Check that we are within the limit of 100 DatadogPodAutoscalers
key := podAutoscalerInternal.ID()
if !c.limitHeap.Exists(key) {
return fmt.Errorf("Autoscaler disabled as maximum number per cluster reached (%d)", maxDatadogPodAutoscalerObjects)
}

// Check that targetRef is not set to the cluster agent
clusterAgentPodName, err := common.GetSelfPodName()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this (not exactly related to this PR), I don't think we want to Error all DPA if we cannot get self pod name, we should probably just skip.

// If we cannot get cluster agent pod name, just skip the validation logic
if err != nil {
return fmt.Errorf("Unable to get the cluster agent pod name: %w", err)
return nil
}

var resourceName string
switch owner := podAutoscaler.Spec.TargetRef.Kind; owner {
switch owner := podAutoscalerInternal.Spec().TargetRef.Kind; owner {
case "Deployment":
resourceName = kubernetes.ParseDeploymentForPodName(clusterAgentPodName)
case "ReplicaSet":
Expand All @@ -424,7 +442,7 @@ func (c *Controller) validateAutoscaler(podAutoscaler *datadoghq.DatadogPodAutos

clusterAgentNs := common.GetMyNamespace()

if podAutoscaler.Namespace == clusterAgentNs && podAutoscaler.Spec.TargetRef.Name == resourceName {
if podAutoscalerInternal.Namespace() == clusterAgentNs && podAutoscalerInternal.Spec().TargetRef.Name == resourceName {
return fmt.Errorf("Autoscaling target cannot be set to the cluster agent")
}
return nil
Expand Down
Loading
Loading