Skip to content

Commit

Permalink
Don't send endpoint profile updates from Server updates when opaqueness doesn't change (#12013)
Browse files Browse the repository at this point in the history

When the destination controller receives an update for a Server resource, we recompute opaqueness ports for all pods.  This results in a large number of updates to all endpoint profile watches, even if the opaqueness doesn't change.  In cases where there are many Server resources, this can result in a large number of updates being sent to the endpoint profile translator and overflowing the endpoint profile translator update queue.  This is especially likely to happen during an informer resync, since this will result in an informer callback for every Server in the cluster.

We refactor the workload watcher to not send these updates if the opaqueness has not changed.

This seemingly simple change in behavior requires a large code change because:
* the current opaqueness state is not stored on workload publishers and must be added so that we can determine if the opaqueness has changed
* storing the opaqueness in addition to the other state we're storing (pod, ip, port, etc.) means that we are not storing all of the data represented by the Address struct
* workload watcher uses a `createAddress` func to dynamically create an Address from the state it stores
* now that we are storing the Address as state, creating Addresses dynamically is no longer necessary and we can operate on the Address state directly
  * this makes the workload watcher more similar to other watchers and follow a common pattern
  * it also fixes some minor correctness issues:
    * pods that did not have the ready status condition were being considered when they should not have been
    * updates to ExternalWorkload labels were not being considered

Signed-off-by: Alex Leong <alex@buoyant.io>
  • Loading branch information
adleong authored Mar 19, 2024
1 parent a4c19b8 commit 5915ef5
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 189 deletions.
3 changes: 2 additions & 1 deletion controller/api/destination/endpoint_profile_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,11 @@ func (ept *endpointProfileTranslator) update(address *watcher.Address) {
return
}

_, opaqueProtocol := opaquePorts[address.Port]
profile := &pb.DestinationProfile{
RetryBudget: defaultRetryBudget(),
Endpoint: endpoint,
OpaqueProtocol: address.OpaqueProtocol,
OpaqueProtocol: opaqueProtocol || address.OpaqueProtocol,
}
if proto.Equal(profile, ept.current) {
ept.log.Debugf("Ignoring redundant profile update: %+v", profile)
Expand Down
26 changes: 25 additions & 1 deletion controller/api/destination/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ metadata:
namespace: ns
status:
phase: Running
conditions:
- type: Ready
status: "True"
podIP: 172.17.0.12
podIPs:
- ip: 172.17.0.12
Expand Down Expand Up @@ -146,6 +149,9 @@ metadata:
namespace: ns
status:
phase: Running
conditions:
- type: Ready
status: "True"
podIP: 172.17.0.13
podIPs:
- ip: 172.17.0.13`
Expand Down Expand Up @@ -193,6 +199,9 @@ metadata:
namespace: ns
status:
phase: Running
conditions:
- type: Ready
status: "True"
podIP: 172.17.0.14
podIPs:
- ip: 172.17.0.14
Expand Down Expand Up @@ -258,6 +267,9 @@ metadata:
namespace: ns
status:
phase: Running
conditions:
- type: Ready
status: "True"
podIP: 172.17.0.15
podIPs:
- ip: 172.17.0.15
Expand Down Expand Up @@ -311,6 +323,9 @@ metadata:
namespace: ns
status:
phase: Running
conditions:
- type: Ready
status: "True"
podIP: 172.17.13.15
podIPs:
- ip: 172.17.13.15`,
Expand Down Expand Up @@ -358,6 +373,9 @@ metadata:
namespace: ns
status:
phase: Running
conditions:
- type: Ready
status: "True"
podIP: 172.17.0.16
podIPs:
- ip: 172.17.0.16
Expand Down Expand Up @@ -408,6 +426,9 @@ metadata:
namespace: ns
status:
phase: Running
conditions:
- type: Ready
status: "True"
hostIP: 192.168.1.20
podIP: 172.17.0.17
podIPs:
Expand Down Expand Up @@ -466,6 +487,9 @@ metadata:
namespace: ns
status:
phase: Running
conditions:
- type: Ready
status: "True"
podIP: 172.17.55.1
podIPs:
- ip: 172.17.55.1
Expand Down Expand Up @@ -528,7 +552,7 @@ spec:
port: 4143
status:
conditions:
ready: true`,
- ready: true`,
`
apiVersion: workload.linkerd.io/v1beta1
kind: ExternalWorkload
Expand Down
18 changes: 9 additions & 9 deletions controller/api/destination/watcher/endpoints_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,7 @@ func (pp *portPublisher) endpointSliceToAddresses(es *discovery.EndpointSlice) A
pp.log.Errorf("Unable to create new address:%v", err)
continue
}
err = SetToServerProtocol(pp.k8sAPI, &address, resolvedPort)
err = SetToServerProtocol(pp.k8sAPI, &address)
if err != nil {
pp.log.Errorf("failed to set address OpaqueProtocol: %s", err)
continue
Expand All @@ -955,7 +955,7 @@ func (pp *portPublisher) endpointSliceToAddresses(es *discovery.EndpointSlice) A
continue
}

err = SetToServerProtocolExternalWorkload(pp.k8sAPI, &address, resolvedPort)
err = SetToServerProtocolExternalWorkload(pp.k8sAPI, &address)
if err != nil {
pp.log.Errorf("failed to set address OpaqueProtocol: %s", err)
continue
Expand Down Expand Up @@ -1067,7 +1067,7 @@ func (pp *portPublisher) endpointsToAddresses(endpoints *corev1.Endpoints) Addre
pp.log.Errorf("Unable to create new address:%v", err)
continue
}
err = SetToServerProtocol(pp.k8sAPI, &address, resolvedPort)
err = SetToServerProtocol(pp.k8sAPI, &address)
if err != nil {
pp.log.Errorf("failed to set address OpaqueProtocol: %s", err)
continue
Expand Down Expand Up @@ -1487,7 +1487,7 @@ func isValidSlice(es *discovery.EndpointSlice) bool {

// SetToServerProtocol sets the address's OpaqueProtocol field based off any
// Servers that select it and override the expected protocol.
func SetToServerProtocol(k8sAPI *k8s.API, address *Address, port Port) error {
func SetToServerProtocol(k8sAPI *k8s.API, address *Address) error {
if address.Pod == nil {
return fmt.Errorf("endpoint not backed by Pod: %s:%d", address.IP, address.Port)
}
Expand All @@ -1504,13 +1504,13 @@ func SetToServerProtocol(k8sAPI *k8s.API, address *Address, port Port) error {
var portMatch bool
switch server.Spec.Port.Type {
case intstr.Int:
if server.Spec.Port.IntVal == int32(port) {
if server.Spec.Port.IntVal == int32(address.Port) {
portMatch = true
}
case intstr.String:
for _, c := range address.Pod.Spec.Containers {
for _, p := range c.Ports {
if (p.ContainerPort == int32(port) || p.HostPort == int32(port)) &&
if (p.ContainerPort == int32(address.Port) || p.HostPort == int32(address.Port)) &&
p.Name == server.Spec.Port.StrVal {
portMatch = true
}
Expand All @@ -1530,7 +1530,7 @@ func SetToServerProtocol(k8sAPI *k8s.API, address *Address, port Port) error {

// setToServerProtocolExternalWorkload sets the address's OpaqueProtocol field based off any
// Servers that select it and override the expected protocol for ExternalWorkloads.
func SetToServerProtocolExternalWorkload(k8sAPI *k8s.API, address *Address, port Port) error {
func SetToServerProtocolExternalWorkload(k8sAPI *k8s.API, address *Address) error {
if address.ExternalWorkload == nil {
return fmt.Errorf("endpoint not backed by ExternalWorkload: %s:%d", address.IP, address.Port)
}
Expand All @@ -1547,12 +1547,12 @@ func SetToServerProtocolExternalWorkload(k8sAPI *k8s.API, address *Address, port
var portMatch bool
switch server.Spec.Port.Type {
case intstr.Int:
if server.Spec.Port.IntVal == int32(port) {
if server.Spec.Port.IntVal == int32(address.Port) {
portMatch = true
}
case intstr.String:
for _, p := range address.ExternalWorkload.Spec.Ports {
if p.Port == int32(port) && p.Name == server.Spec.Port.StrVal {
if p.Port == int32(address.Port) && p.Name == server.Spec.Port.StrVal {
portMatch = true
}

Expand Down
Loading

0 comments on commit 5915ef5

Please sign in to comment.