From 72fadb7c1dbe7d660c2900527c34f53cfcfce7ac Mon Sep 17 00:00:00 2001 From: Shyunn <114235843+ShyunnY@users.noreply.github.com> Date: Tue, 27 Feb 2024 05:53:08 +0800 Subject: [PATCH] feat: add egctl ratelimit config support (#2674) * feat: add egctl ratelimit config support Signed-off-by: ShyunnY <1147212064@qq.com> * fix: add todo information and supplementary function comments Signed-off-by: ShyunnY <1147212064@qq.com> * fix: pkg sort and refactor check status by rate limit pod Signed-off-by: ShyunnY <1147212064@qq.com> --------- Signed-off-by: ShyunnY <1147212064@qq.com> --- internal/cmd/egctl/config.go | 11 +- internal/cmd/egctl/config_cmd.go | 5 + internal/cmd/egctl/config_ratelimit.go | 226 +++++++++++++++ internal/cmd/egctl/config_test.go | 271 ++++++++++++++++++ internal/cmd/egctl/envoy_stats.go | 4 +- .../kubernetes/ratelimit/resource.go | 15 + .../ratelimit/resource_provider_test.go | 26 ++ site/content/en/latest/design/egctl.md | 4 +- 8 files changed, 553 insertions(+), 9 deletions(-) create mode 100644 internal/cmd/egctl/config_ratelimit.go diff --git a/internal/cmd/egctl/config.go b/internal/cmd/egctl/config.go index ca8a617473e..67ca0526443 100644 --- a/internal/cmd/egctl/config.go +++ b/internal/cmd/egctl/config.go @@ -37,8 +37,9 @@ var ( ) const ( - adminPort = 19000 // TODO: make this configurable until EG support - containerName = "envoy" // TODO: make this configurable until EG support + adminPort = 19000 // TODO: make this configurable until EG support + rateLimitDebugPort = 6070 // TODO: make this configurable until EG support + containerName = "envoy" // TODO: make this configurable until EG support ) type aggregatedConfigDump map[string]map[string]protoreflect.ProtoMessage @@ -80,7 +81,7 @@ func retrieveConfigDump(args []string, includeEds bool, configType envoyConfigTy for _, pod := range pods { pod := pod go func() { - fw, err := portForwarder(cli, pod) + fw, err := portForwarder(cli, pod, adminPort) if err != nil { errs = errors.Join(errs, err) return @@ -182,8 +183,8 @@ func fetchRunningEnvoyPods(c kube.CLIClient, nn types.NamespacedName, labelSelec } // portForwarder returns a port forwarder instance for a single Pod. -func portForwarder(cli kube.CLIClient, nn types.NamespacedName) (kube.PortForwarder, error) { - fw, err := kube.NewLocalPortForwarder(cli, nn, 0, adminPort) +func portForwarder(cli kube.CLIClient, nn types.NamespacedName, port int) (kube.PortForwarder, error) { + fw, err := kube.NewLocalPortForwarder(cli, nn, 0, port) if err != nil { return nil, err } diff --git a/internal/cmd/egctl/config_cmd.go b/internal/cmd/egctl/config_cmd.go index 3fd1ea02e0b..242706f6d37 100644 --- a/internal/cmd/egctl/config_cmd.go +++ b/internal/cmd/egctl/config_cmd.go @@ -23,6 +23,7 @@ func newConfigCommand() *cobra.Command { } cfgCommand.AddCommand(proxyCommand()) + cfgCommand.AddCommand(ratelimitCommand()) flags := cfgCommand.Flags() options.AddKubeConfigFlags(flags) @@ -35,6 +36,10 @@ func newConfigCommand() *cobra.Command { return cfgCommand } +func ratelimitCommand() *cobra.Command { + return ratelimitConfigCommand() +} + func proxyCommand() *cobra.Command { c := &cobra.Command{ Use: "envoy-proxy", diff --git a/internal/cmd/egctl/config_ratelimit.go b/internal/cmd/egctl/config_ratelimit.go new file mode 100644 index 00000000000..376ff65cd06 --- /dev/null +++ b/internal/cmd/egctl/config_ratelimit.go @@ -0,0 +1,226 @@ +// Copyright Envoy Gateway Authors +// SPDX-License-Identifier: Apache-2.0 +// The full text of the Apache license is available in the LICENSE file at +// the root of the repo. + +package egctl + +import ( + "context" + "errors" + "fmt" + "io" + "net/http" + + "github.com/spf13/cobra" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/types" + cmdutil "k8s.io/kubectl/pkg/cmd/util" + + "github.com/envoyproxy/gateway/api/v1alpha1" + "github.com/envoyproxy/gateway/internal/envoygateway" + "github.com/envoyproxy/gateway/internal/infrastructure/kubernetes/ratelimit" + "github.com/envoyproxy/gateway/internal/kubernetes" +) + +var ( + defaultRateLimitNamespace = "envoy-gateway-system" // TODO: make this configurable until EG support + defaultConfigMap = "envoy-gateway-config" // TODO: make this configurable until EG support + defaultConfigMapKey = "envoy-gateway.yaml" // TODO: make this configurable until EG support +) + +func ratelimitConfigCommand() *cobra.Command { + + var ( + namespace string + ) + + rlConfigCmd := &cobra.Command{ + Use: "envoy-ratelimit", + Aliases: []string{"rl"}, + Long: `Retrieve the relevant rate limit configuration from the Rate Limit instance`, + Example: ` # Retrieve rate limit configuration + egctl config envoy-ratelimit + + # Retrieve rate limit configuration with short syntax + egctl c rl +`, + Run: func(c *cobra.Command, args []string) { + cmdutil.CheckErr(runRateLimitConfig(c, namespace)) + }, + } + + rlConfigCmd.Flags().StringVarP(&namespace, "namespace", "n", defaultRateLimitNamespace, "Specific a namespace to get resources") + return rlConfigCmd +} + +func runRateLimitConfig(c *cobra.Command, ns string) error { + + cli, err := getCLIClient() + if err != nil { + return err + } + + out, err := retrieveRateLimitConfig(cli, ns) + if err != nil { + return err + } + + _, err = fmt.Fprintln(c.OutOrStdout(), string(out)) + return err +} + +func retrieveRateLimitConfig(cli kubernetes.CLIClient, ns string) ([]byte, error) { + + // Before retrieving the rate limit configuration + // we make sure that the global rate limit feature is enabled + if enable, err := checkEnableGlobalRateLimit(cli); !enable { + return nil, fmt.Errorf("global rate limit feature is not enabled") + } else if err != nil { + return nil, fmt.Errorf("failed to get global rate limit status: %w", err) + } + + // Filter out all rate limit pods in the Running state + rlNN, err := fetchRunningRateLimitPods(cli, ns, ratelimit.LabelSelector()) + if err != nil { + return nil, err + } + + // In fact, the configuration of multiple rate limit replicas are the same. + // After we filter out the rate limit Pods in the Running state, + // we can directly use the first pod. + rlPod := rlNN[0] + fw, err := portForwarder(cli, rlPod, rateLimitDebugPort) + if err != nil { + return nil, fmt.Errorf("failed to initialize pod-forwarding for %s/%s: %w", rlPod.Namespace, rlPod.Name, err) + } + + return extractRateLimitConfig(fw, rlPod) +} + +// fetchRunningRateLimitPods gets the rate limit Pods, based on the labelSelectors. +// It further filters out only those rate limit Pods that are in "Running" state. +func fetchRunningRateLimitPods(cli kubernetes.CLIClient, namespace string, labelSelector []string) ([]types.NamespacedName, error) { + + // Since multiple replicas of the rate limit are configured to be equal, + // we do not need to use the pod name to obtain the specified pod. + rlPods, err := cli.PodsForSelector(namespace, labelSelector...) + if err != nil { + return nil, err + } + + rlNN := []types.NamespacedName{} + for _, rlPod := range rlPods.Items { + rlPodNsName := types.NamespacedName{ + Namespace: rlPod.Namespace, + Name: rlPod.Name, + } + + // Check that the rate limit pod is ready properly and can accept external traffic + if !checkRateLimitPodStatusReady(rlPod.Status) { + continue + } + + rlNN = append(rlNN, rlPodNsName) + } + if len(rlNN) == 0 { + return nil, fmt.Errorf("please check that the rate limit instance starts properly") + } + + return rlNN, nil +} + +// checkRateLimitPodStatusReady Check that the rate limit pod is ready +func checkRateLimitPodStatusReady(status corev1.PodStatus) bool { + + if status.Phase != corev1.PodRunning { + return false + } + + for _, condition := range status.Conditions { + if condition.Type == corev1.PodReady && + condition.Status == corev1.ConditionTrue { + return true + } + } + + return false +} + +// extractRateLimitConfig After turning on port forwarding through PortForwarder, +// construct a request and send it to the rate limit Pod to obtain relevant configuration information. +func extractRateLimitConfig(fw kubernetes.PortForwarder, rlPod types.NamespacedName) ([]byte, error) { + + if err := fw.Start(); err != nil { + return nil, fmt.Errorf("failed to start port forwarding for pod %s/%s: %w", rlPod.Namespace, rlPod.Name, err) + } + defer fw.Stop() + + out, err := rateLimitConfigRequest(fw.Address()) + if err != nil { + return nil, fmt.Errorf("failed to send request to get rate config for pod %s/%s: %w", rlPod.Namespace, rlPod.Name, err) + } + + return out, nil +} + +// checkEnableGlobalRateLimit Check whether the Global Rate Limit function is enabled +func checkEnableGlobalRateLimit(cli kubernetes.CLIClient) (bool, error) { + + kubeCli := cli.Kube() + cm, err := kubeCli.CoreV1(). + ConfigMaps(defaultRateLimitNamespace). + Get(context.TODO(), defaultConfigMap, metav1.GetOptions{}) + if err != nil { + return false, err + } + + config, ok := cm.Data[defaultConfigMapKey] + if !ok { + return false, fmt.Errorf("failed to get envoy-gateway configuration") + } + + decoder := serializer.NewCodecFactory(envoygateway.GetScheme()).UniversalDeserializer() + obj, gvk, err := decoder.Decode([]byte(config), nil, nil) + if err != nil { + return false, err + } + + if gvk.Group != v1alpha1.GroupVersion.Group || + gvk.Version != v1alpha1.GroupVersion.Version || + gvk.Kind != v1alpha1.KindEnvoyGateway { + return false, errors.New("failed to decode unmatched resource type") + } + + eg, ok := obj.(*v1alpha1.EnvoyGateway) + if !ok { + return false, errors.New("failed to convert object to EnvoyGateway type") + } + + if eg.RateLimit == nil || eg.RateLimit.Backend.Redis == nil { + return false, nil + } + + return true, nil +} + +func rateLimitConfigRequest(address string) ([]byte, error) { + url := fmt.Sprintf("http://%s/rlconfig", address) + + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer func() { + _ = resp.Body.Close() + }() + + return io.ReadAll(resp.Body) +} diff --git a/internal/cmd/egctl/config_test.go b/internal/cmd/egctl/config_test.go index bfeba9fa155..ff1ad2f8ace 100644 --- a/internal/cmd/egctl/config_test.go +++ b/internal/cmd/egctl/config_test.go @@ -18,7 +18,14 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/reflect/protoreflect" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/rest" + "github.com/envoyproxy/gateway/internal/infrastructure/kubernetes/ratelimit" kube "github.com/envoyproxy/gateway/internal/kubernetes" "github.com/envoyproxy/gateway/internal/utils/file" netutil "github.com/envoyproxy/gateway/internal/utils/net" @@ -269,3 +276,267 @@ func sampleAggregatedConfigDump(configDump protoreflect.ProtoMessage) aggregated }, } } + +type fakeCLIClient struct { + pods []corev1.Pod + cm *corev1.ConfigMap +} + +func (f *fakeCLIClient) RESTConfig() *rest.Config { + return nil +} + +func (f *fakeCLIClient) Pod(types.NamespacedName) (*corev1.Pod, error) { + return nil, nil +} + +func (f *fakeCLIClient) PodsForSelector(string, ...string) (*corev1.PodList, error) { + return &corev1.PodList{Items: f.pods}, nil +} + +func (f *fakeCLIClient) PodExec(types.NamespacedName, string, string) (stdout string, stderr string, err error) { + return "", "", nil +} + +func (f *fakeCLIClient) Kube() kubernetes.Interface { + return fake.NewSimpleClientset(f.cm) +} + +func TestFetchRunningRateLimitPods(t *testing.T) { + + cases := []struct { + caseName string + rlPods []corev1.Pod + namespace string + labelSelector []string + expectErr error + }{ + { + caseName: "normally obtain the rate limit pod of Running phase", + rlPods: []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "envoy-ratelimit-666457bc4c-c2td5", + Namespace: "envoy-gateway-system", + Labels: map[string]string{ + "app.kubernetes.io/name": "envoy-ratelimit", + "app.kubernetes.io/component": "ratelimit", + "app.kubernetes.io/managed-by": "envoy-gateway", + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + { + Type: corev1.ContainersReady, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + }, + namespace: "envoy-gateway-system", + labelSelector: ratelimit.LabelSelector(), + expectErr: nil, + }, + { + caseName: "unable to obtain rate limit pod", + rlPods: []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "envoy-ratelimit-666457bc4c-c2td5", + Namespace: "envoy-gateway-system", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodPending, + }, + }, + }, + namespace: "envoy-gateway-system", + labelSelector: ratelimit.LabelSelector(), + expectErr: fmt.Errorf("please check that the rate limit instance starts properly"), + }, + } + + for _, tc := range cases { + + t.Run(tc.caseName, func(t *testing.T) { + + fakeCli := &fakeCLIClient{ + pods: tc.rlPods, + } + + _, err := fetchRunningRateLimitPods(fakeCli, tc.namespace, tc.labelSelector) + require.Equal(t, tc.expectErr, err) + + }) + + } +} + +func TestCheckEnableGlobalRateLimit(t *testing.T) { + + cases := []struct { + caseName string + egConfigMap *corev1.ConfigMap + expect bool + }{ + { + caseName: "global rate limit feature is enabled", + expect: true, + egConfigMap: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "envoy-gateway-config", + Namespace: "envoy-gateway-system", + }, + Data: map[string]string{ + "envoy-gateway.yaml": ` +apiVersion: gateway.envoyproxy.io/v1alpha1 +kind: EnvoyGateway +provider: + type: Kubernetes +gateway: + controllerName: gateway.envoyproxy.io/gatewayclass-controller +rateLimit: + backend: + type: Redis + redis: + url: redis.redis-system.svc.cluster.local:6379 +`, + }, + }, + }, + { + caseName: "global rate limit feature is not enabled", + expect: false, + egConfigMap: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "envoy-gateway-config", + Namespace: "envoy-gateway-system", + }, + Data: map[string]string{ + "envoy-gateway.yaml": ` +apiVersion: gateway.envoyproxy.io/v1alpha1 +kind: EnvoyGateway +provider: + type: Kubernetes +gateway: + controllerName: gateway.envoyproxy.io/gatewayclass-controller +`, + }, + }, + }, + } + + for _, tc := range cases { + + t.Run(tc.caseName, func(t *testing.T) { + + fakeCli := &fakeCLIClient{ + cm: tc.egConfigMap, + } + + actual, err := checkEnableGlobalRateLimit(fakeCli) + require.Equal(t, tc.expect, actual) + require.NoError(t, err) + + }) + + } +} + +func TestExtractRateLimitConfig(t *testing.T) { + + cases := []struct { + caseName string + responseBody []byte + rlPod types.NamespacedName + }{ + { + caseName: "rate limit configuration is extract normally", + responseBody: []byte("default/eg/http.httproute/default/backend/rule/0/match/0/*-key-rule-0-match-0_httproute/default/backend/rule/0/match/0/*-value-rule-0-match-0: unit=HOUR requests_per_unit=3, shadow_mode: false"), + rlPod: types.NamespacedName{ + Name: "envoy-ratelimit-666457bc4c-c2td5", + Namespace: "envoy-gateway-system", + }, + }, + } + + for _, tc := range cases { + + t.Run(tc.caseName, func(t *testing.T) { + + fw, err := newFakePortForwarder(tc.responseBody) + require.NoError(t, err) + + out, err := extractRateLimitConfig(fw, tc.rlPod) + require.NoError(t, err) + require.NotEmpty(t, out) + + }) + + } +} + +func TestCheckRateLimitPodStatusReady(t *testing.T) { + + cases := []struct { + caseName string + status corev1.PodStatus + expect bool + }{ + { + caseName: "rate limit pod is ready", + expect: true, + status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + { + Type: corev1.ContainersReady, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + { + caseName: "rate limit pod is not ready", + expect: false, + status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionFalse, + }, + { + Type: corev1.ContainersReady, + Status: corev1.ConditionFalse, + }, + }, + }, + }, + { + caseName: "rate limit pod is running failed", + expect: false, + status: corev1.PodStatus{ + Phase: corev1.PodFailed, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.caseName, func(t *testing.T) { + actual := checkRateLimitPodStatusReady(tc.status) + require.Equal(t, tc.expect, actual) + }) + } + +} diff --git a/internal/cmd/egctl/envoy_stats.go b/internal/cmd/egctl/envoy_stats.go index 527529c9cbf..c7701da0bcd 100644 --- a/internal/cmd/egctl/envoy_stats.go +++ b/internal/cmd/egctl/envoy_stats.go @@ -146,7 +146,7 @@ func setupEnvoyServerStatsConfig(kubeClient kubernetes.CLIClient, podName, podNa path += "/prometheus" } - fw, err := portForwarder(kubeClient, types.NamespacedName{Namespace: podNamespace, Name: podName}) + fw, err := portForwarder(kubeClient, types.NamespacedName{Namespace: podNamespace, Name: podName}, adminPort) if err != nil { return "", fmt.Errorf("failed to initialize pod-forwarding for %s/%s: %w", podNamespace, podName, err) } @@ -169,7 +169,7 @@ func setupEnvoyClusterStatsConfig(kubeClient kubernetes.CLIClient, podName, podN // for yaml output we will convert the json to yaml when printed path += "?format=json" } - fw, err := portForwarder(kubeClient, types.NamespacedName{Namespace: podNamespace, Name: podName}) + fw, err := portForwarder(kubeClient, types.NamespacedName{Namespace: podNamespace, Name: podName}, adminPort) if err != nil { return "", fmt.Errorf("failed to initialize pod-forwarding for %s/%s: %w", podNamespace, podName, err) } diff --git a/internal/infrastructure/kubernetes/ratelimit/resource.go b/internal/infrastructure/kubernetes/ratelimit/resource.go index 95631e41ab8..9003ad4216a 100644 --- a/internal/infrastructure/kubernetes/ratelimit/resource.go +++ b/internal/infrastructure/kubernetes/ratelimit/resource.go @@ -10,6 +10,7 @@ import ( "fmt" "net" "strconv" + "strings" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/intstr" @@ -98,6 +99,20 @@ func GetServiceURL(namespace string, dnsDomain string) string { return fmt.Sprintf("grpc://%s.%s.svc.%s:%d", InfraName, namespace, dnsDomain, InfraGRPCPort) } +// LabelSelector returns the string slice form labels used for all envoy rate limit resources. +func LabelSelector() []string { + + rlLabelMap := rateLimitLabels() + retLabels := make([]string, 0, len(rlLabelMap)) + + for labelK, labelV := range rlLabelMap { + ls := strings.Join([]string{labelK, labelV}, "=") + retLabels = append(retLabels, ls) + } + + return retLabels +} + // rateLimitLabels returns the labels used for all envoy rate limit resources. func rateLimitLabels() map[string]string { return map[string]string{ diff --git a/internal/infrastructure/kubernetes/ratelimit/resource_provider_test.go b/internal/infrastructure/kubernetes/ratelimit/resource_provider_test.go index 13621851a32..2b8d774d14c 100644 --- a/internal/infrastructure/kubernetes/ratelimit/resource_provider_test.go +++ b/internal/infrastructure/kubernetes/ratelimit/resource_provider_test.go @@ -37,6 +37,32 @@ var ownerReferenceUID = map[string]types.UID{ ResourceKindServiceAccount: "test-owner-reference-uid-for-service-account", } +func TestRateLimitLabelSelector(t *testing.T) { + + cases := []struct { + name string + expected []string + }{ + { + name: "rateLimit-labelSelector", + expected: []string{ + "app.kubernetes.io/name=envoy-ratelimit", + "app.kubernetes.io/component=ratelimit", + "app.kubernetes.io/managed-by=envoy-gateway", + }, + }, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + got := LabelSelector() + require.ElementsMatch(t, tc.expected, got) + }) + } + +} + func TestRateLimitLabels(t *testing.T) { cases := []struct { name string diff --git a/site/content/en/latest/design/egctl.md b/site/content/en/latest/design/egctl.md index 0f67d99f100..4bc8876092d 100644 --- a/site/content/en/latest/design/egctl.md +++ b/site/content/en/latest/design/egctl.md @@ -54,6 +54,6 @@ egctl config envoy-proxy all # Retrieve listener information about proxy configuration from envoy egctl config envoy-proxy listener -# Retrieve information about envoy gateway -egctl config envoy-gateway +# Retrieve the relevant rate limit configuration from the Rate Limit instance +egctl config envoy-ratelimit ```