[nodeutilization]: prometheus usage client through kubernetes metrics
ingvagabund committed Nov 7, 2024
1 parent 476ace2 commit 2e63411
Showing 6 changed files with 436 additions and 4 deletions.
39 changes: 38 additions & 1 deletion pkg/framework/plugins/nodeutilization/lownodeutilization.go
@@ -18,17 +18,25 @@ package nodeutilization

import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"

"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"

promapi "github.com/prometheus/client_golang/api"
"github.com/prometheus/common/config"
)

const LowNodeUtilizationPluginName = "LowNodeUtilization"
@@ -44,6 +52,8 @@ type LowNodeUtilization struct {
overutilizationCriteria []interface{}
resourceNames []v1.ResourceName
usageSnapshot usageClient

promClient promapi.Client
}

var _ frameworktypes.BalancePlugin = &LowNodeUtilization{}
@@ -89,8 +99,35 @@ func NewLowNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (f
resourceNames := getResourceNames(lowNodeUtilizationArgsArgs.Thresholds)

var usageSnapshot usageClient
var promClient promapi.Client
if lowNodeUtilizationArgsArgs.MetricsUtilization.MetricsServer {
usageSnapshot = newActualUsageSnapshot(resourceNames, handle.GetPodsAssignedToNodeFunc(), handle.MetricsCollector())
if lowNodeUtilizationArgsArgs.MetricsUtilization.PrometheusURL != "" {
roundTripper := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}

pClient, err := promapi.NewClient(promapi.Config{
Address: lowNodeUtilizationArgsArgs.MetricsUtilization.PrometheusURL,
RoundTripper: config.NewAuthorizationCredentialsRoundTripper("Bearer", config.NewInlineSecret(lowNodeUtilizationArgsArgs.MetricsUtilization.PrometheusAuthToken), roundTripper),
})
if err != nil {
return nil, fmt.Errorf("unable to create a new prom client: %v", err)
}
promClient = pClient

usageSnapshot = newPrometheusUsageSnapshot(handle.GetPodsAssignedToNodeFunc(), promClient, lowNodeUtilizationArgsArgs.MetricsUtilization.PromQuery)
// reset all resource names to just ResourceMetrics
// TODO(ingvagabund): validate only ResourceMetrics is set when prometheus metrics are enabled
resourceNames = []v1.ResourceName{ResourceMetrics}
} else {
usageSnapshot = newActualUsageSnapshot(resourceNames, handle.GetPodsAssignedToNodeFunc(), handle.MetricsCollector())
}
} else {
usageSnapshot = newRequestedUsageSnapshot(resourceNames, handle.GetPodsAssignedToNodeFunc())
}
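The newPrometheusUsageSnapshot constructor used above is defined in one of the changed files that is not expanded in this view. As a rough sketch of the query path such a client needs — an instant query through the upstream prometheus/v1 API, with each sample keyed by its "instance" label, which the test data below equates with the node name — the helper name and result shaping here are illustrative, not the commit's exact code:

package nodeutilization

import (
	"context"
	"fmt"
	"time"

	promapi "github.com/prometheus/client_golang/api"
	promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
	"github.com/prometheus/common/model"
	"k8s.io/klog/v2"
)

// queryNodeUsage runs the configured instant query and keys each returned
// sample by its "instance" label. Hypothetical helper; the commit's client
// may be structured differently.
func queryNodeUsage(ctx context.Context, client promapi.Client, query string) (map[string]model.SampleValue, error) {
	result, warnings, err := promv1.NewAPI(client).Query(ctx, query, time.Now())
	if err != nil {
		return nil, fmt.Errorf("unable to query prometheus: %v", err)
	}
	if len(warnings) > 0 {
		klog.V(2).InfoS("Prometheus query returned warnings", "warnings", warnings)
	}
	vector, ok := result.(model.Vector)
	if !ok {
		return nil, fmt.Errorf("expected a vector result, got %q", result.Type())
	}
	usage := make(map[string]model.SampleValue, len(vector))
	for _, s := range vector {
		usage[string(s.Metric["instance"])] = s.Value
	}
	return usage, nil
}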
161 changes: 161 additions & 0 deletions pkg/framework/plugins/nodeutilization/lownodeutilization_test.go
Expand Up @@ -38,6 +38,8 @@ import (
frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
"sigs.k8s.io/descheduler/pkg/utils"
"sigs.k8s.io/descheduler/test"

"github.com/prometheus/common/model"
)

func TestLowNodeUtilization(t *testing.T) {
@@ -1359,3 +1361,162 @@ func TestLowNodeUtilizationWithTaints(t *testing.T) {
})
}
}

func withLocalStorage(pod *v1.Pod) {
// A pod with local storage.
test.SetNormalOwnerRef(pod)
pod.Spec.Volumes = []v1.Volume{
{
Name: "sample",
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{Path: "somePath"},
EmptyDir: &v1.EmptyDirVolumeSource{
SizeLimit: resource.NewQuantity(int64(10), resource.BinarySI),
},
},
},
}
// A Mirror Pod.
pod.Annotations = test.GetMirrorPodAnnotation()
}

func withCriticalPod(pod *v1.Pod) {
// A Critical Pod.
test.SetNormalOwnerRef(pod)
pod.Namespace = "kube-system"
priority := utils.SystemCriticalPriority
pod.Spec.Priority = &priority
}

func TestLowNodeUtilizationWithPrometheusMetrics(t *testing.T) {
n1NodeName := "n1"
n2NodeName := "n2"
n3NodeName := "n3"

testCases := []struct {
name string
useDeviationThresholds bool
thresholds, targetThresholds api.ResourceThresholds
query string
samples []model.Sample
nodes []*v1.Node
pods []*v1.Pod
expectedPodsEvicted uint
evictedPods []string
evictableNamespaces *api.Namespaces
}{
{
name: "with instance:node_cpu:rate:sum query",
thresholds: api.ResourceThresholds{
v1.ResourceName("MetricResource"): 30,
},
targetThresholds: api.ResourceThresholds{
v1.ResourceName("MetricResource"): 50,
},
query: "instance:node_cpu:rate:sum",
samples: []model.Sample{
sample("instance:node_cpu:rate:sum", n1NodeName, 0.5695757575757561),
sample("instance:node_cpu:rate:sum", n2NodeName, 0.4245454545454522),
sample("instance:node_cpu:rate:sum", n3NodeName, 0.20381818181818104),
},
nodes: []*v1.Node{
test.BuildTestNode(n1NodeName, 4000, 3000, 9, nil),
test.BuildTestNode(n2NodeName, 4000, 3000, 10, nil),
test.BuildTestNode(n3NodeName, 4000, 3000, 10, nil),
},
pods: []*v1.Pod{
test.BuildTestPod("p1", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p2", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p3", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p4", 400, 0, n1NodeName, test.SetRSOwnerRef),
test.BuildTestPod("p5", 400, 0, n1NodeName, test.SetRSOwnerRef),
// These won't be evicted.
test.BuildTestPod("p6", 400, 0, n1NodeName, test.SetDSOwnerRef),
test.BuildTestPod("p7", 400, 0, n1NodeName, withLocalStorage),
test.BuildTestPod("p8", 400, 0, n1NodeName, withCriticalPod),
test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef),
},
expectedPodsEvicted: 1,
},
}

for _, tc := range testCases {
testFnc := func(metricsEnabled bool, expectedPodsEvicted uint) func(t *testing.T) {
return func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var objs []runtime.Object
for _, node := range tc.nodes {
objs = append(objs, node)
}
for _, pod := range tc.pods {
objs = append(objs, pod)
}

fakeClient := fake.NewSimpleClientset(objs...)

podsForEviction := make(map[string]struct{})
for _, pod := range tc.evictedPods {
podsForEviction[pod] = struct{}{}
}

evictionFailed := false
if len(tc.evictedPods) > 0 {
fakeClient.Fake.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
getAction := action.(core.CreateAction)
obj := getAction.GetObject()
if eviction, ok := obj.(*policy.Eviction); ok {
if _, exists := podsForEviction[eviction.Name]; exists {
return true, obj, nil
}
evictionFailed = true
return true, nil, fmt.Errorf("pod %q was unexpectedly evicted", eviction.Name)
}
return true, obj, nil
})
}

handle, podEvictor, err := frameworktesting.InitFrameworkHandle(ctx, fakeClient, nil, defaultevictor.DefaultEvictorArgs{NodeFit: true}, nil)
if err != nil {
t.Fatalf("Unable to initialize a framework handle: %v", err)
}

plugin, err := NewLowNodeUtilization(&LowNodeUtilizationArgs{
Thresholds: tc.thresholds,
TargetThresholds: tc.targetThresholds,
UseDeviationThresholds: tc.useDeviationThresholds,
EvictableNamespaces: tc.evictableNamespaces,
MetricsUtilization: MetricsUtilization{
MetricsServer: true,
PrometheusURL: "http://prometheus.example.orgname",
PrometheusAuthToken: "XXXXX",
},
},
handle)
if err != nil {
t.Fatalf("Unable to initialize the plugin: %v", err)
}

pClient := &fakePromClient{
result: tc.samples,
}

plugin.(*LowNodeUtilization).usageSnapshot = newPrometheusUsageSnapshot(handle.GetPodsAssignedToNodeFunc(), pClient, tc.query)
status := plugin.(frameworktypes.BalancePlugin).Balance(ctx, tc.nodes)
if status != nil {
t.Fatalf("Balance.err: %v", status.Err)
}

podsEvicted := podEvictor.TotalEvicted()
if expectedPodsEvicted != podsEvicted {
t.Errorf("Expected %v pods to be evicted but %v got evicted", expectedPodsEvicted, podsEvicted)
}
if evictionFailed {
t.Errorf("Pod evictions failed unexpectedly")
}
}
}
t.Run(tc.name, testFnc(false, tc.expectedPodsEvicted))
}
}
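The sample and fakePromClient helpers used by this test are defined in a part of the file not shown in this view. A sketch of shapes that satisfy the test's usage: sample builds one node-level vector member, and the fake implements the two-method promapi.Client interface by answering every request with a serialized instant-query result, which is what the v1 API client decodes. The "instance" label and the response envelope are assumptions based on the Prometheus HTTP API format:

package nodeutilization

import (
	"bytes"
	"context"
	"encoding/json"
	"io"
	"net/http"
	"net/url"

	"github.com/prometheus/common/model"
)

// sample builds a single vector sample for the given metric and node, with
// the node name stored in the "instance" label.
func sample(metricName, nodeName string, value float64) model.Sample {
	return model.Sample{
		Metric: model.Metric{
			"__name__": model.LabelValue(metricName),
			"instance": model.LabelValue(nodeName),
		},
		Value:     model.SampleValue(value),
		Timestamp: model.Now(),
	}
}

// fakePromClient implements the prometheus api.Client interface and replies
// to every request with the preconfigured samples wrapped in a successful
// instant-query response.
type fakePromClient struct {
	result []model.Sample
}

func (c *fakePromClient) URL(ep string, args map[string]string) *url.URL {
	u, _ := url.Parse("http://prometheus.example.org")
	return u
}

func (c *fakePromClient) Do(ctx context.Context, req *http.Request) (*http.Response, []byte, error) {
	vector := make(model.Vector, 0, len(c.result))
	for i := range c.result {
		vector = append(vector, &c.result[i])
	}
	body, err := json.Marshal(map[string]interface{}{
		"status": "success",
		"data": map[string]interface{}{
			"resultType": "vector",
			"result":     vector,
		},
	})
	if err != nil {
		return nil, nil, err
	}
	resp := &http.Response{
		StatusCode: http.StatusOK,
		Header:     http.Header{"Content-Type": []string{"application/json"}},
		Body:       io.NopCloser(bytes.NewReader(body)),
	}
	return resp, body, nil
}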
23 changes: 20 additions & 3 deletions pkg/framework/plugins/nodeutilization/nodeutilization.go
@@ -35,6 +35,8 @@ import (
"sigs.k8s.io/descheduler/pkg/utils"
)

const ResourceMetrics = v1.ResourceName("MetricResource")

// NodeUsage stores a node's info, pods on it, thresholds and its resource usage
type NodeUsage struct {
node *v1.Node
@@ -93,6 +95,8 @@ func getNodeThresholds(
if len(node.Status.Allocatable) > 0 {
nodeCapacity = node.Status.Allocatable
}
// Make ResourceMetrics 100% => 1000 points
nodeCapacity[ResourceMetrics] = *resource.NewQuantity(int64(1000), resource.DecimalSI)
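// With percentage thresholds, 30% and 50% therefore become 300 and 500
// points. Assuming the prometheus usage client scales a [0,1] sample by
// this capacity, the test's n1 sample of ~0.57 lands at ~570 points and
// the node is classified as overutilized.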

nodeThresholdsMap[node.Name] = NodeThresholds{
lowResourceThreshold: map[v1.ResourceName]*resource.Quantity{},
@@ -323,15 +327,27 @@ func evictPods(
if !preEvictionFilterWithOptions(pod) {
continue
}

// If podUsage does not support resource counting (e.g. the provided metric
// does not quantify pod resource utilization), allow evicting only a single
// pod. It is recommended to run the descheduling cycle more often so the
// plugin can perform more evictions toward redistribution.
singleEviction := false
podUsage, err := usageSnapshot.podUsage(pod)
if err != nil {
klog.Errorf("unable to get pod usage for %v/%v: %v", pod.Namespace, pod.Name, err)
continue
if _, ok := err.(*notSupportedError); !ok {
klog.Errorf("unable to get pod usage for %v/%v: %v", pod.Namespace, pod.Name, err)
continue
}
singleEviction = true
}
err = podEvictor.Evict(ctx, pod, evictOptions)
if err == nil {
klog.V(3).InfoS("Evicted pods", "pod", klog.KObj(pod))

if singleEviction {
klog.V(3).InfoS("Currently, only a single pod eviction is allowed")
break
}
for name := range totalAvailableUsage {
if name == v1.ResourcePods {
nodeInfo.usage[name].Sub(*resource.NewQuantity(1, resource.DecimalSI))
@@ -354,6 +370,7 @@ func evictPods(
if quantity, exists := nodeInfo.usage[v1.ResourcePods]; exists {
keysAndValues = append(keysAndValues, "Pods", quantity.Value())
}

for name := range totalAvailableUsage {
if !nodeutil.IsBasicResource(name) {
keysAndValues = append(keysAndValues, string(name), totalAvailableUsage[name].Value())
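The notSupportedError matched in evictPods above is declared in the usage-client file, which this view does not expand. A minimal sketch of a sentinel type the err.(*notSupportedError) assertion would accept — the field and message are assumptions:

package nodeutilization

import "fmt"

// notSupportedError reports that the usage client cannot attribute the
// configured metric to individual pods, e.g. a prometheus query that only
// quantifies node-level utilization.
type notSupportedError struct {
	usageType string
}

func (e *notSupportedError) Error() string {
	return fmt.Sprintf("pod usage accounting for %q is not supported", e.usageType)
}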
5 changes: 5 additions & 0 deletions pkg/framework/plugins/nodeutilization/types.go
@@ -57,4 +57,9 @@ type MetricsUtilization struct {
// metricsServer enables metrics from a kubernetes metrics server.
// Please see https://kubernetes-sigs.github.io/metrics-server/ for more.
MetricsServer bool `json:"metricsServer,omitempty"`

PrometheusURL string `json:"prometheusURL,omitempty"`
// TODO(ingvagabund): Get the token from a secret
PrometheusAuthToken string `json:"prometheusAuthToken,omitempty"`
PromQuery string `json:"promQuery,omitempty"`
}
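Note that in NewLowNodeUtilization the Prometheus branch is nested under the MetricsServer check, so metricsServer must stay enabled for the new fields to take effect. A sketch of plugin args exercising the Prometheus path, with illustrative values:

package nodeutilization

import "sigs.k8s.io/descheduler/pkg/api"

// examplePrometheusArgs sketches a configuration that exercises the new
// prometheus-backed usage client; values are illustrative.
func examplePrometheusArgs() *LowNodeUtilizationArgs {
	return &LowNodeUtilizationArgs{
		Thresholds:       api.ResourceThresholds{"MetricResource": 30},
		TargetThresholds: api.ResourceThresholds{"MetricResource": 50},
		MetricsUtilization: MetricsUtilization{
			MetricsServer:       true, // required: the prometheus branch is gated on it
			PrometheusURL:       "https://prometheus.example.org",
			PrometheusAuthToken: "<bearer token>", // per the TODO above, to move into a secret
			PromQuery:           "instance:node_cpu:rate:sum",
		},
	}
}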
(The remaining two changed files in this commit are not expanded in this view.)
