Skip to content

Commit

Permalink
refactor: infra client CreateOrUpdate to ServerSideApply (#3134)
Browse files Browse the repository at this point in the history
* refactor(infra-client): CreateOrUpdate to ServerSideApply

Signed-off-by: Ardika Bagus <me@ardikabs.com>

* test(infra-client): add e2e test for ServerSideApply

Signed-off-by: Ardika Bagus <me@ardikabs.com>

* chore: remove comment

Signed-off-by: Ardika Bagus <me@ardikabs.com>

* chore: fix linter

Signed-off-by: Ardika Bagus <me@ardikabs.com>

---------

Signed-off-by: Ardika Bagus <me@ardikabs.com>
  • Loading branch information
ardikabs authored Apr 9, 2024
1 parent b45c4c4 commit 81108f2
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 85 deletions.
28 changes: 6 additions & 22 deletions internal/infrastructure/kubernetes/infra_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand All @@ -25,28 +24,13 @@ func New(cli client.Client) *InfraClient {
}
}

func (cli *InfraClient) CreateOrUpdate(ctx context.Context, key client.ObjectKey, current client.Object, specific client.Object, updateChecker func() bool) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if err := cli.Client.Get(ctx, key, current); err != nil {
if kerrors.IsNotFound(err) {
// Create if it does not exist.
if err := cli.Client.Create(ctx, specific); err != nil {
return fmt.Errorf("for Create: %w", err)
}
}
} else {
// Since the client.Object does not have a specific Spec field to compare
// just perform an update for now.
if updateChecker() {
opts := []client.PatchOption{client.ForceOwnership, client.FieldOwner("envoy-gateway")}
if err := cli.Client.Patch(ctx, specific, client.Apply, opts...); err != nil {
return fmt.Errorf("for Update: %w", err)
}
}
}
func (cli *InfraClient) ServerSideApply(ctx context.Context, obj client.Object) error {
opts := []client.PatchOption{client.ForceOwnership, client.FieldOwner("envoy-gateway")}
if err := cli.Client.Patch(ctx, obj, client.Apply, opts...); err != nil {
return fmt.Errorf("failed to create/update resource with server-side apply for obj %v: %w", obj, err)
}

return nil
})
return nil
}

func (cli *InfraClient) Delete(ctx context.Context, object client.Object) error {
Expand Down
65 changes: 5 additions & 60 deletions internal/infrastructure/kubernetes/infra_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,11 @@ package kubernetes

import (
"context"
"reflect"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
appsv1 "k8s.io/api/apps/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

"github.com/envoyproxy/gateway/internal/infrastructure/kubernetes/resource"
"github.com/envoyproxy/gateway/internal/utils"
)

// createOrUpdateServiceAccount creates a ServiceAccount in the kube api server based on the
Expand All @@ -29,14 +22,7 @@ func (i *Infra) createOrUpdateServiceAccount(ctx context.Context, r ResourceRend
return err
}

current := &corev1.ServiceAccount{}
key := utils.NamespacedName(sa)

return i.Client.CreateOrUpdate(ctx, key, current, sa, func() bool {
// the service account never changed, does not need to update
// fixes https://github.com/envoyproxy/gateway/issues/1604
return false
})
return i.Client.ServerSideApply(ctx, sa)
}

// createOrUpdateConfigMap creates a ConfigMap in the Kube api server based on the provided
Expand All @@ -50,15 +36,8 @@ func (i *Infra) createOrUpdateConfigMap(ctx context.Context, r ResourceRender) e
if cm == nil {
return nil
}
current := &corev1.ConfigMap{}
key := types.NamespacedName{
Namespace: cm.Namespace,
Name: cm.Name,
}

return i.Client.CreateOrUpdate(ctx, key, current, cm, func() bool {
return !reflect.DeepEqual(cm.Data, current.Data)
})
return i.Client.ServerSideApply(ctx, cm)
}

// createOrUpdateDeployment creates a Deployment in the kube api server based on the provided
Expand All @@ -69,25 +48,7 @@ func (i *Infra) createOrUpdateDeployment(ctx context.Context, r ResourceRender)
return err
}

current := &appsv1.Deployment{}
key := types.NamespacedName{
Namespace: deployment.Namespace,
Name: deployment.Name,
}

hpa, err := r.HorizontalPodAutoscaler()
if err != nil {
return err
}

var opts cmp.Options
if hpa != nil {
opts = append(opts, cmpopts.IgnoreFields(appsv1.DeploymentSpec{}, "Replicas"))
}

return i.Client.CreateOrUpdate(ctx, key, current, deployment, func() bool {
return !cmp.Equal(current.Spec, deployment.Spec, opts...)
})
return i.Client.ServerSideApply(ctx, deployment)
}

// createOrUpdateHPA creates HorizontalPodAutoscaler object in the kube api server based on
Expand All @@ -105,15 +66,7 @@ func (i *Infra) createOrUpdateHPA(ctx context.Context, r ResourceRender) error {
return i.deleteHPA(ctx, r)
}

current := &autoscalingv2.HorizontalPodAutoscaler{}
key := types.NamespacedName{
Namespace: hpa.Namespace,
Name: hpa.Name,
}

return i.Client.CreateOrUpdate(ctx, key, current, hpa, func() bool {
return !cmp.Equal(hpa.Spec, current.Spec)
})
return i.Client.ServerSideApply(ctx, hpa)
}

// createOrUpdateRateLimitService creates a Service in the kube api server based on the provided ResourceRender,
Expand All @@ -124,15 +77,7 @@ func (i *Infra) createOrUpdateService(ctx context.Context, r ResourceRender) err
return err
}

current := &corev1.Service{}
key := types.NamespacedName{
Namespace: svc.Namespace,
Name: svc.Name,
}

return i.Client.CreateOrUpdate(ctx, key, current, svc, func() bool {
return !resource.CompareSvc(svc, current)
})
return i.Client.ServerSideApply(ctx, svc)
}

// deleteServiceAccount deletes the ServiceAccount in the kube api server, if it exists.
Expand Down
6 changes: 5 additions & 1 deletion internal/infrastructure/kubernetes/proxy_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,11 @@ func TestCreateOrUpdateProxyDeployment(t *testing.T) {
}

func TestDeleteProxyDeployment(t *testing.T) {
cli := fakeclient.NewClientBuilder().WithScheme(envoygateway.GetScheme()).WithObjects().Build()
cli := fakeclient.NewClientBuilder().
WithScheme(envoygateway.GetScheme()).
WithObjects().
WithInterceptorFuncs(interceptorFunc).
Build()
cfg, err := config.New()
require.NoError(t, err)

Expand Down
6 changes: 4 additions & 2 deletions test/cel-validation/clienttrafficpolicy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@ package celvalidation
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/api/resource"
"strings"
"testing"
"time"

egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1"
"k8s.io/apimachinery/pkg/api/resource"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
gwapiv1 "sigs.k8s.io/gateway-api/apis/v1"
gwapiv1a2 "sigs.k8s.io/gateway-api/apis/v1alpha2"

egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1"
)

func TestClientTrafficPolicyTarget(t *testing.T) {
Expand Down
178 changes: 178 additions & 0 deletions test/e2e/tests/gateway_infra_resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
// Copyright Envoy Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.

//go:build e2e
// +build e2e

package tests

import (
"context"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client"
gwapiv1 "sigs.k8s.io/gateway-api/apis/v1"
"sigs.k8s.io/gateway-api/conformance/utils/suite"

"github.com/envoyproxy/gateway/internal/utils"
)

func init() {
ConformanceTests = append(ConformanceTests, GatewayInfraResourceTest)
}

var GatewayInfraResourceTest = suite.ConformanceTest{
ShortName: "GatewayInfraResourceTest",
Description: "Gateway Infra Resource E2E Test",
Test: func(t *testing.T, suite *suite.ConformanceTestSuite) {
gatewayTypeMeta := metav1.TypeMeta{
Kind: "Gateway",
APIVersion: "gateway.networking.k8s.io/v1",
}
gatewayObjMeta := metav1.ObjectMeta{
Name: "e2e-test-infra",
Namespace: "envoy-gateway-system",
}

labelSelector := labels.SelectorFromSet(labels.Set{"gateway.envoyproxy.io/owning-gateway-name": gatewayObjMeta.Name})

var awaitOperation sync.WaitGroup

t.Run("create gateway", func(t *testing.T) {
awaitOperation.Add(1)

newGatewayObj := &gwapiv1.Gateway{
TypeMeta: gatewayTypeMeta,
ObjectMeta: gatewayObjMeta,
Spec: gwapiv1.GatewaySpec{
GatewayClassName: gwapiv1.ObjectName(suite.GatewayClassName),
Listeners: []gwapiv1.Listener{
{
Name: "http",
Port: 8000,
Protocol: "HTTP",
},
{
Name: "my-tcp",
Port: 5432,
Protocol: "TCP",
},
},
},
}

err := suite.Client.Patch(context.TODO(), newGatewayObj, client.Apply, client.ForceOwnership, client.FieldOwner("e2e-test"))
require.NoError(t, err)

<-time.After(time.Millisecond * 300)

var gatewayDeploymentList appsv1.DeploymentList
err = suite.Client.List(context.TODO(), &gatewayDeploymentList, &client.ListOptions{
LabelSelector: labelSelector,
Namespace: gatewayObjMeta.Namespace,
})
require.NoError(t, err)
require.Len(t, gatewayDeploymentList.Items, 1)

awaitOperation.Done()
})

awaitOperation.Wait()
t.Run("update gateway - listener changes", func(t *testing.T) {
awaitOperation.Add(1)

newListenerTCPName := "custom-tcp"
newListenerHTTPPort := int32(8001)

changedGatewayObj := &gwapiv1.Gateway{
TypeMeta: gatewayTypeMeta,
ObjectMeta: gatewayObjMeta,
Spec: gwapiv1.GatewaySpec{
GatewayClassName: gwapiv1.ObjectName(suite.GatewayClassName),
Listeners: []gwapiv1.Listener{
{
Name: "http",
Port: gwapiv1.PortNumber(newListenerHTTPPort),
Protocol: "HTTP",
},
{
Name: gwapiv1.SectionName(newListenerTCPName),
Port: 5432,
Protocol: "TCP",
},
},
},
}

err := suite.Client.Patch(context.TODO(), changedGatewayObj, client.Apply, client.ForceOwnership, client.FieldOwner("e2e-test"))
require.NoError(t, err)

<-time.After(time.Millisecond * 300)
var gatewayDeploymentList appsv1.DeploymentList
err = suite.Client.List(context.TODO(), &gatewayDeploymentList, &client.ListOptions{
LabelSelector: labelSelector,
Namespace: gatewayObjMeta.Namespace,
})
require.NoError(t, err)
require.Len(t, gatewayDeploymentList.Items, 1)

gatewayDeployment := gatewayDeploymentList.Items[0]

for _, container := range gatewayDeployment.Spec.Template.Spec.Containers {
var isTCPPortNameMatch, isHTTPPortNumberMatch bool

hashedPortName := utils.GetHashedName(newListenerTCPName, 6)
if container.Name == "envoy" {
for _, port := range container.Ports {
if port.Name == hashedPortName {
isTCPPortNameMatch = true
}

if port.ContainerPort == newListenerHTTPPort {
isHTTPPortNumberMatch = true
}
}

if !isTCPPortNameMatch {
t.Errorf("container expected TCP port name '%v' is not found", hashedPortName)
}

if !isHTTPPortNumberMatch {
t.Errorf("container expected HTTP port number '%v' is not found", hashedPortName)
}
}
}

awaitOperation.Done()
})

awaitOperation.Wait()
t.Run("delete gateway", func(t *testing.T) {
gwObj := &gwapiv1.Gateway{
TypeMeta: gatewayTypeMeta,
ObjectMeta: gatewayObjMeta,
}

err := suite.Client.Delete(context.TODO(), gwObj)
require.NoError(t, err)

<-time.After(time.Millisecond * 300)

var gatewayDeploymentList appsv1.DeploymentList
err = suite.Client.List(context.TODO(), &gatewayDeploymentList, &client.ListOptions{
LabelSelector: labelSelector,
Namespace: gatewayObjMeta.Namespace,
})
require.NoError(t, err)
require.Empty(t, gatewayDeploymentList.Items)
})
},
}

0 comments on commit 81108f2

Please sign in to comment.