From 2e63411ae0265ebfa311af6ebe437b87e0127240 Mon Sep 17 00:00:00 2001
From: Jan Chaloupka
Date: Thu, 7 Nov 2024 16:32:06 +0100
Subject: [PATCH] [nodeutilization]: prometheus usage client through kubernetes metrics

---
 .../nodeutilization/lownodeutilization.go     |  39 ++++-
 .../lownodeutilization_test.go                | 161 ++++++++++++++++++
 .../nodeutilization/nodeutilization.go        |  23 ++-
 .../plugins/nodeutilization/types.go          |   5 +
 .../plugins/nodeutilization/usageclients.go   | 144 ++++++++++++++++
 .../nodeutilization/usageclients_test.go      |  68 ++++++++
 6 files changed, 436 insertions(+), 4 deletions(-)

diff --git a/pkg/framework/plugins/nodeutilization/lownodeutilization.go b/pkg/framework/plugins/nodeutilization/lownodeutilization.go
index c566ebbea9..266a95bbf7 100644
--- a/pkg/framework/plugins/nodeutilization/lownodeutilization.go
+++ b/pkg/framework/plugins/nodeutilization/lownodeutilization.go
@@ -18,17 +18,25 @@ package nodeutilization

 import (
 	"context"
+	"crypto/tls"
 	"fmt"
+	"net"
+	"net/http"
+	"time"

 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/klog/v2"
+
 	"sigs.k8s.io/descheduler/pkg/api"
 	"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
 	nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
 	podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
 	frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
+
+	promapi "github.com/prometheus/client_golang/api"
+	"github.com/prometheus/common/config"
 )

 const LowNodeUtilizationPluginName = "LowNodeUtilization"
@@ -44,6 +52,8 @@ type LowNodeUtilization struct {
 	overutilizationCriteria []interface{}
 	resourceNames           []v1.ResourceName
 	usageSnapshot           usageClient
+
+	promClient promapi.Client
 }

 var _ frameworktypes.BalancePlugin = &LowNodeUtilization{}
@@ -89,8 +99,35 @@ func NewLowNodeUtilization(args runtime.Object, handle frameworktypes.Handle) (frameworktypes.Plugin, error) {
 	resourceNames := getResourceNames(lowNodeUtilizationArgsArgs.Thresholds)

 	var usageSnapshot usageClient
+	var promClient promapi.Client
 	if lowNodeUtilizationArgsArgs.MetricsUtilization.MetricsServer {
-		usageSnapshot = newActualUsageSnapshot(resourceNames, handle.GetPodsAssignedToNodeFunc(), handle.MetricsCollector())
+		if lowNodeUtilizationArgsArgs.MetricsUtilization.PrometheusURL != "" {
+			roundTripper := &http.Transport{
+				Proxy: http.ProxyFromEnvironment,
+				DialContext: (&net.Dialer{
+					Timeout:   30 * time.Second,
+					KeepAlive: 30 * time.Second,
+				}).DialContext,
+				TLSHandshakeTimeout: 10 * time.Second,
+				TLSClientConfig:     &tls.Config{InsecureSkipVerify: true},
+			}
+
+			pClient, err := promapi.NewClient(promapi.Config{
+				Address:      lowNodeUtilizationArgsArgs.MetricsUtilization.PrometheusURL,
+				RoundTripper: config.NewAuthorizationCredentialsRoundTripper("Bearer", config.NewInlineSecret(lowNodeUtilizationArgsArgs.MetricsUtilization.PrometheusAuthToken), roundTripper),
+			})
+			if err != nil {
+				return nil, fmt.Errorf("unable to create a new prom client: %v", err)
+			}
+			promClient = pClient
+
+			usageSnapshot = newPrometheusUsageSnapshot(handle.GetPodsAssignedToNodeFunc(), promClient, lowNodeUtilizationArgsArgs.MetricsUtilization.PromQuery)
+			// reset all resource names to just ResourceMetrics
+			// TODO(ingvagabund): validate only ResourceMetrics is set when prometheus metrics are enabled
+			resourceNames = []v1.ResourceName{ResourceMetrics}
+		} else {
+			usageSnapshot = newActualUsageSnapshot(resourceNames, handle.GetPodsAssignedToNodeFunc(), handle.MetricsCollector())
+		}
 	} else {
 		usageSnapshot = newRequestedUsageSnapshot(resourceNames, handle.GetPodsAssignedToNodeFunc())
 	}
diff --git a/pkg/framework/plugins/nodeutilization/lownodeutilization_test.go b/pkg/framework/plugins/nodeutilization/lownodeutilization_test.go
index 01d7517224..6014e02c29 100644
--- a/pkg/framework/plugins/nodeutilization/lownodeutilization_test.go
+++ b/pkg/framework/plugins/nodeutilization/lownodeutilization_test.go
@@ -38,6 +38,8 @@ import (
 	frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
 	"sigs.k8s.io/descheduler/pkg/utils"
 	"sigs.k8s.io/descheduler/test"
+
+	"github.com/prometheus/common/model"
 )

 func TestLowNodeUtilization(t *testing.T) {
@@ -1359,3 +1361,162 @@ func TestLowNodeUtilizationWithTaints(t *testing.T) {
 		})
 	}
 }
+
+func withLocalStorage(pod *v1.Pod) {
+	// A pod with local storage.
+	test.SetNormalOwnerRef(pod)
+	pod.Spec.Volumes = []v1.Volume{
+		{
+			Name: "sample",
+			VolumeSource: v1.VolumeSource{
+				HostPath: &v1.HostPathVolumeSource{Path: "somePath"},
+				EmptyDir: &v1.EmptyDirVolumeSource{
+					SizeLimit: resource.NewQuantity(int64(10), resource.BinarySI),
+				},
+			},
+		},
+	}
+	// A mirror pod.
+	pod.Annotations = test.GetMirrorPodAnnotation()
+}
+
+func withCriticalPod(pod *v1.Pod) {
+	// A critical pod.
+	test.SetNormalOwnerRef(pod)
+	pod.Namespace = "kube-system"
+	priority := utils.SystemCriticalPriority
+	pod.Spec.Priority = &priority
+}
+
+func TestLowNodeUtilizationWithPrometheusMetrics(t *testing.T) {
+	n1NodeName := "n1"
+	n2NodeName := "n2"
+	n3NodeName := "n3"
+
+	testCases := []struct {
+		name                         string
+		useDeviationThresholds       bool
+		thresholds, targetThresholds api.ResourceThresholds
+		query                        string
+		samples                      []model.Sample
+		nodes                        []*v1.Node
+		pods                         []*v1.Pod
+		expectedPodsEvicted          uint
+		evictedPods                  []string
+		evictableNamespaces          *api.Namespaces
+	}{
+		{
+			name: "with instance:node_cpu:rate:sum query",
+			thresholds: api.ResourceThresholds{
+				v1.ResourceName("MetricResource"): 30,
+			},
+			targetThresholds: api.ResourceThresholds{
+				v1.ResourceName("MetricResource"): 50,
+			},
+			query: "instance:node_cpu:rate:sum",
+			samples: []model.Sample{
+				sample("instance:node_cpu:rate:sum", n1NodeName, 0.5695757575757561),
+				sample("instance:node_cpu:rate:sum", n2NodeName, 0.4245454545454522),
+				sample("instance:node_cpu:rate:sum", n3NodeName, 0.20381818181818104),
+			},
+			nodes: []*v1.Node{
+				test.BuildTestNode(n1NodeName, 4000, 3000, 9, nil),
+				test.BuildTestNode(n2NodeName, 4000, 3000, 10, nil),
+				test.BuildTestNode(n3NodeName, 4000, 3000, 10, nil),
+			},
+			pods: []*v1.Pod{
+				test.BuildTestPod("p1", 400, 0, n1NodeName, test.SetRSOwnerRef),
+				test.BuildTestPod("p2", 400, 0, n1NodeName, test.SetRSOwnerRef),
+				test.BuildTestPod("p3", 400, 0, n1NodeName, test.SetRSOwnerRef),
+				test.BuildTestPod("p4", 400, 0, n1NodeName, test.SetRSOwnerRef),
+				test.BuildTestPod("p5", 400, 0, n1NodeName, test.SetRSOwnerRef),
+				// These won't be evicted.
+ test.BuildTestPod("p6", 400, 0, n1NodeName, test.SetDSOwnerRef), + test.BuildTestPod("p7", 400, 0, n1NodeName, withLocalStorage), + test.BuildTestPod("p8", 400, 0, n1NodeName, withCriticalPod), + test.BuildTestPod("p9", 400, 0, n2NodeName, test.SetRSOwnerRef), + }, + expectedPodsEvicted: 1, + }, + } + + for _, tc := range testCases { + testFnc := func(metricsEnabled bool, expectedPodsEvicted uint) func(t *testing.T) { + return func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var objs []runtime.Object + for _, node := range tc.nodes { + objs = append(objs, node) + } + for _, pod := range tc.pods { + objs = append(objs, pod) + } + + fakeClient := fake.NewSimpleClientset(objs...) + + podsForEviction := make(map[string]struct{}) + for _, pod := range tc.evictedPods { + podsForEviction[pod] = struct{}{} + } + + evictionFailed := false + if len(tc.evictedPods) > 0 { + fakeClient.Fake.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) { + getAction := action.(core.CreateAction) + obj := getAction.GetObject() + if eviction, ok := obj.(*policy.Eviction); ok { + if _, exists := podsForEviction[eviction.Name]; exists { + return true, obj, nil + } + evictionFailed = true + return true, nil, fmt.Errorf("pod %q was unexpectedly evicted", eviction.Name) + } + return true, obj, nil + }) + } + + handle, podEvictor, err := frameworktesting.InitFrameworkHandle(ctx, fakeClient, nil, defaultevictor.DefaultEvictorArgs{NodeFit: true}, nil) + if err != nil { + t.Fatalf("Unable to initialize a framework handle: %v", err) + } + + plugin, err := NewLowNodeUtilization(&LowNodeUtilizationArgs{ + Thresholds: tc.thresholds, + TargetThresholds: tc.targetThresholds, + UseDeviationThresholds: tc.useDeviationThresholds, + EvictableNamespaces: tc.evictableNamespaces, + MetricsUtilization: MetricsUtilization{ + MetricsServer: true, + PrometheusURL: "http://prometheus.example.orgname", + PrometheusAuthToken: "XXXXX", + }, + }, + handle) + if err != nil { + t.Fatalf("Unable to initialize the plugin: %v", err) + } + + pClient := &fakePromClient{ + result: tc.samples, + } + + plugin.(*LowNodeUtilization).usageSnapshot = newPrometheusUsageSnapshot(handle.GetPodsAssignedToNodeFunc(), pClient, tc.query) + status := plugin.(frameworktypes.BalancePlugin).Balance(ctx, tc.nodes) + if status != nil { + t.Fatalf("Balance.err: %v", status.Err) + } + + podsEvicted := podEvictor.TotalEvicted() + if expectedPodsEvicted != podsEvicted { + t.Errorf("Expected %v pods to be evicted but %v got evicted", expectedPodsEvicted, podsEvicted) + } + if evictionFailed { + t.Errorf("Pod evictions failed unexpectedly") + } + } + } + t.Run(tc.name, testFnc(false, tc.expectedPodsEvicted)) + } +} diff --git a/pkg/framework/plugins/nodeutilization/nodeutilization.go b/pkg/framework/plugins/nodeutilization/nodeutilization.go index fb7dd925b0..3df6721c96 100644 --- a/pkg/framework/plugins/nodeutilization/nodeutilization.go +++ b/pkg/framework/plugins/nodeutilization/nodeutilization.go @@ -35,6 +35,8 @@ import ( "sigs.k8s.io/descheduler/pkg/utils" ) +const ResourceMetrics = v1.ResourceName("MetricResource") + // NodeUsage stores a node's info, pods on it, thresholds and its resource usage type NodeUsage struct { node *v1.Node @@ -93,6 +95,8 @@ func getNodeThresholds( if len(node.Status.Allocatable) > 0 { nodeCapacity = node.Status.Allocatable } + // Make ResourceMetrics 100% => 1000 points + nodeCapacity[ResourceMetrics] = *resource.NewQuantity(int64(1000), resource.DecimalSI) 
 		nodeThresholdsMap[node.Name] = NodeThresholds{
 			lowResourceThreshold: map[v1.ResourceName]*resource.Quantity{},
@@ -323,15 +327,27 @@ func evictPods(
 			if !preEvictionFilterWithOptions(pod) {
 				continue
 			}
+
+			// In case podUsage does not support resource counting (e.g. the provided
+			// metric does not quantify pod resource utilization) allow only a single
+			// pod to be evicted. It is recommended to run the descheduling cycle more
+			// often so the plugin can perform more evictions towards re-distribution.
+			singleEviction := false
 			podUsage, err := usageSnapshot.podUsage(pod)
 			if err != nil {
-				klog.Errorf("unable to get pod usage for %v/%v: %v", pod.Namespace, pod.Name, err)
-				continue
+				if _, ok := err.(*notSupportedError); !ok {
+					klog.Errorf("unable to get pod usage for %v/%v: %v", pod.Namespace, pod.Name, err)
+					continue
+				}
+				singleEviction = true
 			}
 			err = podEvictor.Evict(ctx, pod, evictOptions)
 			if err == nil {
 				klog.V(3).InfoS("Evicted pods", "pod", klog.KObj(pod))
-
+				if singleEviction {
+					klog.V(3).InfoS("Currently, only a single pod eviction is allowed")
+					break
+				}
 				for name := range totalAvailableUsage {
 					if name == v1.ResourcePods {
 						nodeInfo.usage[name].Sub(*resource.NewQuantity(1, resource.DecimalSI))
@@ -354,6 +370,7 @@ func evictPods(
 		if quantity, exists := nodeInfo.usage[v1.ResourcePods]; exists {
 			keysAndValues = append(keysAndValues, "Pods", quantity.Value())
 		}
+
 		for name := range totalAvailableUsage {
 			if !nodeutil.IsBasicResource(name) {
 				keysAndValues = append(keysAndValues, string(name), totalAvailableUsage[name].Value())
diff --git a/pkg/framework/plugins/nodeutilization/types.go b/pkg/framework/plugins/nodeutilization/types.go
index b7e987badf..207549c388 100644
--- a/pkg/framework/plugins/nodeutilization/types.go
+++ b/pkg/framework/plugins/nodeutilization/types.go
@@ -57,4 +57,9 @@ type MetricsUtilization struct {
 	// metricsServer enables metrics from a kubernetes metrics server.
 	// Please see https://kubernetes-sigs.github.io/metrics-server/ for more.
 	MetricsServer bool `json:"metricsServer,omitempty"`
+
+	PrometheusURL string `json:"prometheusURL,omitempty"`
+	// TODO(ingvagabund): Get the token from a secret
+	PrometheusAuthToken string `json:"prometheusAuthToken,omitempty"`
+	PromQuery string `json:"promQuery,omitempty"`
 }
diff --git a/pkg/framework/plugins/nodeutilization/usageclients.go b/pkg/framework/plugins/nodeutilization/usageclients.go
index 9a1306fef8..c7e6f27397 100644
--- a/pkg/framework/plugins/nodeutilization/usageclients.go
+++ b/pkg/framework/plugins/nodeutilization/usageclients.go
@@ -18,8 +18,15 @@ package nodeutilization

 import (
 	"context"
+	"encoding/json"
 	"fmt"
+	"net/http"
+	"net/url"
+	"time"

+	promapi "github.com/prometheus/client_golang/api"
+	promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
+	"github.com/prometheus/common/model"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -31,6 +38,28 @@ import (
 	"sigs.k8s.io/descheduler/pkg/utils"
 )

+type UsageClientType int
+
+const (
+	requestedUsageClientType UsageClientType = iota
+	actualUsageClientType
+	prometheusUsageClientType
+)
+
+type notSupportedError struct {
+	usageClientType UsageClientType
+}
+
+func (e notSupportedError) Error() string {
+	return fmt.Sprintf("pod usage is not supported for usage client type %v", e.usageClientType)
+}
+
+func newNotSupportedError(usageClientType UsageClientType) *notSupportedError {
+	return &notSupportedError{
+		usageClientType: usageClientType,
+	}
+}
+
 type usageClient interface {
 	nodeUtilization(node string) map[v1.ResourceName]*resource.Quantity
 	nodes() []*v1.Node
@@ -209,3 +238,118 @@ func (client *actualUsageClient) capture(nodes []*v1.Node) error {

 	return nil
 }
+
+type prometheusUsageClient struct {
+	getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
+	promClient            promapi.Client
+	promQuery             string
+
+	_nodes           []*v1.Node
+	_pods            map[string][]*v1.Pod
+	_nodeUtilization map[string]map[v1.ResourceName]*resource.Quantity
+}
+
+var _ usageClient = &prometheusUsageClient{}
+
+func newPrometheusUsageSnapshot(
+	getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc,
+	promClient promapi.Client,
+	promQuery string,
+) *prometheusUsageClient {
+	return &prometheusUsageClient{
+		getPodsAssignedToNode: getPodsAssignedToNode,
+		promClient:            promClient,
+		promQuery:             promQuery,
+	}
+}
+
+func (client *prometheusUsageClient) nodeUtilization(node string) map[v1.ResourceName]*resource.Quantity {
+	return client._nodeUtilization[node]
+}
+
+func (client *prometheusUsageClient) nodes() []*v1.Node {
+	return client._nodes
+}
+
+func (client *prometheusUsageClient) pods(node string) []*v1.Pod {
+	return client._pods[node]
+}
+
+func (client *prometheusUsageClient) podUsage(pod *v1.Pod) (map[v1.ResourceName]*resource.Quantity, error) {
+	return nil, newNotSupportedError(prometheusUsageClientType)
+}
+
+type fakePromClient struct {
+	result interface{}
+}
+
+type fakePayload struct {
+	Status string      `json:"status"`
+	Data   queryResult `json:"data"`
+}
+
+type queryResult struct {
+	Type   model.ValueType `json:"resultType"`
+	Result interface{}     `json:"result"`
+}
+
+func (client *fakePromClient) URL(ep string, args map[string]string) *url.URL {
+	return &url.URL{}
+}
+
+func (client *fakePromClient) Do(ctx context.Context, request *http.Request) (*http.Response, []byte, error) {
+	jsonData, err := json.Marshal(fakePayload{
+		Status: "success",
+		Data: queryResult{
+			Type:   model.ValVector,
+			Result: client.result,
+		},
+	})

+	return &http.Response{StatusCode: 200}, jsonData, err
+}
+
+func (client *prometheusUsageClient) resourceNames() []v1.ResourceName {
+	return []v1.ResourceName{ResourceMetrics}
+}
+
+func (client *prometheusUsageClient) capture(nodes []*v1.Node) error {
+	client._nodeUtilization = make(map[string]map[v1.ResourceName]*resource.Quantity)
+	client._pods = make(map[string][]*v1.Pod)
+	capturedNodes := []*v1.Node{}
+
+	results, warnings, err := promv1.NewAPI(client.promClient).Query(context.TODO(), client.promQuery, time.Now())
+	if err != nil {
+		return fmt.Errorf("unable to capture prometheus metrics: %v", err)
+	}
+	if len(warnings) > 0 {
+		klog.Infof("prometheus metrics warnings: %v", warnings)
+	}
+
+	nodeUsages := make(map[string]map[v1.ResourceName]*resource.Quantity)
+	for _, sample := range results.(model.Vector) {
+		nodeName := string(sample.Metric["instance"])
+		nodeUsages[nodeName] = map[v1.ResourceName]*resource.Quantity{
+			ResourceMetrics: resource.NewQuantity(int64(sample.Value*1000), resource.DecimalSI),
+		}
+	}
+
+	for _, node := range nodes {
+		if _, exists := nodeUsages[node.Name]; !exists {
+			return fmt.Errorf("unable to find metric entry for %v", node.Name)
+		}
+		pods, err := podutil.ListPodsOnANode(node.Name, client.getPodsAssignedToNode, nil)
+		if err != nil {
+			klog.V(2).InfoS("Node will not be processed, error accessing its pods", "node", klog.KObj(node), "err", err)
+			continue
+		}
+
+		// store the snapshot of pods from the same (or the closest) node utilization computation
+		client._pods[node.Name] = pods
+		client._nodeUtilization[node.Name] = nodeUsages[node.Name]
+		capturedNodes = append(capturedNodes, node)
+	}
+
+	client._nodes = capturedNodes
+
+	return nil
+}
diff --git a/pkg/framework/plugins/nodeutilization/usageclients_test.go b/pkg/framework/plugins/nodeutilization/usageclients_test.go
index a65ca69c4f..af70111118 100644
--- a/pkg/framework/plugins/nodeutilization/usageclients_test.go
+++ b/pkg/framework/plugins/nodeutilization/usageclients_test.go
@@ -25,6 +25,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/resource"
 	"k8s.io/apimachinery/pkg/runtime/schema"

+	"github.com/prometheus/common/model"
 	"k8s.io/client-go/informers"
 	fakeclientset "k8s.io/client-go/kubernetes/fake"
 	"k8s.io/metrics/pkg/apis/metrics/v1beta1"
@@ -133,3 +134,70 @@ func TestActualUsageClient(t *testing.T) {
 		metricsClientset, collector, usageSnapshot, nodes, n2.Name, n2metrics,
 	)
 }
+
+func sample(metricName, nodeName string, value float64) model.Sample {
+	return model.Sample{
+		Metric: model.Metric{
+			"__name__": model.LabelValue(metricName),
+			"instance": model.LabelValue(nodeName),
+		},
+		Value:     model.SampleValue(value),
+		Timestamp: 1728991761711,
+	}
+}
+
+func TestPrometheusUsageClient(t *testing.T) {
+	n1 := test.BuildTestNode("ip-10-0-17-165.ec2.internal", 2000, 3000, 10, nil)
+	n2 := test.BuildTestNode("ip-10-0-51-101.ec2.internal", 2000, 3000, 10, nil)
+	n3 := test.BuildTestNode("ip-10-0-94-25.ec2.internal", 2000, 3000, 10, nil)
+
+	nodes := []*v1.Node{n1, n2, n3}
+
+	p1 := test.BuildTestPod("p1", 400, 0, n1.Name, nil)
+	p21 := test.BuildTestPod("p21", 400, 0, n2.Name, nil)
+	p22 := test.BuildTestPod("p22", 400, 0, n2.Name, nil)
+	p3 := test.BuildTestPod("p3", 400, 0, n3.Name, nil)
+
+	pClient := &fakePromClient{
+		result: []model.Sample{
+			sample("instance:node_cpu:rate:sum", "ip-10-0-51-101.ec2.internal", 0.20381818181818104),
+			sample("instance:node_cpu:rate:sum", "ip-10-0-17-165.ec2.internal", 0.4245454545454522),
+			sample("instance:node_cpu:rate:sum", "ip-10-0-94-25.ec2.internal", 0.5695757575757561),
+		},
+	}
+
+	clientset := fakeclientset.NewSimpleClientset(n1, n2, n3, p1, p21, p22, p3)
+
+	ctx := context.TODO()
+	sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)
+	podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
+	podsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
+	if err != nil {
+		t.Fatalf("Build get pods assigned to node function error: %v", err)
+	}
+
+	sharedInformerFactory.Start(ctx.Done())
+	sharedInformerFactory.WaitForCacheSync(ctx.Done())
+
+	prometheusUsageClient := newPrometheusUsageSnapshot(podsAssignedToNode, pClient, "instance:node_cpu:rate:sum")
+	err = prometheusUsageClient.capture(nodes)
+	if err != nil {
+		t.Fatalf("unable to capture prometheus metrics: %v", err)
+	}
+
+	for _, node := range nodes {
+		nodeUtil := prometheusUsageClient.nodeUtilization(node.Name)
+		t.Logf("nodeUtil[%v]: %v", node.Name, nodeUtil)
+	}
+
+	nodeThresholds := NodeThresholds{
+		lowResourceThreshold: map[v1.ResourceName]*resource.Quantity{
+			v1.ResourceName("MetricResource"): resource.NewQuantity(int64(300), resource.DecimalSI),
+		},
+		highResourceThreshold: map[v1.ResourceName]*resource.Quantity{
+			v1.ResourceName("MetricResource"): resource.NewQuantity(int64(500), resource.DecimalSI),
+		},
+	}
+
+	t.Logf("nodeThresholds: %#v", nodeThresholds)
+}
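
Usage notes (illustrative, not part of the diff):

Worked example from the test data: with each node's capacity for the synthetic MetricResource pinned at 1000 points, the sample 0.5695... for n1 maps to 569 points, n2 to 424 and n3 to 203. The 30/50 thresholds translate to 300/500 points, so n1 is over-utilized, n3 under-utilized, and the notSupportedError path limits the cycle to a single eviction from n1 — matching expectedPodsEvicted: 1.

Below is a minimal policy sketch of how the new knobs are meant to be wired together. It assumes the v1alpha2 policy format; the args field names follow the json tags in types.go, while the profile name, URL, token and query are placeholders (the query is the recording rule used in the tests):

    apiVersion: "descheduler/v1alpha2"
    kind: "DeschedulerPolicy"
    profiles:
      - name: prometheus-low-node-utilization   # hypothetical profile name
        pluginConfig:
          - name: "LowNodeUtilization"
            args:
              # Thresholds are percentages of the fixed 1000-point capacity.
              thresholds:
                MetricResource: 30
              targetThresholds:
                MetricResource: 50
              metricsUtilization:
                # The prometheus branch in NewLowNodeUtilization is nested
                # under the metrics-server toggle, so this must stay true.
                metricsServer: true
                prometheusURL: "http://prometheus.example.org"   # placeholder
                prometheusAuthToken: "<token>"   # placeholder; see TODO in types.go
                promQuery: "instance:node_cpu:rate:sum"   # query used by the tests
        plugins:
          balance:
            enabled:
              - "LowNodeUtilization"

Once prometheusURL is set, NewLowNodeUtilization resets resourceNames to just ResourceMetrics, so only the MetricResource thresholds take effect.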