Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: decouple gateway status updates from the reconciler #4767

Merged
merged 27 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
8c84649
decoup gateway status update
zhaohuabing Nov 22, 2024
8c9d33e
decoup gatewayclass status update
zhaohuabing Nov 22, 2024
fc97316
fix test
zhaohuabing Nov 23, 2024
bccc70c
add comment
zhaohuabing Nov 23, 2024
7a4c51e
fix test
zhaohuabing Nov 23, 2024
e406088
fix test
zhaohuabing Nov 23, 2024
163cf5c
revert gateway api runner
zhaohuabing Nov 26, 2024
bf3d07e
update address and programming status
zhaohuabing Nov 26, 2024
0e3aec5
Merge remote-tracking branch 'origin/main' into fix-gateway-status-bl…
zhaohuabing Nov 26, 2024
c993159
Revert "update address and programming status"
zhaohuabing Nov 26, 2024
f8f2e68
avoid overriding the gateway status from Gateway API translator
zhaohuabing Dec 11, 2024
97d522a
Merge remote-tracking branch 'origin/main' into fix-gateway-status-bl…
zhaohuabing Dec 11, 2024
272348a
minor wording
zhaohuabing Dec 11, 2024
0393223
Merge remote-tracking branch 'origin/main' into fix-gateway-status-bl…
zhaohuabing Dec 11, 2024
3d20090
minor wording
zhaohuabing Dec 11, 2024
c7b7f44
only subscribe to status updates upon acquiring leadership
zhaohuabing Dec 11, 2024
be6df9a
fix lint
zhaohuabing Dec 11, 2024
5448863
Merge remote-tracking branch 'origin/main' into fix-gateway-status-bl…
zhaohuabing Dec 11, 2024
3fc0ec4
Merge branch 'main' into fix-gateway-status-blocking
zhaohuabing Dec 11, 2024
9aa5c7c
minor wording
zhaohuabing Dec 11, 2024
75dcfbd
address comment
zhaohuabing Dec 12, 2024
ba06d2b
address comment
zhaohuabing Dec 12, 2024
d77aa7d
minor wording
zhaohuabing Dec 12, 2024
99a2910
fix lint
zhaohuabing Dec 12, 2024
6ec2c6d
Merge branch 'main' into fix-gateway-status-blocking
zhaohuabing Dec 12, 2024
f36b72f
minor change
zhaohuabing Dec 12, 2024
fb00fb0
release note
zhaohuabing Dec 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions internal/gatewayapi/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,10 +428,11 @@ func (r *Runner) getAllStatuses() *StatusesToDelete {
}

func (r *Runner) deleteStatusKeys(ds *StatusesToDelete) {
for key := range ds.GatewayStatusKeys {
r.ProviderResources.GatewayStatuses.Delete(key)
delete(ds.GatewayStatusKeys, key)
}
// Do not delete the status keys for the Gateway because the Gateway status has also been stored into the ProviderResources
zhaohuabing marked this conversation as resolved.
Show resolved Hide resolved
// by the kubernetes provider to update the address and workload status.
//
// TODO: zhaohuabing this is acceptable as the Gateway status typically does not occupy a lot of memory,
// but it's better to move all the status handling to Gateway API translator layer to avoid this.
for key := range ds.HTTPRouteStatusKeys {
r.ProviderResources.HTTPRouteStatuses.Delete(key)
delete(ds.HTTPRouteStatusKeys, key)
Expand Down
2 changes: 0 additions & 2 deletions internal/gatewayapi/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ func TestDeleteStatusKeys(t *testing.T) {
delete(ds.UDPRouteStatusKeys, keys[6])
r.deleteStatusKeys(ds)

require.Equal(t, 0, r.ProviderResources.GatewayStatuses.Len())
require.Equal(t, 0, r.ProviderResources.HTTPRouteStatuses.Len())
require.Equal(t, 0, r.ProviderResources.GRPCRouteStatuses.Len())
require.Equal(t, 0, r.ProviderResources.TLSRouteStatuses.Len())
Expand Down Expand Up @@ -278,7 +277,6 @@ func TestDeleteAllStatusKeys(t *testing.T) {
// Checks that the keys are successfully stored to DeletableStatus and watchable maps
ds := r.getAllStatuses()

require.True(t, ds.GatewayStatusKeys[keys[0]])
require.True(t, ds.HTTPRouteStatusKeys[keys[1]])
require.True(t, ds.GRPCRouteStatusKeys[keys[2]])
require.True(t, ds.TLSRouteStatusKeys[keys[3]])
Expand Down
13 changes: 7 additions & 6 deletions internal/message/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,13 @@ func (p *ProviderResources) Close() {

// GatewayAPIStatuses contains gateway API resources statuses
type GatewayAPIStatuses struct {
GatewayStatuses watchable.Map[types.NamespacedName, *gwapiv1.GatewayStatus]
HTTPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.HTTPRouteStatus]
GRPCRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.GRPCRouteStatus]
TLSRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TLSRouteStatus]
TCPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TCPRouteStatus]
UDPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.UDPRouteStatus]
GatewayClassStatuses watchable.Map[types.NamespacedName, *gwapiv1.GatewayClassStatus]
GatewayStatuses watchable.Map[types.NamespacedName, *gwapiv1.GatewayStatus]
HTTPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.HTTPRouteStatus]
GRPCRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1.GRPCRouteStatus]
TLSRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TLSRouteStatus]
TCPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.TCPRouteStatus]
UDPRouteStatuses watchable.Map[types.NamespacedName, *gwapiv1a2.UDPRouteStatus]
}

func (s *GatewayAPIStatuses) Close() {
Expand Down
20 changes: 12 additions & 8 deletions internal/provider/kubernetes/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,12 @@
if managedGC.Spec.ParametersRef != nil && managedGC.DeletionTimestamp == nil {
if err := r.processGatewayClassParamsRef(ctx, managedGC, resourceMappings, gwcResource); err != nil {
msg := fmt.Sprintf("%s: %v", status.MsgGatewayClassInvalidParams, err)
if err := r.updateStatusForGatewayClass(ctx, managedGC, false, string(gwapiv1.GatewayClassReasonInvalidParameters), msg); err != nil {
r.log.Error(err, "unable to update GatewayClass status")
}
gc := status.SetGatewayClassAccepted(
managedGC.DeepCopy(),
false,
string(gwapiv1.GatewayClassReasonInvalidParameters),
msg)
r.resources.GatewayClassStatuses.Store(utils.NamespacedName(gc), &gc.Status)

Check warning on line 207 in internal/provider/kubernetes/controller.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/controller.go#L202-L207

Added lines #L202 - L207 were not covered by tests
r.log.Error(err, "failed to process parametersRef for gatewayclass", "name", managedGC.Name)
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -293,11 +296,12 @@

// process envoy gateway secret refs
r.processEnvoyProxySecretRef(ctx, gwcResource)

if err := r.updateStatusForGatewayClass(ctx, managedGC, true, string(gwapiv1.GatewayClassReasonAccepted), status.MsgValidGatewayClass); err != nil {
r.log.Error(err, "unable to update GatewayClass status")
return reconcile.Result{}, err
}
gc := status.SetGatewayClassAccepted(
managedGC.DeepCopy(),
true,
string(gwapiv1.GatewayClassReasonAccepted),
status.MsgValidGatewayClass)
r.resources.GatewayClassStatuses.Store(utils.NamespacedName(gc), &gc.Status)

if len(gwcResource.Gateways) == 0 {
r.log.Info("No gateways found for accepted gatewayclass")
Expand Down
20 changes: 17 additions & 3 deletions internal/provider/kubernetes/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@
// Check if the Service belongs to a Gateway, if so, update the Gateway status.
gtw := r.findOwningGateway(ctx, labels)
if gtw != nil {
r.updateStatusForGateway(ctx, gtw)
r.updateGatewayStatus(gtw)
zhaohuabing marked this conversation as resolved.
Show resolved Hide resolved
return false
}

Expand Down Expand Up @@ -528,7 +528,7 @@
// Check if the obj belongs to a Gateway, if so, update the Gateway status.
gtw := r.findOwningGateway(ctx, labels)
if gtw != nil {
r.updateStatusForGateway(ctx, gtw)
r.updateGatewayStatus(gtw)
return false
}
}
Expand Down Expand Up @@ -636,12 +636,26 @@
}

for _, gateway := range gateways.Items {
r.updateStatusForGateway(ctx, &gateway)
r.updateGatewayStatus(&gateway)

Check warning on line 639 in internal/provider/kubernetes/predicates.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/predicates.go#L639

Added line #L639 was not covered by tests
}

return nil
}

// updateGatewayStatus triggers a status update for the Gateway.
func (r *gatewayAPIReconciler) updateGatewayStatus(gateway *gwapiv1.Gateway) {
// The status added to GatewayStatuses is solely used to trigger the status updater
// and does not reflect the real changed status.
//
// The status updater will check the Envoy Proxy service to get the addresses of the Gateway,
// and check the Envoy Proxy Deployment/DaemonSet to get the status of the Gateway workload.
//
// Since the status does not reflect the actual changed status, we need to delete it first
// to prevent it from being considered unchanged. This ensures that subscribers receive the update event.
r.resources.GatewayStatuses.Delete(utils.NamespacedName(gateway))
zhaohuabing marked this conversation as resolved.
Show resolved Hide resolved
zhaohuabing marked this conversation as resolved.
Show resolved Hide resolved
r.resources.GatewayStatuses.Store(utils.NamespacedName(gateway), &gateway.Status)
}

func (r *gatewayAPIReconciler) handleNode(obj client.Object) bool {
ctx := context.Background()
node, ok := obj.(*corev1.Node)
Expand Down
3 changes: 3 additions & 0 deletions internal/provider/kubernetes/predicates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/envoyproxy/gateway/internal/gatewayapi/resource"
"github.com/envoyproxy/gateway/internal/infrastructure/kubernetes/proxy"
"github.com/envoyproxy/gateway/internal/logging"
"github.com/envoyproxy/gateway/internal/message"
"github.com/envoyproxy/gateway/internal/provider/kubernetes/test"
)

Expand Down Expand Up @@ -854,6 +855,7 @@ func TestValidateServiceForReconcile(t *testing.T) {
classController: egv1a1.GatewayControllerName,
log: logger,
mergeGateways: sets.New[string]("test-mg"),
resources: &message.ProviderResources{},
grpcRouteCRDExists: true,
tcpRouteCRDExists: true,
udpRouteCRDExists: true,
Expand Down Expand Up @@ -972,6 +974,7 @@ func TestValidateObjectForReconcile(t *testing.T) {
classController: egv1a1.GatewayControllerName,
log: logger,
mergeGateways: sets.New[string]("test-mg"),
resources: &message.ProviderResources{},
}

for _, tc := range testCases {
Expand Down
61 changes: 29 additions & 32 deletions internal/provider/kubernetes/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
"fmt"
"reflect"

kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -28,6 +27,35 @@
// subscribeAndUpdateStatus subscribes to gateway API object status updates and
// writes it into the Kubernetes API Server.
func (r *gatewayAPIReconciler) subscribeAndUpdateStatus(ctx context.Context, extensionManagerEnabled bool) {
// GatewayClass object status updater
go func() {
message.HandleSubscription(
message.Metadata{Runner: string(egv1a1.LogComponentProviderRunner), Message: "gatewayclass-status"},
r.resources.GatewayClassStatuses.Subscribe(ctx),
func(update message.Update[types.NamespacedName, *gwapiv1.GatewayClassStatus], errChan chan error) {
// skip delete updates.
if update.Delete {
return
}

Check warning on line 39 in internal/provider/kubernetes/status.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/status.go#L38-L39

Added lines #L38 - L39 were not covered by tests

r.statusUpdater.Send(Update{
NamespacedName: update.Key,
Resource: new(gwapiv1.GatewayClass),
Mutator: MutatorFunc(func(obj client.Object) client.Object {
gc, ok := obj.(*gwapiv1.GatewayClass)
if !ok {
panic(fmt.Sprintf("unsupported object type %T", obj))

Check warning on line 47 in internal/provider/kubernetes/status.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/status.go#L47

Added line #L47 was not covered by tests
}
gcCopy := gc.DeepCopy()
gcCopy.Status = *update.Value
return gcCopy
}),
})
},
)
r.log.Info("gatewayclass status subscriber shutting down")

Check warning on line 56 in internal/provider/kubernetes/status.go

View check run for this annotation

Codecov / codecov/patch

internal/provider/kubernetes/status.go#L56

Added line #L56 was not covered by tests
}()

// Gateway object status updater
go func() {
message.HandleSubscription(
Expand Down Expand Up @@ -564,34 +592,3 @@
}),
})
}

zhaohuabing marked this conversation as resolved.
Show resolved Hide resolved
func (r *gatewayAPIReconciler) updateStatusForGatewayClass(
ctx context.Context,
gc *gwapiv1.GatewayClass,
accepted bool,
reason,
msg string,
) error {
if r.statusUpdater != nil {
r.statusUpdater.Send(Update{
NamespacedName: types.NamespacedName{Name: gc.Name},
Resource: &gwapiv1.GatewayClass{},
Mutator: MutatorFunc(func(obj client.Object) client.Object {
gc, ok := obj.(*gwapiv1.GatewayClass)
if !ok {
panic(fmt.Sprintf("unsupported object type %T", obj))
}

return status.SetGatewayClassAccepted(gc.DeepCopy(), accepted, reason, msg)
}),
})
} else {
// this branch makes testing easier by not going through the status.Updater.
duplicate := status.SetGatewayClassAccepted(gc.DeepCopy(), accepted, reason, msg)

if err := r.client.Status().Update(ctx, duplicate); err != nil && !kerrors.IsNotFound(err) {
return fmt.Errorf("error updating status of gatewayclass %s: %w", duplicate.Name, err)
}
}
return nil
}