From b9053b8c97ea1850f693ed91c9a07360d28e5de9 Mon Sep 17 00:00:00 2001 From: Alex Volchok Date: Thu, 4 Apr 2024 09:24:31 +0200 Subject: [PATCH] feat(EG K8S Provider): Enable leader election for EG controller (#2694) * merge with main Signed-off-by: Alexander Volchok * fixing lint Signed-off-by: Alexander Volchok * move to leader election to k8s provider Signed-off-by: Alexander Volchok * fix lint Signed-off-by: Alexander Volchok * fix test Signed-off-by: Alexander Volchok * fix settings Signed-off-by: Alexander Volchok * atjust as per code review feedback Signed-off-by: Alexander Volchok * updating Signed-off-by: Alexander Volchok * removing defaults Signed-off-by: Alexander Volchok * fix as per code review feedback Signed-off-by: Alexander Volchok * fixing Signed-off-by: Alexander Volchok * chaging disabled to disable and reverting some changes Signed-off-by: Alexander Volchok * fixing Signed-off-by: Alexander Volchok * fix tests and defaults Signed-off-by: Alexander Volchok * revet values tpl changes Signed-off-by: Alexander Volchok * skip le in testenv Signed-off-by: Alexander Volchok * fix le params Signed-off-by: Alexander Volchok * updating descriptions Signed-off-by: Alexander Volchok * Update values.tmpl.yaml Signed-off-by: Alex Volchok * updating docs Signed-off-by: Alexander Volchok * removig extra comment mark Signed-off-by: Alexander Volchok --------- Signed-off-by: Alexander Volchok Signed-off-by: Alex Volchok --- api/v1alpha1/envoygateway_helpers.go | 27 +++++++ api/v1alpha1/envoygateway_types.go | 19 +++++ .../validation/envoygateway_validate_test.go | 2 +- api/v1alpha1/zz_generated.deepcopy.go | 40 ++++++++++ internal/envoygateway/config/config.go | 5 +- internal/envoygateway/config/decoder_test.go | 24 ++++++ .../decoder/in/gateway-global-ratelimit.yaml | 6 ++ .../decoder/in/gateway-leaderelection.yaml | 13 ++++ .../testdata/decoder/in/kube-provider.yaml | 6 ++ .../decoder/in/provider-mixing-gateway.yaml | 6 ++ .../decoder/in/provider-with-gateway.yaml | 6 ++ internal/infrastructure/runner/runner.go | 29 +++++-- internal/provider/kubernetes/controller.go | 9 +++ internal/provider/kubernetes/kubernetes.go | 53 +++++++++++-- .../provider/kubernetes/kubernetes_test.go | 22 +++--- internal/provider/kubernetes/sources.go | 46 ++++++++++++ internal/provider/kubernetes/sources_test.go | 75 +++++++++++++++++++ site/content/en/latest/api/extension_types.md | 18 +++++ tools/hack/deployment-exists.sh | 24 ++++++ tools/make/kube.mk | 1 + 20 files changed, 406 insertions(+), 25 deletions(-) create mode 100644 internal/envoygateway/config/testdata/decoder/in/gateway-leaderelection.yaml create mode 100644 internal/provider/kubernetes/sources.go create mode 100644 internal/provider/kubernetes/sources_test.go create mode 100755 tools/hack/deployment-exists.sh diff --git a/api/v1alpha1/envoygateway_helpers.go b/api/v1alpha1/envoygateway_helpers.go index 09077991c87..de8ddc98537 100644 --- a/api/v1alpha1/envoygateway_helpers.go +++ b/api/v1alpha1/envoygateway_helpers.go @@ -9,6 +9,8 @@ import ( "fmt" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" + gwapiv1 "sigs.k8s.io/gateway-api/apis/v1" ) // DefaultEnvoyGateway returns a new EnvoyGateway with default configuration parameters. @@ -39,6 +41,11 @@ func (e *EnvoyGateway) SetEnvoyGatewayDefaults() { if e.Provider == nil { e.Provider = DefaultEnvoyGatewayProvider() } + if e.Provider.Kubernetes == nil { + e.Provider.Kubernetes = &EnvoyGatewayKubernetesProvider{ + LeaderElection: DefaultLeaderElection(), + } + } if e.Gateway == nil { e.Gateway = DefaultGateway() } @@ -85,6 +92,16 @@ func (e *EnvoyGateway) NamespaceMode() bool { len(e.Provider.Kubernetes.Watch.Namespaces) > 0 } +// DefaultLeaderElection returns a new LeaderElection with default configuration parameters. +func DefaultLeaderElection() *LeaderElection { + return &LeaderElection{ + RenewDeadline: ptr.To(gwapiv1.Duration("10s")), + RetryPeriod: ptr.To(gwapiv1.Duration("2s")), + LeaseDuration: ptr.To(gwapiv1.Duration("15s")), + Disable: ptr.To(false), + } +} + // DefaultGateway returns a new Gateway with default configuration parameters. func DefaultGateway() *Gateway { return &Gateway{ @@ -148,6 +165,9 @@ func DefaultEnvoyGatewayPrometheus() *EnvoyGatewayPrometheusProvider { func DefaultEnvoyGatewayProvider() *EnvoyGatewayProvider { return &EnvoyGatewayProvider{ Type: ProviderTypeKubernetes, + Kubernetes: &EnvoyGatewayKubernetesProvider{ + LeaderElection: DefaultLeaderElection(), + }, } } @@ -195,9 +215,16 @@ func (r *EnvoyGatewayProvider) GetEnvoyGatewayKubeProvider() *EnvoyGatewayKubern if r.Kubernetes == nil { r.Kubernetes = DefaultEnvoyGatewayKubeProvider() + if r.Kubernetes.LeaderElection == nil { + r.Kubernetes.LeaderElection = DefaultLeaderElection() + } return r.Kubernetes } + if r.Kubernetes.LeaderElection == nil { + r.Kubernetes.LeaderElection = DefaultLeaderElection() + } + if r.Kubernetes.RateLimitDeployment == nil { r.Kubernetes.RateLimitDeployment = DefaultKubernetesDeployment(DefaultRateLimitImage) } diff --git a/api/v1alpha1/envoygateway_types.go b/api/v1alpha1/envoygateway_types.go index ea9e509d960..47b9861e170 100644 --- a/api/v1alpha1/envoygateway_types.go +++ b/api/v1alpha1/envoygateway_types.go @@ -90,6 +90,21 @@ type EnvoyGatewaySpec struct { ExtensionAPIs *ExtensionAPISettings `json:"extensionApis,omitempty"` } +// LeaderElection defines the desired leader election settings. +type LeaderElection struct { + // LeaseDuration defines the time non-leader contenders will wait before attempting to claim leadership. + // It's based on the timestamp of the last acknowledged signal. The default setting is 15 seconds. + LeaseDuration *gwapiv1.Duration `json:"leaseDuration,omitempty"` + // RenewDeadline represents the time frame within which the current leader will attempt to renew its leadership + // status before relinquishing its position. The default setting is 10 seconds. + RenewDeadline *gwapiv1.Duration `json:"renewDeadline,omitempty"` + // RetryPeriod denotes the interval at which LeaderElector clients should perform action retries. + // The default setting is 2 seconds. + RetryPeriod *gwapiv1.Duration `json:"retryPeriod,omitempty"` + // Disable provides the option to turn off leader election, which is enabled by default. + Disable *bool `json:"disable,omitempty"` +} + // EnvoyGatewayTelemetry defines telemetry configurations for envoy gateway control plane. // Control plane will focus on metrics observability telemetry and tracing telemetry later. type EnvoyGatewayTelemetry struct { @@ -194,6 +209,10 @@ type EnvoyGatewayKubernetesProvider struct { // OverwriteControlPlaneCerts updates the secrets containing the control plane certs, when set. // +optional OverwriteControlPlaneCerts *bool `json:"overwriteControlPlaneCerts,omitempty"` + // LeaderElection specifies the configuration for leader election. + // If it's not set up, leader election will be active by default, using Kubernetes' standard settings. + // +optional + LeaderElection *LeaderElection `json:"leaderElection,omitempty"` } const ( diff --git a/api/v1alpha1/validation/envoygateway_validate_test.go b/api/v1alpha1/validation/envoygateway_validate_test.go index 612353ba88f..6978a52e35f 100644 --- a/api/v1alpha1/validation/envoygateway_validate_test.go +++ b/api/v1alpha1/validation/envoygateway_validate_test.go @@ -646,7 +646,7 @@ func TestEnvoyGatewayProvider(t *testing.T) { assert.NotNil(t, envoyGateway.Provider) envoyGatewayProvider := envoyGateway.GetEnvoyGatewayProvider() - assert.Nil(t, envoyGatewayProvider.Kubernetes) + assert.NotNil(t, envoyGatewayProvider.Kubernetes) assert.Equal(t, envoyGateway.Provider, envoyGatewayProvider) envoyGatewayProvider.Kubernetes = v1alpha1.DefaultEnvoyGatewayKubeProvider() diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index a5005944bbe..e82cda7787f 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -947,6 +947,11 @@ func (in *EnvoyGatewayKubernetesProvider) DeepCopyInto(out *EnvoyGatewayKubernet *out = new(bool) **out = **in } + if in.LeaderElection != nil { + in, out := &in.LeaderElection, &out.LeaderElection + *out = new(LeaderElection) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EnvoyGatewayKubernetesProvider. @@ -2522,6 +2527,41 @@ func (in *KubernetesWatchMode) DeepCopy() *KubernetesWatchMode { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *LeaderElection) DeepCopyInto(out *LeaderElection) { + *out = *in + if in.LeaseDuration != nil { + in, out := &in.LeaseDuration, &out.LeaseDuration + *out = new(v1.Duration) + **out = **in + } + if in.RenewDeadline != nil { + in, out := &in.RenewDeadline, &out.RenewDeadline + *out = new(v1.Duration) + **out = **in + } + if in.RetryPeriod != nil { + in, out := &in.RetryPeriod, &out.RetryPeriod + *out = new(v1.Duration) + **out = **in + } + if in.Disable != nil { + in, out := &in.Disable, &out.Disable + *out = new(bool) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LeaderElection. +func (in *LeaderElection) DeepCopy() *LeaderElection { + if in == nil { + return nil + } + out := new(LeaderElection) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *LiteralCustomTag) DeepCopyInto(out *LiteralCustomTag) { *out = *in diff --git a/internal/envoygateway/config/config.go b/internal/envoygateway/config/config.go index 4c9674a88b4..3afe0425f6c 100644 --- a/internal/envoygateway/config/config.go +++ b/internal/envoygateway/config/config.go @@ -36,6 +36,8 @@ type Server struct { DNSDomain string // Logger is the logr implementation used by Envoy Gateway. Logger logging.Logger + // Elected chan is used to signal what a leader is elected + Elected chan struct{} } // New returns a Server with default parameters. @@ -45,7 +47,8 @@ func New() (*Server, error) { Namespace: env.Lookup("ENVOY_GATEWAY_NAMESPACE", DefaultNamespace), DNSDomain: env.Lookup("KUBERNETES_CLUSTER_DOMAIN", DefaultDNSDomain), // the default logger - Logger: logging.DefaultLogger(v1alpha1.LogLevelInfo), + Logger: logging.DefaultLogger(v1alpha1.LogLevelInfo), + Elected: make(chan struct{}), }, nil } diff --git a/internal/envoygateway/config/decoder_test.go b/internal/envoygateway/config/decoder_test.go index eb2f24855db..d8c8e36a230 100644 --- a/internal/envoygateway/config/decoder_test.go +++ b/internal/envoygateway/config/decoder_test.go @@ -322,6 +322,30 @@ func TestDecode(t *testing.T) { in: inPath + "invalid-gateway-version.yaml", expect: false, }, + { + in: inPath + "gateway-leaderelection.yaml", + out: &v1alpha1.EnvoyGateway{ + TypeMeta: metav1.TypeMeta{ + Kind: v1alpha1.KindEnvoyGateway, + APIVersion: v1alpha1.GroupVersion.String(), + }, + EnvoyGatewaySpec: v1alpha1.EnvoyGatewaySpec{ + Gateway: v1alpha1.DefaultGateway(), + Provider: &v1alpha1.EnvoyGatewayProvider{ + Type: v1alpha1.ProviderTypeKubernetes, + Kubernetes: &v1alpha1.EnvoyGatewayKubernetesProvider{ + LeaderElection: &v1alpha1.LeaderElection{ + Disable: ptr.To(true), + LeaseDuration: ptr.To(gwapiv1.Duration("1s")), + RenewDeadline: ptr.To(gwapiv1.Duration("2s")), + RetryPeriod: ptr.To(gwapiv1.Duration("3s")), + }, + }, + }, + }, + }, + expect: true, + }, } for _, tc := range testCases { diff --git a/internal/envoygateway/config/testdata/decoder/in/gateway-global-ratelimit.yaml b/internal/envoygateway/config/testdata/decoder/in/gateway-global-ratelimit.yaml index 90feadd0491..496ff0a2152 100644 --- a/internal/envoygateway/config/testdata/decoder/in/gateway-global-ratelimit.yaml +++ b/internal/envoygateway/config/testdata/decoder/in/gateway-global-ratelimit.yaml @@ -4,6 +4,12 @@ gateway: controllerName: gateway.envoyproxy.io/gatewayclass-controller provider: type: Kubernetes + kubernetes: + leaderElection: + leaseDuration: 15s + renewDeadline: 10s + retryPeriod: 2s + disable: false rateLimit: timeout: 10ms failClosed: true diff --git a/internal/envoygateway/config/testdata/decoder/in/gateway-leaderelection.yaml b/internal/envoygateway/config/testdata/decoder/in/gateway-leaderelection.yaml new file mode 100644 index 00000000000..45b65f0fcfd --- /dev/null +++ b/internal/envoygateway/config/testdata/decoder/in/gateway-leaderelection.yaml @@ -0,0 +1,13 @@ +apiVersion: gateway.envoyproxy.io/v1alpha1 +kind: EnvoyGateway +gateway: + controllerName: gateway.envoyproxy.io/gatewayclass-controller +provider: + type: Kubernetes + kubernetes: + leaderElection: + disable: true + leaseDuration: "1s" + renewDeadline: "2s" + retryPeriod: "3s" + disabled: false diff --git a/internal/envoygateway/config/testdata/decoder/in/kube-provider.yaml b/internal/envoygateway/config/testdata/decoder/in/kube-provider.yaml index 5b57b025643..f59abe95025 100644 --- a/internal/envoygateway/config/testdata/decoder/in/kube-provider.yaml +++ b/internal/envoygateway/config/testdata/decoder/in/kube-provider.yaml @@ -2,3 +2,9 @@ apiVersion: gateway.envoyproxy.io/v1alpha1 kind: EnvoyGateway provider: type: Kubernetes + kubernetes: + leaderElection: + leaseDuration: 15s + renewDeadline: 10s + retryPeriod: 2s + disable: false diff --git a/internal/envoygateway/config/testdata/decoder/in/provider-mixing-gateway.yaml b/internal/envoygateway/config/testdata/decoder/in/provider-mixing-gateway.yaml index d5b7a239d87..339ed976c48 100644 --- a/internal/envoygateway/config/testdata/decoder/in/provider-mixing-gateway.yaml +++ b/internal/envoygateway/config/testdata/decoder/in/provider-mixing-gateway.yaml @@ -2,5 +2,11 @@ apiVersion: gateway.envoyproxy.io/v1alpha1 kind: EnvoyGateway provider: type: Kubernetes + kubernetes: + leaderElection: + leaseDuration: 15s + renewDeadline: 10s + retryPeriod: 2s + disable: false gateway: controllerName: gateway.envoyproxy.io/gatewayclass-controller diff --git a/internal/envoygateway/config/testdata/decoder/in/provider-with-gateway.yaml b/internal/envoygateway/config/testdata/decoder/in/provider-with-gateway.yaml index f16039a315e..12492974cba 100644 --- a/internal/envoygateway/config/testdata/decoder/in/provider-with-gateway.yaml +++ b/internal/envoygateway/config/testdata/decoder/in/provider-with-gateway.yaml @@ -2,5 +2,11 @@ apiVersion: gateway.envoyproxy.io/v1alpha1 kind: EnvoyGateway provider: type: Kubernetes + kubernetes: + leaderElection: + leaseDuration: 15s + renewDeadline: 10s + retryPeriod: 2s + disable: false gateway: controllerName: gateway.envoyproxy.io/gatewayclass-controller diff --git a/internal/infrastructure/runner/runner.go b/internal/infrastructure/runner/runner.go index 054f320b12a..8da18e46443 100644 --- a/internal/infrastructure/runner/runner.go +++ b/internal/infrastructure/runner/runner.go @@ -8,6 +8,8 @@ package runner import ( "context" + "k8s.io/utils/ptr" + "github.com/envoyproxy/gateway/api/v1alpha1" "github.com/envoyproxy/gateway/internal/envoygateway/config" "github.com/envoyproxy/gateway/internal/infrastructure" @@ -41,14 +43,31 @@ func (r *Runner) Start(ctx context.Context) (err error) { r.Logger.Error(err, "failed to create new manager") return err } - go r.subscribeToProxyInfraIR(ctx) - // Enable global ratelimit if it has been configured. - if r.EnvoyGateway.RateLimit != nil { - go r.enableRateLimitInfra(ctx) + var initInfra = func() { + go r.subscribeToProxyInfraIR(ctx) + + // Enable global ratelimit if it has been configured. + if r.EnvoyGateway.RateLimit != nil { + go r.enableRateLimitInfra(ctx) + } + r.Logger.Info("started") } - r.Logger.Info("started") + // When leader election is active, infrastructure initialization occurs only upon acquiring leadership + // to avoid multiple EG instances processing envoy proxy infra resources. + if !ptr.Deref(r.EnvoyGateway.Provider.Kubernetes.LeaderElection.Disable, false) { + go func() { + select { + case <-ctx.Done(): + return + case <-r.Elected: + initInfra() + } + }() + return + } + initInfra() return } diff --git a/internal/provider/kubernetes/controller.go b/internal/provider/kubernetes/controller.go index 83a31795fd1..571a45c5764 100644 --- a/internal/provider/kubernetes/controller.go +++ b/internal/provider/kubernetes/controller.go @@ -906,6 +906,15 @@ func (r *gatewayAPIReconciler) addFinalizer(ctx context.Context, gc *gwapiv1.Gat // watchResources watches gateway api resources. func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.Manager, c controller.Controller) error { + // Upon leader election, we retrigger the reconciliation process to allow the elected leader to + // process status updates and infrastructure changes. This step is crucial for synchronizing resources + // that may have been altered or introduced while there was no elected leader. + if err := c.Watch( + NewWatchAndReconcileSource(mgr.Elected(), &gwapiv1.GatewayClass{}), + handler.EnqueueRequestsFromMapFunc(r.enqueueClass)); err != nil { + return err + } + if err := c.Watch( source.Kind(mgr.GetCache(), &gwapiv1.GatewayClass{}), handler.EnqueueRequestsFromMapFunc(r.enqueueClass), diff --git a/internal/provider/kubernetes/kubernetes.go b/internal/provider/kubernetes/kubernetes.go index 3a4bfb7c793..668555b6725 100644 --- a/internal/provider/kubernetes/kubernetes.go +++ b/internal/provider/kubernetes/kubernetes.go @@ -8,16 +8,19 @@ package kubernetes import ( "context" "fmt" + "time" "k8s.io/client-go/rest" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/config" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/manager" "github.com/envoyproxy/gateway/internal/envoygateway" - "github.com/envoyproxy/gateway/internal/envoygateway/config" + ec "github.com/envoyproxy/gateway/internal/envoygateway/config" "github.com/envoyproxy/gateway/internal/message" "github.com/envoyproxy/gateway/internal/status" ) @@ -31,14 +34,43 @@ type Provider struct { } // New creates a new Provider from the provided EnvoyGateway. -func New(cfg *rest.Config, svr *config.Server, resources *message.ProviderResources) (*Provider, error) { +func New(cfg *rest.Config, svr *ec.Server, resources *message.ProviderResources) (*Provider, error) { // TODO: Decide which mgr opts should be exposed through envoygateway.provider.kubernetes API. + mgrOpts := manager.Options{ - Scheme: envoygateway.GetScheme(), - Logger: svr.Logger.Logger, - LeaderElection: false, - HealthProbeBindAddress: ":8081", - LeaderElectionID: "5b9825d2.gateway.envoyproxy.io", + Scheme: envoygateway.GetScheme(), + Logger: svr.Logger.Logger, + HealthProbeBindAddress: ":8081", + LeaderElectionID: "5b9825d2.gateway.envoyproxy.io", + LeaderElectionNamespace: svr.Namespace, + } + + if !ptr.Deref(svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.Disable, false) { + mgrOpts.LeaderElection = true + if svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.LeaseDuration != nil { + ld, err := time.ParseDuration(string(*svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.LeaseDuration)) + if err != nil { + return nil, err + } + mgrOpts.LeaseDuration = ptr.To(ld) + } + + if svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.RetryPeriod != nil { + rp, err := time.ParseDuration(string(*svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.RetryPeriod)) + if err != nil { + return nil, err + } + mgrOpts.RetryPeriod = ptr.To(rp) + } + + if svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.RenewDeadline != nil { + rd, err := time.ParseDuration(string(*svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.RenewDeadline)) + if err != nil { + return nil, err + } + mgrOpts.RenewDeadline = ptr.To(rd) + } + mgrOpts.Controller = config.Controller{NeedLeaderElection: ptr.To(false)} } if svr.EnvoyGateway.NamespaceMode() { @@ -47,7 +79,6 @@ func New(cfg *rest.Config, svr *config.Server, resources *message.ProviderResour mgrOpts.Cache.DefaultNamespaces[watchNS] = cache.Config{} } } - mgr, err := ctrl.NewManager(cfg, mgrOpts) if err != nil { return nil, fmt.Errorf("failed to create manager: %w", err) @@ -73,6 +104,12 @@ func New(cfg *rest.Config, svr *config.Server, resources *message.ProviderResour return nil, fmt.Errorf("unable to set up ready check: %w", err) } + // Emit elected & continue with deployment of infra resources + go func() { + <-mgr.Elected() + close(svr.Elected) + }() + return &Provider{ manager: mgr, client: mgr.GetClient(), diff --git a/internal/provider/kubernetes/kubernetes_test.go b/internal/provider/kubernetes/kubernetes_test.go index 350818cb182..2187e5b5df1 100644 --- a/internal/provider/kubernetes/kubernetes_test.go +++ b/internal/provider/kubernetes/kubernetes_test.go @@ -61,6 +61,11 @@ func TestProvider(t *testing.T) { require.NoError(t, provider.Start(ctx)) }() + testNs := config.DefaultNamespace + cli := provider.manager.GetClient() + ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testNs}} + require.NoError(t, cli.Create(ctx, ns)) + // Stop the kube provider. defer func() { cancel() @@ -154,15 +159,8 @@ func testGatewayClassAcceptedStatus(ctx context.Context, t *testing.T, provider func testGatewayClassWithParamRef(ctx context.Context, t *testing.T, provider *Provider, resources *message.ProviderResources) { cli := provider.manager.GetClient() - // Create the namespace for the test case. // Note: The namespace for the EnvoyProxy must match EG's configured namespace. testNs := config.DefaultNamespace - ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testNs}} - require.NoError(t, cli.Create(ctx, ns)) - - defer func() { - require.NoError(t, cli.Delete(ctx, ns)) - }() epName := "test-envoy-proxy" ep := test.GetEnvoyProxy(types.NamespacedName{Namespace: testNs, Name: epName}, false) @@ -1260,8 +1258,8 @@ func TestNamespacedProvider(t *testing.T) { Type: egv1a1.KubernetesWatchModeTypeNamespaces, Namespaces: []string{"ns1", "ns2"}, }, + LeaderElection: egv1a1.DefaultLeaderElection(), } - resources := new(message.ProviderResources) provider, err := New(cliCfg, svr, resources) require.NoError(t, err) @@ -1320,8 +1318,8 @@ func TestNamespaceSelectorProvider(t *testing.T) { Type: egv1a1.KubernetesWatchModeTypeNamespaceSelector, NamespaceSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"label-1": "true", "label-2": "true"}}, }, + LeaderElection: egv1a1.DefaultLeaderElection(), } - resources := new(message.ProviderResources) provider, err := New(cliCfg, svr, resources) require.NoError(t, err) @@ -1330,12 +1328,16 @@ func TestNamespaceSelectorProvider(t *testing.T) { require.NoError(t, provider.Start(ctx)) }() + testNs := config.DefaultNamespace + cli := provider.manager.GetClient() + ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testNs}} + require.NoError(t, cli.Create(ctx, ns)) + defer func() { cancel() require.NoError(t, testEnv.Stop()) }() - cli := provider.manager.GetClient() watchedNS := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{ Name: "watched-ns", Labels: map[string]string{"label-1": "true", "label-2": "true"}, diff --git a/internal/provider/kubernetes/sources.go b/internal/provider/kubernetes/sources.go new file mode 100644 index 00000000000..66d93acb0d5 --- /dev/null +++ b/internal/provider/kubernetes/sources.go @@ -0,0 +1,46 @@ +// Copyright Envoy Gateway Authors +// SPDX-License-Identifier: Apache-2.0 +// The full text of the Apache license is available in the LICENSE file at +// the root of the repo. + +package kubernetes + +import ( + "context" + "errors" + + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +// watchAndReconcileSource is a concrete implementation of the Source interface. +type watchAndReconcileSource struct { + condition <-chan struct{} + object client.Object +} + +func NewWatchAndReconcileSource(cond <-chan struct{}, obj client.Object) source.Source { + return &watchAndReconcileSource{condition: cond, object: obj} +} + +// Start implements the Source interface. It registers the EventHandler with the Informer. +func (s *watchAndReconcileSource) Start(ctx context.Context, eh handler.EventHandler, queue workqueue.RateLimitingInterface, _ ...predicate.Predicate) error { + if s.object == nil { + return errors.New("object to queue is required") + } + // do not block controller startup + go func() { + select { + case <-ctx.Done(): + return + case <-s.condition: + // Triggers a reconcile + eh.Generic(ctx, event.GenericEvent{Object: s.object}, queue) + } + }() + return nil +} diff --git a/internal/provider/kubernetes/sources_test.go b/internal/provider/kubernetes/sources_test.go new file mode 100644 index 00000000000..adae9f8f854 --- /dev/null +++ b/internal/provider/kubernetes/sources_test.go @@ -0,0 +1,75 @@ +// Copyright Envoy Gateway Authors +// SPDX-License-Identifier: Apache-2.0 +// The full text of the Apache license is available in the LICENSE file at +// the root of the repo. + +package kubernetes + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + gwapiv1 "sigs.k8s.io/gateway-api/apis/v1" +) + +func enqueueClass(_ context.Context, _ client.Object) []reconcile.Request { + return []reconcile.Request{{NamespacedName: types.NamespacedName{ + Name: "controller-name", + }}} +} + +func TestSources(t *testing.T) { + testCases := []struct { + name string + ctx context.Context + expectedAddresses []string + handler handler.EventHandler + mapFunc handler.MapFunc + queue workqueue.RateLimitingInterface + expected bool + obj client.Object + }{ + { + name: "Queue size should increase by one after the condition event triggered", + expectedAddresses: []string{}, + handler: handler.EnqueueRequestsFromMapFunc(enqueueClass), + queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + ctx: context.Background(), + obj: &gwapiv1.GatewayClass{}, + expected: true, + }, + { + name: "Confirm object is required", + expectedAddresses: []string{}, + handler: handler.EnqueueRequestsFromMapFunc(enqueueClass), + queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + ctx: context.Background(), + obj: nil, + expected: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cond := make(chan struct{}) + store := NewWatchAndReconcileSource(cond, tc.obj) + err := store.Start(tc.ctx, tc.handler, tc.queue) + if !tc.expected { + require.Error(t, err) + } else { + require.NoError(t, err) + close(cond) + require.Eventually(t, func() bool { + return tc.queue.Len() == 1 + }, time.Second*3, time.Millisecond*20) + } + }) + } +} diff --git a/site/content/en/latest/api/extension_types.md b/site/content/en/latest/api/extension_types.md index 48eaf709334..c3316f67026 100644 --- a/site/content/en/latest/api/extension_types.md +++ b/site/content/en/latest/api/extension_types.md @@ -670,6 +670,7 @@ _Appears in:_ | `watch` | _[KubernetesWatchMode](#kuberneteswatchmode)_ | false | Watch holds configuration of which input resources should be watched and reconciled. | | `deploy` | _[KubernetesDeployMode](#kubernetesdeploymode)_ | false | Deploy holds configuration of how output managed resources such as the Envoy Proxy data plane
should be deployed | | `overwriteControlPlaneCerts` | _boolean_ | false | OverwriteControlPlaneCerts updates the secrets containing the control plane certs, when set. | +| `leaderElection` | _[LeaderElection](#leaderelection)_ | false | LeaderElection specifies the configuration for leader election.
If it's not set up, leader election will be active by default, using Kubernetes' standard settings. | #### EnvoyGatewayLogComponent @@ -1705,6 +1706,23 @@ _Appears in:_ +#### LeaderElection + + + +LeaderElection defines the desired leader election settings. + +_Appears in:_ +- [EnvoyGatewayKubernetesProvider](#envoygatewaykubernetesprovider) + +| Field | Type | Required | Description | +| --- | --- | --- | --- | +| `leaseDuration` | _[Duration](#duration)_ | true | LeaseDuration defines the time non-leader contenders will wait before attempting to claim leadership.
It's based on the timestamp of the last acknowledged signal. The default setting is 15 seconds. | +| `renewDeadline` | _[Duration](#duration)_ | true | RenewDeadline represents the time frame within which the current leader will attempt to renew its leadership
status before relinquishing its position. The default setting is 10 seconds. | +| `retryPeriod` | _[Duration](#duration)_ | true | RetryPeriod denotes the interval at which LeaderElector clients should perform action retries.
The default setting is 2 seconds. | +| `disable` | _boolean_ | true | Disable provides the option to turn off leader election, which is enabled by default. | + + #### LiteralCustomTag diff --git a/tools/hack/deployment-exists.sh b/tools/hack/deployment-exists.sh new file mode 100755 index 00000000000..cc50e1c2638 --- /dev/null +++ b/tools/hack/deployment-exists.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash + +DEPLOYMENT_LABEL_SELECTOR=$1 +DEPLOYMENT_NAMESPACE=$2 + + +# Timeout for deployment to exist (in seconds) +exist_timeout=25 +end=$((SECONDS+exist_timeout)) + +while true; do + deployment=$(kubectl get deployment -l "$DEPLOYMENT_LABEL_SELECTOR" -o name -n "$DEPLOYMENT_NAMESPACE") + if [ -n "$deployment" ]; then + echo "$deployment exists" + break + else + echo "Waiting for deployment with label selectors = \"$DEPLOYMENT_LABEL_SELECTOR\" to exists in namespace: \"$DEPLOYMENT_NAMESPACE\"" + fi + if [ $SECONDS -gt $end ]; then + echo "The timeout for waiting for a deployment to exists has passed." + exit 1 + fi + sleep 5 +done \ No newline at end of file diff --git a/tools/make/kube.mk b/tools/make/kube.mk index a862d9db20a..b37a10d1f68 100644 --- a/tools/make/kube.mk +++ b/tools/make/kube.mk @@ -113,6 +113,7 @@ install-ratelimit: kubectl rollout restart deployment envoy-gateway -n envoy-gateway-system kubectl rollout status --watch --timeout=5m -n envoy-gateway-system deployment/envoy-gateway kubectl wait --timeout=5m -n envoy-gateway-system deployment/envoy-gateway --for=condition=Available + tools/hack/deployment-exists.sh "app.kubernetes.io/name=envoy-ratelimit" "envoy-gateway-system" kubectl wait --timeout=5m -n envoy-gateway-system deployment/envoy-ratelimit --for=condition=Available .PHONY: run-e2e