[clusteragent/autoscaling] Add telemetry metrics for autoscaling controller #28115

Merged · 12 commits · Aug 9, 2024
3 changes: 2 additions & 1 deletion pkg/clusteragent/autoscaling/controller.go
@@ -46,6 +46,7 @@ func NewController(
gvr schema.GroupVersionResource,
isLeader func() bool,
observable Observable,
workqueue workqueue.RateLimitingInterface,
) (*Controller, error) {
mainInformer := informer.ForResource(gvr)
c := &Controller{
@@ -54,7 +55,7 @@ func NewController(
Client: client,
Lister: mainInformer.Lister(),
synced: mainInformer.Informer().HasSynced,
Workqueue: workqueue.NewRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter()),
Workqueue: workqueue,
IsLeader: isLeader,
}

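Note: since the queue is no longer created inside NewController, each caller now owns its construction and naming. A minimal sketch of the new call shape — the handler, store, and ID names here are hypothetical placeholders, not values from this PR:

queue := workqueue.NewRateLimitingQueueWithConfig(
	workqueue.DefaultItemBasedRateLimiter(),
	workqueue.RateLimitingQueueConfig{Name: "example_controller"},
)
ctrl, err := autoscaling.NewController("example-id", handler, dynamicClient, dynamicInformer, gvr, isLeader, store, queue)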
@@ -22,6 +22,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling"
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload/model"
"github.com/DataDog/datadog-agent/pkg/remoteconfig/state"
le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

@@ -81,6 +82,57 @@ func (p autoscalingValuesProcessor) processValues(values *kubeAutoscaling.Worklo
}

podAutoscaler.UpdateFromValues(scalingValues)

// Emit telemetry for received values
// Target name cannot normally be empty, but we handle it just in case
var targetName string
if podAutoscaler.Spec() != nil {
targetName = podAutoscaler.Spec().TargetRef.Name
}

// Horizontal value
if scalingValues.Horizontal != nil {
telemetryHorizontalScaleReceivedRecommendations.Set(
float64(scalingValues.Horizontal.Replicas),
podAutoscaler.Namespace(),
targetName,
podAutoscaler.Name(),
string(scalingValues.Horizontal.Source),
le.JoinLeaderValue,
)
}

// Vertical values
if scalingValues.Vertical != nil {
for _, containerResources := range scalingValues.Vertical.ContainerResources {
for resource, value := range containerResources.Requests {
telemetryVerticalScaleReceivedRecommendationsRequests.Set(
value.AsApproximateFloat64(),
podAutoscaler.Namespace(),
targetName,
podAutoscaler.Name(),
string(scalingValues.Vertical.Source),
containerResources.Name,
string(resource),
le.JoinLeaderValue,
)
}

for resource, value := range containerResources.Limits {
telemetryVerticalScaleReceivedRecommendationsLimits.Set(
value.AsApproximateFloat64(),
podAutoscaler.Namespace(),
targetName,
podAutoscaler.Name(),
string(scalingValues.Vertical.Source),
containerResources.Name,
string(resource),
le.JoinLeaderValue,
)
}
}
}

return nil
}
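Note: the request/limit gauges above go through resource.Quantity.AsApproximateFloat64, which flattens Kubernetes quantities into base-unit floats. A self-contained sketch of that conversion, outside this diff:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	cpu := resource.MustParse("250m")  // CPU quantity: 250 millicores
	mem := resource.MustParse("512Mi") // memory quantity

	fmt.Println(cpu.AsApproximateFloat64()) // 0.25 (cores)
	fmt.Println(mem.AsApproximateFloat64()) // 5.36870912e+08 (bytes)
}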

14 changes: 13 additions & 1 deletion pkg/clusteragent/autoscaling/workload/controller.go
@@ -18,6 +18,7 @@ import (
"k8s.io/client-go/dynamic/dynamicinformer"
scaleclient "k8s.io/client-go/scale"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"

datadoghq "github.com/DataDog/datadog-operator/apis/datadoghq/v1alpha1"
@@ -82,7 +83,15 @@ func newController(
localSender: localSender,
}

baseController, err := autoscaling.NewController(controllerID, c, dynamicClient, dynamicInformer, podAutoscalerGVR, isLeader, store)
autoscalingWorkqueue := workqueue.NewRateLimitingQueueWithConfig(
workqueue.DefaultItemBasedRateLimiter(),
workqueue.RateLimitingQueueConfig{
Name: subsystem,
MetricsProvider: autoscalingQueueMetricsProvider,
},
)

baseController, err := autoscaling.NewController(controllerID, c, dynamicClient, dynamicInformer, podAutoscalerGVR, isLeader, store, autoscalingWorkqueue)
if err != nil {
return nil, err
}
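Note: autoscalingQueueMetricsProvider (defined in telemetry.go further down) must satisfy client-go's workqueue.MetricsProvider interface. For orientation, a minimal no-op implementation of that interface — a sketch for illustration, not code from this PR:

package metricsdemo

import "k8s.io/client-go/util/workqueue"

// noopMetric satisfies every per-metric interface the workqueue expects.
type noopMetric struct{}

func (noopMetric) Inc()            {}
func (noopMetric) Dec()            {}
func (noopMetric) Set(float64)     {}
func (noopMetric) Observe(float64) {}

// noopProvider shows the full workqueue.MetricsProvider surface.
type noopProvider struct{}

func (noopProvider) NewDepthMetric(name string) workqueue.GaugeMetric            { return noopMetric{} }
func (noopProvider) NewAddsMetric(name string) workqueue.CounterMetric           { return noopMetric{} }
func (noopProvider) NewLatencyMetric(name string) workqueue.HistogramMetric      { return noopMetric{} }
func (noopProvider) NewWorkDurationMetric(name string) workqueue.HistogramMetric { return noopMetric{} }
func (noopProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
	return noopMetric{}
}
func (noopProvider) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric {
	return noopMetric{}
}
func (noopProvider) NewRetriesMetric(name string) workqueue.CounterMetric { return noopMetric{} }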
@@ -308,6 +317,7 @@ func (c *Controller) createPodAutoscaler(ctx context.Context, podAutoscalerInter
Spec: *podAutoscalerInternal.Spec().DeepCopy(),
Status: podAutoscalerInternal.BuildStatus(metav1.NewTime(c.clock.Now()), nil),
}
trackPodAutoscalerStatus(autoscalerObj)

obj, err := autoscaling.ToUnstructured(autoscalerObj)
if err != nil {
@@ -345,6 +355,7 @@ func (c *Controller) updatePodAutoscalerSpec(ctx context.Context, podAutoscalerI

func (c *Controller) updatePodAutoscalerStatus(ctx context.Context, podAutoscalerInternal model.PodAutoscalerInternal, podAutoscaler *datadoghq.DatadogPodAutoscaler) error {
newStatus := podAutoscalerInternal.BuildStatus(metav1.NewTime(c.clock.Now()), &podAutoscaler.Status)

if autoscaling.Semantic.DeepEqual(podAutoscaler.Status, newStatus) {
return nil
}
@@ -355,6 +366,7 @@ func (c *Controller) updatePodAutoscalerStatus(ctx context.Context, podAutoscale
ObjectMeta: podAutoscaler.ObjectMeta,
Status: newStatus,
}
trackPodAutoscalerStatus(autoscalerObj)

obj, err := autoscaling.ToUnstructured(autoscalerObj)
if err != nil {
13 changes: 13 additions & 0 deletions pkg/clusteragent/autoscaling/workload/controller_horizontal.go
@@ -27,6 +27,7 @@ import (

"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling"
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload/model"
le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
"github.com/DataDog/datadog-agent/pkg/util/log"
"github.com/DataDog/datadog-agent/pkg/util/pointer"
)
@@ -134,9 +135,21 @@ func (hr *horizontalController) performScaling(ctx context.Context, podAutoscale
err = fmt.Errorf("failed to scale target: %s/%s to %d replicas, err: %w", scale.Namespace, scale.Name, horizontalAction.ToReplicas, err)
hr.eventRecorder.Event(podAutoscaler, corev1.EventTypeWarning, model.FailedScaleEventReason, err.Error())
autoscalerInternal.UpdateFromHorizontalAction(nil, err)

telemetryHorizontalScaleActions.Inc(scale.Namespace, scale.Name, podAutoscaler.Name, string(scalingValues.Horizontal.Source), "error", le.JoinLeaderValue)
return autoscaling.Requeue, err
}

telemetryHorizontalScaleActions.Inc(scale.Namespace, scale.Name, podAutoscaler.Name, string(scalingValues.Horizontal.Source), "ok", le.JoinLeaderValue)
telemetryHorizontalScaleAppliedRecommendations.Set(
float64(horizontalAction.ToReplicas),
scale.Namespace,
scale.Name,
podAutoscaler.Name,
string(scalingValues.Horizontal.Source),
le.JoinLeaderValue,
)

log.Debugf("Scaled target: %s/%s from %d replicas to %d replicas", scale.Namespace, scale.Name, horizontalAction.FromReplicas, horizontalAction.ToReplicas)
autoscalerInternal.UpdateFromHorizontalAction(horizontalAction, nil)
hr.eventRecorder.Eventf(podAutoscaler, corev1.EventTypeNormal, model.SuccessfulScaleEventReason, "Scaled target: %s/%s from %d replicas to %d replicas", scale.Namespace, scale.Name, horizontalAction.FromReplicas, horizontalAction.ToReplicas)
@@ -29,6 +29,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling"
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/workload/model"
k8sutil "github.com/DataDog/datadog-agent/pkg/util/kubernetes"
le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

@@ -179,7 +180,7 @@ func (u *verticalController) syncDeploymentKind(
_, err = u.dynamicClient.Resource(gvr).Namespace(target.Namespace).Patch(ctx, target.Name, types.StrategicMergePatchType, patchData, metav1.PatchOptions{})
if err != nil {
err = fmt.Errorf("failed to trigger rollout for gvk: %s, name: %s, err: %v", targetGVK.String(), autoscalerInternal.Spec().TargetRef.Name, err)
rolloutTriggered.Inc(target.Kind, target.Name, target.Namespace, "error")
telemetryVerticalRolloutTriggered.Inc(target.Namespace, target.Name, autoscalerInternal.Name(), "error", le.JoinLeaderValue)
autoscalerInternal.UpdateFromVerticalAction(nil, err)
u.eventRecorder.Event(podAutoscaler, corev1.EventTypeWarning, model.FailedTriggerRolloutEventReason, err.Error())

@@ -188,7 +189,7 @@

// Propagating information about the rollout
log.Infof("Successfully triggered rollout for autoscaler: %s, gvk: %s, name: %s", autoscalerInternal.ID(), targetGVK.String(), autoscalerInternal.Spec().TargetRef.Name)
rolloutTriggered.Inc(target.Kind, target.Name, target.Namespace, "success")
telemetryVerticalRolloutTriggered.Inc(target.Namespace, target.Name, autoscalerInternal.Name(), "ok", le.JoinLeaderValue)
u.eventRecorder.Eventf(podAutoscaler, corev1.EventTypeNormal, model.SuccessfulTriggerRolloutEventReason, "Successfully triggered rollout on target:%s/%s", targetGVK.String(), autoscalerInternal.Spec().TargetRef.Name)

autoscalerInternal.UpdateFromVerticalAction(&datadoghq.DatadogPodAutoscalerVerticalAction{
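Note: label values passed to Inc must line up positionally with the declared label names — for the renamed counter that is {"namespace", "target_name", "autoscaler_name", "status", le.JoinLeaderLabel} (see telemetry.go below). A hypothetical call, with the values invented for illustration:

telemetryVerticalRolloutTriggered.Inc("default", "web-app", "web-app-dpa", "ok", le.JoinLeaderValue)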
84 changes: 75 additions & 9 deletions pkg/clusteragent/autoscaling/workload/telemetry.go
@@ -13,24 +13,90 @@ import (

"github.com/DataDog/datadog-agent/pkg/aggregator/sender"
"github.com/DataDog/datadog-agent/pkg/telemetry"
le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
workqueuetelemetry "github.com/DataDog/datadog-agent/pkg/util/workqueue/telemetry"
datadoghq "github.com/DataDog/datadog-operator/apis/datadoghq/v1alpha1"
corev1 "k8s.io/api/core/v1"
)

const (
subsystem = "workload_autoscaling"
subsystem = "autoscaling_workload"
aliveTelemetryInterval = 5 * time.Minute
)

var commonOpts = telemetry.Options{NoDoubleUnderscoreSep: true}
var (
autoscalingQueueMetricsProvider = workqueuetelemetry.NewQueueMetricsProvider()
commonOpts = telemetry.Options{NoDoubleUnderscoreSep: true}

// rolloutTriggered tracks the number of patch requests sent by the patcher to the kubernetes api server
var rolloutTriggered = telemetry.NewCounterWithOpts(
subsystem,
"rollout_triggered",
[]string{"owner_kind", "owner_name", "namespace", "status"},
"Tracks the number of patch requests sent by the patcher to the kubernetes api server",
commonOpts,
// telemetryHorizontalScaleActions tracks the number of horizontal scaling attempts
telemetryHorizontalScaleActions = telemetry.NewCounterWithOpts(
subsystem,
"horizontal_scaling_actions",
[]string{"namespace", "target_name", "autoscaler_name", "source", "status", le.JoinLeaderLabel},
"Tracks the number of horizontal scale events done",
commonOpts,
)
// telemetryHorizontalScaleReceivedRecommendations tracks the horizontal scaling recommendation values received
telemetryHorizontalScaleReceivedRecommendations = telemetry.NewGaugeWithOpts(
subsystem,
"horizontal_scaling_received_replicas",
[]string{"namespace", "target_name", "autoscaler_name", "source", le.JoinLeaderLabel},
"Tracks the value of replicas applied by the horizontal scaling recommendation",
commonOpts,
)
// telemetryHorizontalScaleAppliedRecommendations tracks the horizontal scaling recommendation values applied
telemetryHorizontalScaleAppliedRecommendations = telemetry.NewGaugeWithOpts(
subsystem,
"horizontal_scaling_applied_replicas",
[]string{"namespace", "target_name", "autoscaler_name", "source", le.JoinLeaderLabel},
"Tracks the value of replicas applied by the horizontal scaling recommendation",
commonOpts,
)

// telemetryVerticalRolloutTriggered tracks the number of patch requests sent by the patcher to the kubernetes api server
telemetryVerticalRolloutTriggered = telemetry.NewCounterWithOpts(
subsystem,
"vertical_rollout_triggered",
[]string{"namespace", "target_name", "autoscaler_name", "status", le.JoinLeaderLabel},
"Tracks the number of patch requests sent by the patcher to the kubernetes api server",
commonOpts,
)
// telemetryVerticalScaleReceivedRecommendationsLimits tracks the vertical scaling recommendation limits received
telemetryVerticalScaleReceivedRecommendationsLimits = telemetry.NewGaugeWithOpts(
subsystem,
"vertical_scaling_received_limits",
[]string{"namespace", "target_name", "autoscaler_name", "source", "container_name", "resource_name", le.JoinLeaderLabel},
"Tracks the value of limits received by the vertical scaling controller",
commonOpts,
)
// telemetryVerticalScaleReceivedRecommendationsRequests tracks the vertical scaling recommendation requests received
telemetryVerticalScaleReceivedRecommendationsRequests = telemetry.NewGaugeWithOpts(
subsystem,
"vertical_scaling_received_requests",
[]string{"namespace", "target_name", "autoscaler_name", "source", "container_name", "resource_name", le.JoinLeaderLabel},
"Tracks the value of requests received by the vertical scaling recommendation",
commonOpts,
)

// autoscalingStatusConditions tracks the changes in autoscaler conditions
autoscalingStatusConditions = telemetry.NewGaugeWithOpts(
subsystem,
"autoscaler_conditions",
[]string{"namespace", "autoscaler_name", "type", le.JoinLeaderLabel},
"Tracks the changes in autoscaler conditions",
commonOpts,
)
)

func trackPodAutoscalerStatus(podAutoscaler *datadoghq.DatadogPodAutoscaler) {
for _, condition := range podAutoscaler.Status.Conditions {
if condition.Status == corev1.ConditionTrue {
autoscalingStatusConditions.Set(1.0, podAutoscaler.Namespace, podAutoscaler.Name, string(condition.Type), le.JoinLeaderValue)
} else {
autoscalingStatusConditions.Set(0.0, podAutoscaler.Namespace, podAutoscaler.Name, string(condition.Type), le.JoinLeaderValue)
}
}
}
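Note: a hypothetical call showing the gauge flip; the condition struct name and type value are assumed for illustration, not taken from this PR:

pa := &datadoghq.DatadogPodAutoscaler{}
pa.Namespace = "default"
pa.Name = "example-dpa"
pa.Status.Conditions = append(pa.Status.Conditions, datadoghq.DatadogPodAutoscalerCondition{
	Type:   "Active", // assumed condition type, illustration only
	Status: corev1.ConditionTrue,
})
trackPodAutoscalerStatus(pa) // sets the condition gauge to 1 for (default, example-dpa, "Active")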

func startLocalTelemetry(ctx context.Context, sender sender.Sender, tags []string) {
submit := func() {
sender.Gauge("datadog.cluster_agent.autoscaling.workload.running", 1, "", tags)
21 changes: 10 additions & 11 deletions pkg/util/workqueue/telemetry/telemetry.go
@@ -15,9 +15,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/telemetry"
)

var (
commonOpts = telemetry.Options{NoDoubleUnderscoreSep: true}
)
var commonOpts = telemetry.Options{NoDoubleUnderscoreSep: true}

type gaugeWrapper struct {
telemetry.Gauge
@@ -102,13 +100,14 @@ func (q *QueueMetricsProvider) NewDepthMetric(subsystem string) workqueue.GaugeM
subsystem,
"Queue depth",
func(subsystem, name, description string) workqueue.GaugeMetric {
return gaugeWrapper{telemetry.NewGaugeWithOpts(
subsystem,
name,
[]string{},
description,
commonOpts,
),
return gaugeWrapper{
telemetry.NewGaugeWithOpts(
subsystem,
name,
[]string{},
description,
commonOpts,
),
}
},
)
@@ -146,7 +145,7 @@ func (q *QueueMetricsProvider) NewLatencyMetric(subsystem string) workqueue.Hist
name,
[]string{},
description,
[]float64{1, 15, 60, 120, 600, 1200},
[]float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 60, 300, 600, 1200},
commonOpts,
)}
},
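Note: the latency buckets are in seconds, so the new set resolves sub-second queue latencies (50ms and up) instead of starting at 1s. Prometheus-style buckets are cumulative; a self-contained sketch of where a single observation lands, outside this diff:

package main

import "fmt"

func main() {
	buckets := []float64{.05, .1, .25, .5, 1, 2.5, 5, 10, 60, 300, 600, 1200}
	sample := 0.3 // a 300ms queue latency
	for _, upper := range buckets {
		if sample <= upper {
			fmt.Printf("bucket le=%g incremented\n", upper) // every bucket at or above the sample counts it
		}
	}
}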