Skip to content

Commit

Permalink
[clusteragent/autoscaling] Impose 100 DatadogPodAutoscaler limit in cluster agent (#28684)
Browse files Browse the repository at this point in the history
  • Loading branch information
jennchenn authored Oct 4, 2024
1 parent 4d751dd commit ef648d6
Show file tree
Hide file tree
Showing 6 changed files with 636 additions and 28 deletions.
168 changes: 168 additions & 0 deletions pkg/clusteragent/autoscaling/max_heap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build kubeapiserver

package autoscaling

import (
"container/heap"
"sync"
"time"

"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload/model"
)

// store is shorthand for the generic autoscaling Store specialized to
// PodAutoscalerInternal objects; it backs the HashHeap defined below.
type store = Store[model.PodAutoscalerInternal]

// TimestampKey is a struct that holds a timestamp and key for a `DatadogPodAutoscaler` object
type TimestampKey struct {
Timestamp time.Time
Key string
}

// MaxTimestampKeyHeap is a heap that sorts TimestampKey objects by timestamp in descending order
type MaxTimestampKeyHeap []TimestampKey

// Len returns the length of the heap
func (h MaxTimestampKeyHeap) Len() int { return len(h) }

// Less returns true if the timestamp at index i is after the timestamp at index j
func (h MaxTimestampKeyHeap) Less(i, j int) bool {
return LessThan(h[i], h[j])
}

// LessThan returns true if the timestamp of k1 is after the timestamp of k2
func LessThan(k1, k2 TimestampKey) bool {
return k1.Timestamp.After(k2.Timestamp) || (k1.Timestamp.Equal(k2.Timestamp) && k1.Key < k2.Key)
}

// Swap swaps the elements at indices i and j
func (h MaxTimestampKeyHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

// Push adds an element to the heap while preserving max heap ordering
func (h *MaxTimestampKeyHeap) Push(x interface{}) {
*h = append(*h, x.(TimestampKey))
}

// Pop removes the top element from the heap while preserving max heap ordering
func (h *MaxTimestampKeyHeap) Pop() any {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

// Peek returns the top element of the heap without removing it
func (h *MaxTimestampKeyHeap) Peek() TimestampKey {
return (*h)[0]
}

// NewMaxHeap returns a new MaxTimestampKeyHeap
func NewMaxHeap() *MaxTimestampKeyHeap {
h := &MaxTimestampKeyHeap{}
heap.Init(h)
return h
}

// FindIdx returns the index of the given key in the heap and a boolean indicating if the key was found
func (h *MaxTimestampKeyHeap) FindIdx(key string) (int, bool) {
for idx, k := range *h {
if k.Key == key {
return idx, true
}
}
return 0, false
}

// HashHeap is a struct that holds a MaxHeap and a set of keys that exist in the heap.
// It bounds the number of tracked DatadogPodAutoscaler objects to maxSize,
// keeping the oldest ones by creation timestamp. mu guards both MaxHeap and Keys.
// NOTE(review): MaxHeap and Keys are exported but are only safe to touch with
// mu held — confirm no callers outside this package mutate them directly.
type HashHeap struct {
	// MaxHeap orders tracked keys newest-first so the newest entry is evicted on overflow.
	MaxHeap MaxTimestampKeyHeap
	// Keys is the membership set mirroring MaxHeap's contents for O(1) lookups.
	Keys map[string]bool
	// maxSize is the maximum number of entries retained in the heap.
	maxSize int
	// mu guards MaxHeap and Keys.
	mu sync.RWMutex
	// store is the backing object store queried on insert.
	store *store
}

// NewHashHeap returns a new HashHeap bounded to maxSize entries and backed by
// the given object store. The zero-valued RWMutex is ready to use as-is.
func NewHashHeap(maxSize int, store *store) *HashHeap {
	hh := &HashHeap{
		MaxHeap: *NewMaxHeap(),
		Keys:    map[string]bool{},
		maxSize: maxSize,
		store:   store,
	}
	return hh
}

// InsertIntoHeap inserts the object identified by key into the max heap,
// evicting the newest entry when the heap is at capacity. It is a no-op when
// the key is already tracked, the object is missing from the store, its
// creation timestamp is unset, or the heap is full and the new entry is newer
// than everything already present.
// Used as an ObserverFunc; accepts sender as parameter to match ObserverFunc signature.
func (h *HashHeap) InsertIntoHeap(key, _sender string) {
	// Fast path: already in heap, do not try to insert.
	if h.Exists(key) {
		return
	}

	// Get object from store
	podAutoscalerInternal, podAutoscalerInternalFound := h.store.Get(key)
	if !podAutoscalerInternalFound {
		return
	}

	// Objects without a creation timestamp cannot be ordered; skip them.
	if podAutoscalerInternal.CreationTimestamp().IsZero() {
		return
	}

	newTimestampKey := TimestampKey{
		Timestamp: podAutoscalerInternal.CreationTimestamp(),
		Key:       key,
	}

	h.mu.Lock()
	defer h.mu.Unlock()

	// Re-check membership under the write lock: another goroutine may have
	// inserted the same key between the Exists fast path and acquiring the
	// lock, which would otherwise create a duplicate heap entry.
	if h.Keys[key] {
		return
	}

	if h.MaxHeap.Len() >= h.maxSize {
		// Guard against maxSize <= 0: the heap is empty and Peek would panic.
		if h.MaxHeap.Len() == 0 {
			return
		}
		top := h.MaxHeap.Peek()
		// If the new key is newer than or equal to the top key, do not insert
		if LessThan(newTimestampKey, top) {
			return
		}
		delete(h.Keys, top.Key)
		heap.Pop(&h.MaxHeap)
	}

	heap.Push(&h.MaxHeap, newTimestampKey)
	h.Keys[key] = true
}

// DeleteFromHeap removes the given key from the max heap, if present.
// Used as an ObserverFunc; accepts sender as parameter to match ObserverFunc signature.
func (h *HashHeap) DeleteFromHeap(key, _sender string) {
	// Hold the write lock across both the index lookup and the removal.
	// Resolving the index under a read lock and removing under a separately
	// acquired write lock would let a concurrent insert/delete invalidate the
	// index in between, removing the wrong element.
	h.mu.Lock()
	defer h.mu.Unlock()

	// Key did not exist in heap, return early.
	// (Direct map access instead of Exists: calling Exists here would try to
	// take the read lock while the write lock is held and deadlock.)
	if !h.Keys[key] {
		return
	}

	idx, found := h.MaxHeap.FindIdx(key)
	if !found {
		return
	}

	heap.Remove(&h.MaxHeap, idx)
	delete(h.Keys, key)
}

// Exists reports whether the given key is currently tracked in the heap.
func (h *HashHeap) Exists(key string) bool {
	h.mu.RLock()
	// Keys only ever stores true, so the zero value for a missing key is
	// exactly the "not present" answer.
	found := h.Keys[key]
	h.mu.RUnlock()
	return found
}
34 changes: 26 additions & 8 deletions pkg/clusteragent/autoscaling/workload/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type Controller struct {
eventRecorder record.EventRecorder
store *store

limitHeap *autoscaling.HashHeap

podWatcher podWatcher
horizontalController *horizontalController
verticalController *verticalController
Expand All @@ -79,6 +81,7 @@ func newController(
store *store,
podWatcher podWatcher,
localSender sender.Sender,
limitHeap *autoscaling.HashHeap,
) (*Controller, error) {
c := &Controller{
clusterID: clusterID,
Expand All @@ -101,6 +104,11 @@ func newController(
}

c.Controller = baseController
c.limitHeap = limitHeap
store.RegisterObserver(autoscaling.Observer{
SetFunc: c.limitHeap.InsertIntoHeap,
DeleteFunc: c.limitHeap.DeleteFromHeap,
})
c.store = store
c.podWatcher = podWatcher

Expand Down Expand Up @@ -193,7 +201,7 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
// If flagged for deletion, we just need to clear up our store (deletion complete)
// Also if object was not owned by remote config, we also need to delete it (deleted by user)
if podAutoscalerInternal.Deleted() || podAutoscalerInternal.Spec().Owner != datadoghq.DatadogPodAutoscalerRemoteOwner {
log.Infof("Object %s not present in Kuberntes and flagged for deletion (remote) or owner == local, clearing internal store", key)
log.Infof("Object %s not present in Kubernetes and flagged for deletion (remote) or owner == local, clearing internal store", key)
c.store.UnlockDelete(key, c.ID)
return autoscaling.NoRequeue, nil
}
Expand Down Expand Up @@ -233,7 +241,7 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
*podAutoscalerInternal.Spec().RemoteVersion > *podAutoscaler.Spec.RemoteVersion {
err := c.updatePodAutoscalerSpec(ctx, podAutoscalerInternal, podAutoscaler)

// When doing an external update, we stop and reqeue the object to not have multiple changes at once.
// When doing an external update, we stop and requeue the object to not have multiple changes at once.
c.store.Unlock(key)
return autoscaling.Requeue, err
}
Expand All @@ -257,12 +265,15 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
if localHash != remoteHash {
err := c.updatePodAutoscalerSpec(ctx, podAutoscalerInternal, podAutoscaler)

// When doing an external update, we stop and reqeue the object to not have multiple changes at once.
// When doing an external update, we stop and requeue the object to not have multiple changes at once.
c.store.Unlock(key)
return autoscaling.Requeue, err
}

podAutoscalerInternal.SetGeneration(podAutoscaler.Generation)
if podAutoscalerInternal.CreationTimestamp().IsZero() {
podAutoscalerInternal.UpdateCreationTimestamp(podAutoscaler.CreationTimestamp.Time)
}
}
}

Expand All @@ -277,7 +288,7 @@ func (c *Controller) syncPodAutoscaler(ctx context.Context, key, ns, name string
podAutoscalerInternal.SetError(nil)

// Validate autoscaler requirements
validationErr := c.validateAutoscaler(podAutoscaler)
validationErr := c.validateAutoscaler(podAutoscalerInternal)
if validationErr != nil {
podAutoscalerInternal.SetError(validationErr)
return autoscaling.NoRequeue, c.updateAutoscalerStatusAndUnlock(ctx, key, ns, name, validationErr, podAutoscalerInternal, podAutoscaler)
Expand Down Expand Up @@ -407,15 +418,22 @@ func (c *Controller) deletePodAutoscaler(ns, name string) error {
return nil
}

func (c *Controller) validateAutoscaler(podAutoscaler *datadoghq.DatadogPodAutoscaler) error {
func (c *Controller) validateAutoscaler(podAutoscalerInternal model.PodAutoscalerInternal) error {
// Check that we are within the limit of 100 DatadogPodAutoscalers
key := podAutoscalerInternal.ID()
if !c.limitHeap.Exists(key) {
return fmt.Errorf("Autoscaler disabled as maximum number per cluster reached (%d)", maxDatadogPodAutoscalerObjects)
}

// Check that targetRef is not set to the cluster agent
clusterAgentPodName, err := common.GetSelfPodName()
// If we cannot get cluster agent pod name, just skip the validation logic
if err != nil {
return fmt.Errorf("Unable to get the cluster agent pod name: %w", err)
return nil
}

var resourceName string
switch owner := podAutoscaler.Spec.TargetRef.Kind; owner {
switch owner := podAutoscalerInternal.Spec().TargetRef.Kind; owner {
case "Deployment":
resourceName = kubernetes.ParseDeploymentForPodName(clusterAgentPodName)
case "ReplicaSet":
Expand All @@ -424,7 +442,7 @@ func (c *Controller) validateAutoscaler(podAutoscaler *datadoghq.DatadogPodAutos

clusterAgentNs := common.GetMyNamespace()

if podAutoscaler.Namespace == clusterAgentNs && podAutoscaler.Spec.TargetRef.Name == resourceName {
if podAutoscalerInternal.Namespace() == clusterAgentNs && podAutoscalerInternal.Spec().TargetRef.Name == resourceName {
return fmt.Errorf("Autoscaling target cannot be set to the cluster agent")
}
return nil
Expand Down
Loading

0 comments on commit ef648d6

Please sign in to comment.